airflow-hitl

作者: astronomer

在Airflow DAG中使用可延迟操作符实现人工审批关卡、表单输入和分支。四种操作符类型:用于批准/拒绝决策的ApprovalOperator、带表单的多选项选择HITLOperator、人工驱动的任务路由HITLBranchOperator,以及表单数据收集HITLEntryOperator。所有操作符均为可延迟设计,在通过Airflow UI的"必需操作"标签页或REST API等待人工响应时释放工作槽位。支持包括自定义在内的可选功能...

npx skills add https://github.com/astronomer/agents --skill airflow-hitl

Airflow Human-in-the-Loop Operators

Pause a DAG until a human responds via the Airflow UI or REST API. HITL operators are deferrable — they release their worker slot while waiting.

Requires Airflow 3.1+ (af config version).

UI location: Browse → Required Actions. Respond from the task instance page's Required Actions tab.

Cross-references: airflow-ai for AI/LLM task decorators; airflow for registry and API discovery commands used below.


Step 1 — Pick the capability you need

CapabilityClass (verify in Step 2)
Approve or reject; downstream skips on rejectApprovalOperator
Present N options and return which were chosenHITLOperator
Branch to one or more downstream tasks based on a choiceHITLBranchOperator
Collect a form (no approve/select step)HITLEntryOperator
Use the HITL trigger directly (advanced / custom operators)HITLTrigger

This is the only place class names are hardcoded. The provider adds, renames, and removes params across releases — do not copy parameter lists from memory. Fetch the current signature before writing code.


Step 2 — Discover the current signatures from the Airflow Registry

Before writing HITL code, run these to see the live roster and constructor params (see the airflow skill for the full af registry reference):

# Every HITL-related module in the standard provider
af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, type, import_path, short_description, docs_url}'

# Constructor signatures: name, type, default, required, description
af registry parameters standard \
  | jq '.classes | to_entries[] | select(.key | test("\\.hitl\\.")) | {fqn: .key, parameters: .value.parameters}'

# Pin to the exact installed provider version
af config providers \
  | jq '.providers[] | select(.package_name == "apache-airflow-providers-standard") | .version'
# then: af registry parameters standard --version <VERSION>

If the registry shows a param that this skill does not mention, prefer the registry. If the registry shows a class that is not in Step 1, treat it as additive — the decision table above may be stale.


Step 3 — Canonical example (approval gate)

Starting point for any HITL task. Adapt by swapping the class name and params per Step 2.

from airflow.providers.standard.operators.hitl import ApprovalOperator
from airflow.sdk import dag, task, chain, Param
from pendulum import datetime

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def approval_example():
    @task
    def prepare():
        return "Review quarterly report"

    approval = ApprovalOperator(
        task_id="approve_report",
        subject="Report Approval",
        body="{{ ti.xcom_pull(task_ids='prepare') }}",
        defaults="Approve",              # Auto-selected on timeout
        params={"comments": Param("", type="string")},
    )

    @task
    def after_approval(result):
        print(f"Decision: {result['chosen_options']}")

    chain(prepare(), approval)
    after_approval(approval.output)

approval_example()

For the other classes in Step 1, the shape is the same (task_id, subject, plus class-specific params). Verify each constructor through Step 2 — for example, HITLBranchOperator requires every option either to match a downstream task id directly or to be resolved via a mapping param surfaced in the registry.


Step 4 — Behavior contracts (stable across versions)

Timeout

  • With defaults set: task succeeds on timeout, default option(s) selected.
  • Without defaults: task fails on timeout.

Markdown + Jinja in body

body supports Markdown and is Jinja-templatable. Render XCom context directly:

body = """**Total Budget:** {{ ti.xcom_pull(task_ids='get_budget') }}

| Category | Amount |
|----------|--------|
| Marketing | $1M |
"""

Callbacks

All HITL operators accept the standard Airflow callback kwargs (on_success_callback, on_failure_callback, etc.).

Notifiers

HITL operators accept a notifiers list. Inside a notifier's notify(context) method, build a link to the pending task with HITLOperator.generate_link_to_ui_from_context(context, base_url=...).

Restricting who can respond

The parameter name and accepted identifier format depend on the active auth manager. Do not hardcode — check which one is active and which kwarg the current provider exposes:

af config show | jq '.auth_manager // .core.auth_manager'

Then look up the current kwarg in Step 2 (at the time of writing it is assigned_users, accepting identifiers in whatever format the active auth manager uses — Astro uses the Astro user ID, FabAuthManager uses email, SimpleAuthManager uses username).


Step 5 — Responding from external integrations

For Slack bots, custom apps, or scripts. Discover the live endpoint rather than hardcoding a path:

af api ls --filter hitl           # live endpoint list
af api spec \
  | jq '.paths | to_entries[] | select(.key | test("hitl"))'   # request/response schemas

The PATCH-to-respond pattern is stable; the exact path is discovered. Typical shape:

import os, requests

HOST = os.environ["AIRFLOW_HOST"]
TOKEN = os.environ["AIRFLOW_API_TOKEN"]
HEADERS = {"Authorization": f"Bearer {TOKEN}"}

# List pending — use the path from `af api ls --filter hitl`
requests.get(f"{HOST}/<path>", headers=HEADERS, params={"state": "pending"})

# Respond — same discovered path family, PATCH
requests.patch(
    f"{HOST}/<path>/{dag_id}/{run_id}/{task_id}",
    headers=HEADERS,
    json={"chosen_options": ["Approve"], "params_input": {"comments": "ok"}},
)

Step 6 — Safety checks

  • Airflow version ≥ 3.1 (af config version).
  • Constructor kwargs match the current registry output from Step 2 — no respondents-vs-assigned_users style drift.
  • For branching: every option resolves to a downstream task id (directly or via the mapping kwarg from Step 2).
  • Every value in defaults is also in options.
  • execution_timeout set; defaults configured if timeout should succeed rather than fail.
  • API token configured if external responders are part of the flow.

References

The upstream docs URL is surfaced per-module by the registry — do not hardcode:

af registry modules standard \
  | jq '.modules[] | select(.import_path | test("\\.hitl\\.")) | {name, docs_url}'

Related skills

  • airflowaf registry, af api, af config command reference.
  • airflow-ai — AI/LLM task decorators and GenAI patterns.
  • authoring-dags — general DAG writing best practices.
  • testing-dags — iterative test → debug → fix cycles.

来自 astronomer 的更多技能

airflow
astronomer
查询、管理和排查Apache Airflow的DAG、运行记录、任务及系统配置。支持30多种命令,涵盖DAG检查、运行管理、任务日志、配置查询及直接REST API访问。通过持久化配置管理多个Airflow实例;自动发现本地和Astro部署。同步(等待完成)或异步触发DAG运行,诊断故障,清除运行记录以重试,并通过重试/映射索引过滤访问任务日志。输出...
official
airflow-plugins
astronomer
构建嵌入FastAPI应用、自定义UI页面、React组件、中间件、宏和操作符链接的Airflow 3.1+插件,直接集成到Airflow UI中。使用…
official
analyzing-data
astronomer
查询数据仓库,利用缓存的模式和概念映射来回答业务问题。支持对重复问题类型进行模式查找和缓存,并通过记录结果来改进后续查询。包含概念到表的映射缓存,以及通过INFORMATION_SCHEMA或代码库grep进行表结构发现。提供run_sql()和run_sql_pandas()内核函数,返回Polars或Pandas DataFrame用于分析。提供CLI命令用于管理概念、模式和表缓存,以及...
official
annotating-task-lineage
astronomer
使用入口和出口为Airflow任务标注数据血缘。支持使用OpenLineage Dataset对象、Airflow Assets和Airflow Datasets定义跨数据库、数据仓库及云存储的输入输出。当运算符缺少内置OpenLineage提取器时作为备用方案;遵循四级优先级系统,其中自定义提取器和OpenLineage方法优先。包含针对Snowflake、BigQuery、S3和PostgreSQL的数据集命名辅助工具,以确保一致性...
official
authoring-dags
astronomer
创建Apache Airflow DAG的引导式工作流,集成验证与测试。采用六阶段结构化方法:发现环境与现有模式、规划DAG结构、遵循最佳实践实现、通过af CLI命令验证、经用户同意测试、迭代修复。用于发现(af config connections、af config providers、af dags list)和验证(af dags errors、af dags get、af dags explore)的CLI命令可提供DAG的即时反馈...
official
blueprint
astronomer
使用Pydantic验证定义可复用的Airflow任务组模板,并从YAML组合DAG。适用于创建blueprint模板、从YAML组合DAG等场景。
official
checking-freshness
astronomer
通过检查表时间戳和更新模式与陈旧度标尺对比,验证数据新鲜度。使用常见ETL命名模式(如_loaded_at、_updated_at、created_at等)识别时间戳列,并查询其最大值以确定数据时效。将数据分为四种新鲜度状态:新鲜(<4小时)、陈旧(4–24小时)、非常陈旧(>24小时)或未知(未找到时间戳)。提供SQL模板,用于检查最近几天的最后更新时间及行数变化趋势。
official
cosmos-dbt-core
astronomer
使用Astronomer Cosmos将dbt Core项目转换为Airflow DAG或TaskGroup。支持三种组装模式:独立的DbtDag、现有DAG中的DbtTaskGroup,以及用于精细控制的独立Cosmos运算符。根据隔离和性能需求,从八种执行模式(WATCHER、LOCAL、VIRTUALENV、KUBERNETES、AIRFLOW_ASYNC等)中选择。提供三种解析策略(dbt_manifest、dbt_ls、dbt_ls_file、自动),以平衡速度和选择器复杂度...
official