airflow-hitl

Menschliche Genehmigungstore, Formulareingaben und Verzweigungen in Airflow-DAGs unter Verwendung von aufschiebbaren Operatoren. Vier Operatortypen: ApprovalOperator für Genehmigen/Ablehnen-Entscheidungen, HITLOperator für Mehrfachauswahl mit Formularen, HITLBranchOperator für menschlich gesteuerte Aufgabenweiterleitung und HITLEntryOperator für Formulardatenerfassung. Alle Operatoren sind aufschiebbar und geben Worker-Slots frei, während sie auf menschliche Antworten über den Bereich "Erforderliche Aktionen" der Airflow-Benutzeroberfläche oder die REST-API warten. Unterstützt optionale Funktionen einschließlich benutzerdefinierter...

npx skills add https://github.com/astronomer/agents --skill airflow-hitl

Airflow Human-in-the-Loop Operators

Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.

Requires Airflow 3.1+ (af config version).

UI location: Browse → Required Actions. Respond from the task instance page's Required Actions tab.

Cross-references: airflow-ai for AI/LLM task decorators; airflow for registry and API discovery commands used below.


Step 1 — Pick the capability you need

CapabilityClass (verify in Step 2)
Approve or reject; downstream skips on rejectApprovalOperator
Present N options and return which were chosenHITLOperator
Branch to one or more downstream tasks based on a choiceHITLBranchOperator
Collect a form (no approve/select step)HITLEntryOperator
Use the HITL trigger directly (advanced / custom operators)HITLTrigger

This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.


Step 2 — Discover the current signatures from the Airflow Registry

Before writing HITL code, run these to see the live roster and constructor params (see the airflow skill for the full af registry reference):

# Every HITL-related module in the standard provider
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Constructor signatures: name, type, default, required, description
af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Pin to the exact installed provider version
af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>

If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.


Step 3 — Canonical example (approval gate)

Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",              # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

For the other classes in Step 1, the shape is the same (task_id, subject, plus class-specific params). Verify each constructor through Step 2 — for example, HITLBranchOperator requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.


Step 4 — Behavior contracts (stable across versions)

Timeout

  • With defaults set: task succeeds on timeout, default option(s) selected.
  • Without defaults: task fails on timeout.

Markdown + Jinja in body

body supports Markdown and is Jinja-templatable. Render XCom context directly:

body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""

Callbacks

All HITL operators accept the standard Airflow callback kwargs (on_success_callback, on_failure_callback, etc.).

Notifiers

HITL operators accept a notifiers list. Inside a notifier's notify(context) method, build a link to the pending task with HITLOperator.generate_link_to_ui_from_context(context, base_url=...).

Restricting who can respond

The parameter name and accepted identifier format depend on the active auth manager. Do not hardcode — check which one is active and which kwarg the current provider exposes:

af config show | jq '.auth_manager // .core.auth_manager'

Then look up the current kwarg in Step 2 (at the time of writing it is assigned_users, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).


Step 5 — Responding from external integrations

For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:

af api ls --filter hitl           # live endpoint list
af api spec \
  | jq '.paths | to_entries[] | select(.key | test("hitl"))'   # request/response schemas

The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:

import os, requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# List pending — use the path from `af api ls --filter hitl`
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})

# Respond — same discovered path family, PATCH
requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)

Step 6 — Safety checks

  • Airflow version ≥ 3.1 (af config version).
  • Constructor kwargs match the current registry output from Step 2 — no respondents-vs-assigned_users style drift.
  • For branching: every option resolves to a downstream task id (directly or via the mapping kwarg from Step 2).
  • Every value in defaults is also in options.
  • execution_timeout set; defaults configured if timeout should succeed rather than fail.
  • API token configured if external responders are part of the flow.

References

The upstream docs URL is surfaced per-module by the registry — do not hardcode:

af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'

Related skills

  • airflowaf registry, af api, af config command reference.
  • airflow-ai — AI/LLM task decorators and GenAI patterns.
  • authoring-dags — general DAG writing best practices.
  • testing-dags — iterative test → debug → fix cycles.

Mehr Skills von astronomer

airflow
astronomer
Apache Airflow-DAGs, Ausführungen, Aufgaben und Systemkonfiguration abfragen, verwalten und Fehler beheben. Unterstützt über 30 Befehle für DAG-Inspektion, Ausführungsverwaltung, Aufgabenprotokollierung, Konfigurationsabfragen und direkten REST-API-Zugriff. Mehrere Airflow-Instanzen mit persistenter Konfiguration verwalten; lokale und Astro-Bereitstellungen automatisch erkennen. DAG-Ausführungen synchron (warten auf Abschluss) oder asynchron auslösen, Fehler diagnostizieren, Ausführungen für Wiederholungen löschen und Aufgabenprotokolle mit Wiederholungs-/Kartenindex-Filterung abrufen. Ausgabe...
official
airflow-plugins
astronomer
Erstellen Sie Airflow 3.1+-Plugins, die FastAPI-Apps, benutzerdefinierte UI-Seiten, React-Komponenten, Middleware, Makros und Operator-Links direkt in die Airflow-Oberfläche einbetten. Verwenden Sie…
official
analyzing-data
astronomer
Fragen Sie Ihr Data Warehouse, um Geschäftsfragen mit zwischengespeicherten Mustern und Konzeptzuordnungen zu beantworten. Unterstützt Mustersuche und Zwischenspeicherung für wiederkehrende Fragetypen, mit Aufzeichnung der Ergebnisse zur Verbesserung zukünftiger Abfragen. Enthält eine Konzept-zu-Tabelle-Zuordnungs-Cache und Tabellenschema-Erkennung über INFORMATION_SCHEMA oder Codebase-Grep. Bietet run_sql()- und run_sql_pandas()-Kernel-Funktionen, die Polars- oder Pandas-DataFrames für Analysen zurückgeben. CLI-Befehle zur Verwaltung von Konzept-, Muster- und Tabellen-Caches, plus...
official
annotating-task-lineage
astronomer
Annotieren von Airflow-Tasks mit Data Lineage mithilfe von Inlets und Outlets. Unterstützt OpenLineage-Dataset-Objekte, Airflow-Assets und Airflow-Datasets zur Definition von Ein- und Ausgaben über Datenbanken, Data Warehouses und Cloud-Speicher hinweg. Verwenden Sie es als Fallback, wenn Operatoren keine integrierten OpenLineage-Extraktoren besitzen; folgt einem vierstufigen Prioritätssystem, bei dem benutzerdefinierte Extraktoren und OpenLineage-Methoden Vorrang haben. Enthält Dataset-Namenshilfen für Snowflake, BigQuery, S3 und PostgreSQL, um eine konsistente...
official
authoring-dags
astronomer
Geführter Workflow zur Erstellung von Apache Airflow DAGs mit Validierungs- und Testintegration. Strukturierter Sechs-Phasen-Ansatz: Umgebung und bestehende Muster erkunden, DAG-Struktur planen, Implementierung nach Best Practices, Validierung mit af CLI-Befehlen, Testen mit Benutzereinwilligung und Iteration über Fehlerbehebungen. CLI-Befehle zur Erkundung (af config connections, af config providers, af dags list) und Validierung (af dags errors, af dags get, af dags explore) bieten sofortiges Feedback zu DAG...
official
blueprint
astronomer
Wiederverwendbare Airflow-Task-Gruppen-Vorlagen mit Pydantic-Validierung definieren und DAGs aus YAML zusammenstellen. Verwenden beim Erstellen von Blueprint-Vorlagen, Zusammenstellen von DAGs aus…
official
checking-freshness
astronomer
Überprüft die Datenaktualität durch Abgleich von Tabellenzeitstempeln und Aktualisierungsmustern mit einer Veraltungsskala. Identifiziert Zeitstempelspalten anhand gängiger ETL-Benennungsmuster (_loaded_at, _updated_at, created_at usw.) und fragt deren Maximalwerte ab, um das Alter zu bestimmen. Klassifiziert Daten in vier Aktualitätsstatus: Frisch (< 4 Stunden), Veraltet (4–24 Stunden), Stark veraltet (> 24 Stunden) oder Unbekannt (kein Zeitstempel gefunden). Stellt SQL-Vorlagen zur Überprüfung der letzten Aktualisierungszeit und der Zeilenanzahltrends der letzten Tage bereit, um...
official
cosmos-dbt-core
astronomer
Konvertieren Sie dbt Core-Projekte mit Astronomer Cosmos in Airflow-DAGs oder TaskGroups. Unterstützt drei Assembly-Muster: eigenständigen DbtDag, DbtTaskGroup innerhalb bestehender DAGs und einzelne Cosmos-Operatoren für feingranulare Steuerung. Wählen Sie aus acht Ausführungsmodi (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC und andere) basierend auf Isolations- und Leistungsanforderungen. Bietet drei Parsing-Strategien (dbt_manifest, dbt_ls, dbt_ls_file, automatisch), um Geschwindigkeit und Selektorkomplexität auszugleichen...
official