airflow-hitl

Gerbang persetujuan manusia, input formulir, dan percabangan dalam DAG Airflow menggunakan operator yang dapat ditunda. Empat jenis operator: ApprovalOperator untuk keputusan setuju/tolak, HITLOperator untuk pemilihan multi-opsi dengan formulir, HITLBranchOperator untuk perutean tugas yang digerakkan manusia, dan HITLEntryOperator untuk pengumpulan data formulir. Semua operator dapat ditunda, membebaskan slot pekerja sambil menunggu respons manusia melalui tab Required Actions di UI Airflow atau REST API. Mendukung fitur opsional termasuk kustom...

npx skills add https://github.com/astronomer/agents --skill airflow-hitl

Airflow Human-in-the-Loop Operators

Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.

Requires Airflow 3.1+ (af config version).

UI location: Browse → Required Actions. Respond from the task instance page's Required Actions tab.

Cross-references: migrating-ai-sdk-to-common-ai for AI/LLM task decorators; airflow for registry and API discovery commands used below.


Step 1 — Pick the capability you need

CapabilityClass (verify in Step 2)
Approve or reject; downstream skips on rejectApprovalOperator
Present N options and return which were chosenHITLOperator
Branch to one or more downstream tasks based on a choiceHITLBranchOperator
Collect a form (no approve/select step)HITLEntryOperator
Use the HITL trigger directly (advanced / custom operators)HITLTrigger

This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.


Step 2 — Discover the current signatures from the Airflow Registry

Before writing HITL code, run these to see the live roster and constructor params (see the airflow skill for the full af registry reference):

# Every HITL-related module in the standard provider
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Constructor signatures: name, type, default, required, description
af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Pin to the exact installed provider version
af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>

If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.


Step 3 — Canonical example (approval gate)

Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",              # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

For the other classes in Step 1, the shape is the same (task_id, subject, plus class-specific params). Verify each constructor through Step 2 — for example, HITLBranchOperator requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.


Step 4 — Behavior contracts (stable across versions)

Timeout

  • With defaults set: task succeeds on timeout, default option(s) selected.
  • Without defaults: task fails on timeout.

Markdown + Jinja in body

body supports Markdown and is Jinja-templatable. Render XCom context directly:

body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""

Callbacks

All HITL operators accept the standard Airflow callback kwargs (on_success_callback, on_failure_callback, etc.).

Notifiers

HITL operators accept a notifiers list. Inside a notifier's notify(context) method, build a link to the pending task with HITLOperator.generate_link_to_ui_from_context(context, base_url=...).

Restricting who can respond

The parameter name and accepted identifier format depend on the active auth manager. Do not hardcode — check which one is active and which kwarg the current provider exposes:

af config show | jq '.auth_manager // .core.auth_manager'

Then look up the current kwarg in Step 2 (at the time of writing it is assigned_users, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).


Step 5 — Responding from external integrations

For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:

af api ls --filter hitl           # live endpoint list
af api spec \
  | jq '.paths | to_entries[] | select(.key | test("hitl"))'   # request/response schemas

The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:

import os, requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# List pending — use the path from `af api ls --filter hitl`
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})

# Respond — same discovered path family, PATCH
requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)

Step 6 — Safety checks

  • Airflow version ≥ 3.1 (af config version).
  • Constructor kwargs match the current registry output from Step 2 — no respondents-vs-assigned_users style drift.
  • For branching: every option resolves to a downstream task id (directly or via the mapping kwarg from Step 2).
  • Every value in defaults is also in options.
  • execution_timeout set; defaults configured if timeout should succeed rather than fail.
  • API token configured if external responders are part of the flow.

References

The upstream docs URL is surfaced per-module by the registry — do not hardcode:

af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'

Related skills

  • airflowaf registry, af api, af config command reference.
  • migrating-ai-sdk-to-common-ai — AI/LLM task decorators and GenAI patterns (common-ai provider).
  • authoring-dags — general DAG writing best practices.
  • testing-dags — iterative test → debug → fix cycles.

Lebih banyak skill dari astronomer

airflow
astronomer
Kueri, kelola, dan pecahkan masalah DAG, proses, tugas, serta konfigurasi sistem Apache Airflow. Mendukung 30+ perintah untuk inspeksi DAG, manajemen proses, pencatatan tugas, kueri konfigurasi, dan akses langsung REST API. Kelola beberapa instance Airflow dengan konfigurasi persisten; temukan secara otomatis deployment lokal dan Astro. Jalankan proses DAG secara sinkron (tunggu hingga selesai) atau asinkron, diagnosis kegagalan, hapus proses untuk percobaan ulang, dan akses log tugas dengan filter percobaan ulang/indeks peta. Keluaran...
official
airflow-plugins
astronomer
Bangun plugin Airflow 3.1+ yang menyematkan aplikasi FastAPI, halaman UI kustom, komponen React, middleware, makro, dan tautan operator langsung ke dalam UI Airflow. Gunakan…
official
analyzing-data
astronomer
Kueri gudang data Anda untuk menjawab pertanyaan bisnis dengan pola yang di-cache dan pemetaan konsep. Mendukung pencarian pola dan caching untuk jenis pertanyaan berulang, dengan pencatatan hasil untuk meningkatkan kueri di masa mendatang. Menyertakan cache pemetaan konsep-ke-tabel dan penemuan skema tabel melalui INFORMATION_SCHEMA atau grep basis kode. Menyediakan fungsi kernel run_sql() dan run_sql_pandas() yang mengembalikan DataFrame Polars atau Pandas untuk analisis. Perintah CLI untuk mengelola cache konsep, pola, dan tabel, plus...
official
annotating-task-lineage
astronomer
Anotasi tugas Airflow dengan lineage data menggunakan inlet dan outlet. Mendukung objek Dataset OpenLineage, Aset Airflow, dan Dataset Airflow untuk mendefinisikan input dan output di seluruh basis data, gudang data, dan penyimpanan cloud. Digunakan sebagai cadangan ketika operator tidak memiliki ekstraktor OpenLineage bawaan; mengikuti sistem prioritas empat tingkat di mana ekstraktor kustom dan metode OpenLineage diutamakan. Menyertakan pembantu penamaan dataset untuk Snowflake, BigQuery, S3, dan PostgreSQL guna memastikan konsistensi...
official
authoring-dags
astronomer
Panduan kerja untuk membuat DAG Apache Airflow dengan integrasi validasi dan pengujian. Pendekatan enam fase terstruktur: temukan lingkungan dan pola yang ada, rencanakan struktur DAG, implementasikan sesuai praktik terbaik, validasi dengan perintah CLI af, uji dengan persetujuan pengguna, dan lakukan iterasi perbaikan. Perintah CLI untuk penemuan (af config connections, af config providers, af dags list) dan validasi (af dags errors, af dags get, af dags explore) memberikan umpan balik langsung pada DAG...
official
blueprint
astronomer
Definisikan templat grup tugas Airflow yang dapat digunakan kembali dengan validasi Pydantic dan susun DAG dari YAML. Gunakan saat membuat templat blueprint, menyusun DAG dari…
official
checking-freshness
astronomer
Verifikasi kesegaran data dengan memeriksa timestamp tabel dan pola pembaruan terhadap skala ketidaksegaran. Mengidentifikasi kolom timestamp menggunakan pola penamaan ETL umum (_loaded_at, _updated_at, created_at, dll.) dan menanyakan nilai maksimumnya untuk menentukan usia. Mengklasifikasikan data ke dalam empat status kesegaran: Segar (< 4 jam), Agak Basi (4–24 jam), Sangat Basi (> 24 jam), atau Tidak Diketahui (tidak ada timestamp ditemukan). Menyediakan template SQL untuk memeriksa waktu pembaruan terakhir dan tren jumlah baris selama beberapa hari terakhir hingga...
official
cosmos-dbt-core
astronomer
Konversi proyek dbt Core menjadi DAG atau TaskGroup Airflow menggunakan Astronomer Cosmos. Mendukung tiga pola perakitan: DbtDag mandiri, DbtTaskGroup dalam DAG yang sudah ada, dan operator Cosmos individual untuk kontrol yang lebih terperinci. Pilih dari delapan mode eksekusi (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC, dan lainnya) berdasarkan kebutuhan isolasi dan kinerja. Menawarkan tiga strategi parsing (dbt_manifest, dbt_ls, dbt_ls_file, otomatis) untuk menyeimbangkan kecepatan dan kompleksitas pemilih...
official