airflow-hitl

Portões de aprovação humana, entradas de formulário e ramificações em DAGs do Airflow usando operadores adiáveis. Quatro tipos de operadores: ApprovalOperator para decisões de aprovar/rejeitar, HITLOperator para seleção de múltiplas opções com formulários, HITLBranchOperator para roteamento de tarefas orientado por humanos e HITLEntryOperator para coleta de dados de formulário. Todos os operadores são adiáveis, liberando slots de worker enquanto aguardam resposta humana via a aba Ações Necessárias da interface do Airflow ou API REST. Suporta recursos opcionais incluindo personalização...

npx skills add https://github.com/astronomer/agents --skill airflow-hitl

Airflow Human-in-the-Loop Operators

Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.

Requires Airflow 3.1+ (af config version).

UI location: Browse → Required Actions. Respond from the task instance page's Required Actions tab.

Cross-references: migrating-ai-sdk-to-common-ai for AI/LLM task decorators; airflow for registry and API discovery commands used below.


Step 1 — Pick the capability you need

CapabilityClass (verify in Step 2)
Approve or reject; downstream skips on rejectApprovalOperator
Present N options and return which were chosenHITLOperator
Branch to one or more downstream tasks based on a choiceHITLBranchOperator
Collect a form (no approve/select step)HITLEntryOperator
Use the HITL trigger directly (advanced / custom operators)HITLTrigger

This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.


Step 2 — Discover the current signatures from the Airflow Registry

Before writing HITL code, run these to see the live roster and constructor params (see the airflow skill for the full af registry reference):

# Every HITL-related module in the standard provider
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Constructor signatures: name, type, default, required, description
af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Pin to the exact installed provider version
af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>

If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.


Step 3 — Canonical example (approval gate)

Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",              # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

For the other classes in Step 1, the shape is the same (task_id, subject, plus class-specific params). Verify each constructor through Step 2 — for example, HITLBranchOperator requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.


Step 4 — Behavior contracts (stable across versions)

Timeout

  • With defaults set: task succeeds on timeout, default option(s) selected.
  • Without defaults: task fails on timeout.

Markdown + Jinja in body

body supports Markdown and is Jinja-templatable. Render XCom context directly:

body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""

Callbacks

All HITL operators accept the standard Airflow callback kwargs (on_success_callback, on_failure_callback, etc.).

Notifiers

HITL operators accept a notifiers list. Inside a notifier's notify(context) method, build a link to the pending task with HITLOperator.generate_link_to_ui_from_context(context, base_url=...).

Restricting who can respond

The parameter name and accepted identifier format depend on the active auth manager. Do not hardcode — check which one is active and which kwarg the current provider exposes:

af config show | jq '.auth_manager // .core.auth_manager'

Then look up the current kwarg in Step 2 (at the time of writing it is assigned_users, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).


Step 5 — Responding from external integrations

For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:

af api ls --filter hitl           # live endpoint list
af api spec \
  | jq '.paths | to_entries[] | select(.key | test("hitl"))'   # request/response schemas

The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:

import os, requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# List pending — use the path from `af api ls --filter hitl`
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})

# Respond — same discovered path family, PATCH
requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)

Step 6 — Safety checks

  • Airflow version ≥ 3.1 (af config version).
  • Constructor kwargs match the current registry output from Step 2 — no respondents-vs-assigned_users style drift.
  • For branching: every option resolves to a downstream task id (directly or via the mapping kwarg from Step 2).
  • Every value in defaults is also in options.
  • execution_timeout set; defaults configured if timeout should succeed rather than fail.
  • API token configured if external responders are part of the flow.

References

The upstream docs URL is surfaced per-module by the registry — do not hardcode:

af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'

Related skills

  • airflowaf registry, af api, af config command reference.
  • migrating-ai-sdk-to-common-ai — AI/LLM task decorators and GenAI patterns (common-ai provider).
  • authoring-dags — general DAG writing best practices.
  • testing-dags — iterative test → debug → fix cycles.

Mais skills de astronomer

airflow
astronomer
Consulte, gerencie e solucione problemas de DAGs, execuções, tarefas e configuração de sistema do Apache Airflow. Suporta mais de 30 comandos para inspeção de DAGs, gerenciamento de execuções, registro de tarefas, consultas de configuração e acesso direto à API REST. Gerencie múltiplas instâncias do Airflow com configuração persistente; descubra automaticamente implantações locais e Astro. Dispare execuções de DAG de forma síncrona (aguardando conclusão) ou assíncrona, diagnostique falhas, limpe execuções para repetição e acesse logs de tarefas com filtragem por repetição/índice de mapa. Saída...
official
airflow-plugins
astronomer
Crie plugins do Airflow 3.1+ que incorporam aplicativos FastAPI, páginas de UI personalizadas, componentes React, middleware, macros e links de operador diretamente na interface do Airflow. Use…
official
analyzing-data
astronomer
Consulte seu data warehouse para responder perguntas de negócios com padrões em cache e mapeamentos de conceitos. Suporta busca de padrões e cache para tipos de perguntas repetidas, com registro de resultados para melhorar consultas futuras. Inclui cache de mapeamento conceito-tabela e descoberta de esquemas de tabela via INFORMATION_SCHEMA ou grep no código-fonte. Fornece funções de kernel run_sql() e run_sql_pandas() que retornam DataFrames Polars ou Pandas para análise. Comandos CLI para gerenciar caches de conceitos, padrões e tabelas, além de...
official
annotating-task-lineage
astronomer
Anotar tarefas do Airflow com linhagem de dados usando inlets e outlets. Suporta objetos OpenLineage Dataset, Assets do Airflow e Datasets do Airflow para definir entradas e saídas em bancos de dados, data warehouses e armazenamento em nuvem. Use como fallback quando operadores não possuem extratores OpenLineage integrados; segue um sistema de precedência de quatro níveis onde extratores personalizados e métodos OpenLineage têm prioridade. Inclui auxiliares de nomenclatura de datasets para Snowflake, BigQuery, S3 e PostgreSQL para garantir consistência...
official
authoring-dags
astronomer
Fluxo de trabalho guiado para criação de DAGs do Apache Airflow com integração de validação e testes. Abordagem estruturada em seis fases: descobrir o ambiente e padrões existentes, planejar a estrutura da DAG, implementar seguindo as melhores práticas, validar com comandos da CLI af, testar com consentimento do usuário e iterar em correções. Comandos da CLI para descoberta (af config connections, af config providers, af dags list) e validação (af dags errors, af dags get, af dags explore) fornecem feedback imediato sobre a DAG...
official
blueprint
astronomer
Defina modelos reutilizáveis de grupos de tarefas do Airflow com validação Pydantic e componha DAGs a partir de YAML. Use ao criar modelos de blueprint, compor DAGs a partir de…
official
checking-freshness
astronomer
Verifique a atualização dos dados analisando os timestamps das tabelas e os padrões de atualização em relação a uma escala de obsolescência. Identifica colunas de timestamp usando padrões comuns de nomenclatura ETL (_loaded_at, _updated_at, created_at, etc.) e consulta seus valores máximos para determinar a idade. Classifica os dados em quatro status de atualização: Atualizados (< 4 horas), Desatualizados (4–24 horas), Muito Desatualizados (> 24 horas) ou Desconhecido (nenhum timestamp encontrado). Fornece modelos SQL para verificar o horário da última atualização e as tendências de contagem de linhas nos dias recentes para...
official
cosmos-dbt-core
astronomer
Converta projetos dbt Core em DAGs ou TaskGroups do Airflow usando o Astronomer Cosmos. Suporta três padrões de montagem: DbtDag independente, DbtTaskGroup dentro de DAGs existentes e operadores Cosmos individuais para controle refinado. Escolha entre oito modos de execução (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC e outros) com base nas necessidades de isolamento e desempenho. Oferece três estratégias de parsing (dbt_manifest, dbt_ls, dbt_ls_file, automática) para equilibrar velocidade e complexidade do seletor...
official