debugging-dags

Систематический анализ первопричин и устранение неисправностей для упавших DAG-графов Airflow с использованием структурированных рабочих процессов расследования. Проводит через четырехэтапный процесс диагностики: выявление сбоя, извлечение деталей ошибки, сбор контекстной информации и предоставление практических шагов по исправлению. Классифицирует сбои на четыре типа (данные, код, инфраструктура, зависимости) для фокусировки расследования и предложения соответствующих исправлений. Предоставляет готовые к использованию команды CLI для извлечения журналов, сравнения запусков, очистки задач и DAG...

npx skills add https://github.com/astronomer/agents --skill debugging-dags

DAG Diagnosis

You are a data engineer debugging a failed Airflow DAG. Follow this systematic approach to identify the root cause and provide actionable remediation.

Running the CLI

These commands assume af is on PATH. Run via astro otto to get it automatically, or install standalone with uv tool install astro-airflow-mcp.


Step 1: Identify the Failure

If a specific DAG was mentioned:

  • Run af runs diagnose <dag_id> <dag_run_id> (if run_id is provided)
  • If no run_id specified, run af dags stats to find recent failures

If no DAG was specified:

  • Run af health to find recent failures across all DAGs
  • Check for import errors with af dags errors
  • Show DAGs with recent failures
  • Ask which DAG to investigate further

Step 2: Get the Error Details

Once you have identified a failed task:

  1. Get task logs using af tasks logs <dag_id> <dag_run_id> <task_id>
  2. Look for the actual exception - scroll past the Airflow boilerplate to find the real error
  3. Categorize the failure type:
    • Data issue: Missing data, schema change, null values, constraint violation
    • Code issue: Bug, syntax error, import failure, type error
    • Infrastructure issue: Connection timeout, resource exhaustion, permission denied
    • Dependency issue: Upstream failure, external API down, rate limiting

Step 3: Check Context

Gather additional context to understand WHY this happened:

  1. Recent changes: Was there a code deploy? Check git history if available
  2. Package version changes: Was a package upgraded — in the image, in a venv-style operator, or at the index? See Package version changes below.
  3. Data volume: Did data volume spike? Run a quick count on source tables
  4. Upstream health: Did upstream tasks succeed but produce unexpected data?
  5. Historical pattern: Is this a recurring failure? Check if same task failed before
  6. Timing: Did this fail at an unusual time? (resource contention, maintenance windows)

Use af runs get <dag_id> <dag_run_id> to compare the failed run against recent successful runs.

Package version changes

A common cause of failures with no git activity is dependency drift — the user's code didn't change, but a package they depend on did. Check in this order:

  1. Worker image diff (preferred when available). Every Astro deploy = new image tag, so the registry has a "before" and "after". Diff pip freeze between current and previous image — that's ground truth for what changed:

    docker run --rm <current_image> pip freeze > /tmp/now.txt
    docker run --rm <previous_image> pip freeze > /tmp/prev.txt
    diff /tmp/prev.txt /tmp/now.txt
    

    Also compare docker run --rm <image> python --version between the two — a Python minor-version bump (3.11 → 3.12, or even a patch) can break wheel compatibility even when pip freeze looks identical. af config providers lists currently installed provider versions, useful for cross-checking against modules named in the traceback.

  2. Venv-style operators bypass the worker image. @task.virtualenv, PythonVirtualenvOperator, ExternalPythonOperator, and KubernetesPodOperator build their environment per task run, so an image diff won't catch failures inside them. If the failed task is one of these, read its requirements / image / python_version / python args directly:

    • Unbounded specifier (e.g. pandas>=2.0.0 with no upper bound, or no specifier at all) → a new upstream release is the prime suspect.
    • image="foo:latest" or no tag → the image moved underneath you.
    • python_version="3.11" (on @task.virtualenv / PythonVirtualenvOperator) or a python path (on ExternalPythonOperator) resolving to a different interpreter than it used to — a Python minor-version change can break wheel compatibility for unchanged requirements. Same vector applies to the worker image itself if the base Python changed there.

    Fix is to pin: pandas>=2.0.0,<3.0.0, a lockfile, a specific image SHA, or a fully-qualified Python version (python_version="3.11.7" instead of "3.11").

  3. Index lookup when image diff isn't conclusive (no image history, or a venv-style operator). Identify the configured index first — it may not be PyPI:

    • Env vars: UV_INDEX_URL, PIP_INDEX_URL, PIP_EXTRA_INDEX_URL
    • pyproject.toml[[tool.uv.index]]
    • ~/.pip/pip.conf, /etc/pip.conf
    • Dockerfile --index-url flags

    Then query for releases of the suspect package since the first failure started. PyPI:

    curl -s https://pypi.org/pypi/<pkg>/json | jq '.releases | to_entries | map({version: .key, uploaded: .value[0].upload_time}) | sort_by(.uploaded) | reverse | .[:5]'
    

    Private indexes usually expose the same /pypi/<pkg>/json shape; fall back to the Simple API (/simple/<pkg>/) or ask the user if neither works.

A release timestamp landing between the last green run and the first red run, for a package named in the traceback, is the answer.

On Astro

If you're running on Astro, these additional tools can help with diagnosis:

  • Deployment activity log: Check the Astro UI for recent deploys — a failed deploy or recent code change is often the cause of sudden failures
  • Astro alerts: Configure alerts in the Astro UI for proactive failure monitoring (DAG failure, task duration, SLA miss)
  • Observability: Use the Astro observability dashboard to track DAG health trends and spot recurring issues

On OSS Airflow

  • Airflow UI: Use the DAGs page, Graph view, and task logs to inspect recent runs and failures

Step 4: Provide Actionable Output

Structure your diagnosis as:

Root Cause

What actually broke? Be specific - not "the task failed" but "the task failed because column X was null in 15% of rows when the code expected 0%".

Impact Assessment

  • What data is affected? Which tables didn't get updated?
  • What downstream processes are blocked?
  • Is this blocking production dashboards or reports?

Immediate Fix

Specific steps to resolve RIGHT NOW:

  1. If it's a data issue: SQL to fix or skip bad records
  2. If it's a code issue: The exact code change needed
  3. If it's infra: Who to contact or what to restart

Prevention

How to prevent this from happening again:

  • Add data quality checks?
  • Add better error handling?
  • Add alerting for edge cases?
  • Update documentation?
  • Pin dependencies (constraints file, lockfile, or upper-bound specifiers on venv/external/pod operators) to avoid silent upstream drift?

Quick Commands

Provide ready-to-use commands:

  • To clear and rerun the entire DAG run: af runs clear <dag_id> <run_id>
  • To clear and rerun specific failed tasks: af tasks clear <dag_id> <run_id> <task_ids> -D
  • To delete a stuck or unwanted run: af runs delete <dag_id> <run_id>

Больше skills от astronomer

airflow
astronomer
Запрос, управление и устранение неполадок DAG, запусков, задач и системной конфигурации Apache Airflow. Поддерживает более 30 команд для проверки DAG, управления запусками, ведения журналов задач, запросов конфигурации и прямого доступа к REST API. Управление несколькими экземплярами Airflow с постоянной конфигурацией; автоматическое обнаружение локальных и Astro развертываний. Синхронный (с ожиданием завершения) или асинхронный запуск DAG, диагностика сбоев, очистка запусков для повторного выполнения, доступ к журналам задач с фильтрацией по повторным попыткам и индексу карты. Вывод...
official
airflow-hitl
astronomer
Шлюзы утверждения человеком, ввод форм и ветвление в DAG Airflow с использованием отложенных операторов. Четыре типа операторов: ApprovalOperator для решений утвердить/отклонить, HITLOperator для выбора нескольких вариантов с формами, HITLBranchOperator для маршрутизации задач на основе решений человека и HITLEntryOperator для сбора данных из форм. Все операторы являются отложенными, освобождая слоты рабочих узлов в ожидании ответа человека через вкладку Required Actions в интерфейсе Airflow или REST API. Поддерживает дополнительные функции, включая пользовательские...
official
airflow-plugins
astronomer
Создавайте плагины для Airflow 3.1+, которые встраивают приложения FastAPI, пользовательские страницы интерфейса, компоненты React, промежуточное ПО, макросы и ссылки на операторы непосредственно в интерфейс Airflow. Используйте…
official
analyzing-data
astronomer
Запрашивайте данные из вашего хранилища данных для ответа на бизнес-вопросы с использованием кэшированных шаблонов и сопоставлений понятий. Поддерживает поиск по шаблонам и кэширование для повторяющихся типов вопросов с записью результатов для улучшения будущих запросов. Включает кэш сопоставлений понятий и таблиц, а также обнаружение схем таблиц через INFORMATION_SCHEMA или grep кодовой базы. Предоставляет функции ядра run_sql() и run_sql_pandas(), возвращающие DataFrames Polars или Pandas для анализа. Команды CLI для управления кэшами понятий, шаблонов и таблиц, а также...
official
annotating-task-lineage
astronomer
Аннотирование задач Airflow с помощью data lineage с использованием inlets и outlets. Поддерживает объекты Dataset OpenLineage, Airflow Assets и Airflow Datasets для определения входных и выходных данных в базах данных, хранилищах данных и облачных хранилищах. Используется как запасной вариант, когда операторам не хватает встроенных экстракторов OpenLineage; следует четырехуровневой системе приоритетов, где пользовательские экстракторы и методы OpenLineage имеют приоритет. Включает вспомогательные функции для именования наборов данных для Snowflake, BigQuery, S3 и PostgreSQL для обеспечения согласованности...
official
authoring-dags
astronomer
Пошаговый процесс создания DAG Apache Airflow с интеграцией валидации и тестирования. Структурированный шестифазный подход: обнаружение среды и существующих шаблонов, планирование структуры DAG, реализация с соблюдением лучших практик, валидация с помощью команд af CLI, тестирование с согласия пользователя и итеративное исправление. Команды CLI для обнаружения (af config connections, af config providers, af dags list) и валидации (af dags errors, af dags get, af dags explore) обеспечивают немедленную обратную связь по DAG...
official
blueprint
astronomer
Определяйте переиспользуемые шаблоны групп задач Airflow с валидацией Pydantic и составляйте DAG из YAML. Используйте при создании шаблонов blueprint, составлении DAG из…
official
checking-freshness
astronomer
Проверяет свежесть данных, сравнивая временные метки таблиц и шаблоны обновлений со шкалой устаревания. Определяет столбцы с временными метками, используя распространённые шаблоны именования ETL (_loaded_at, _updated_at, created_at и т.д.), и запрашивает их максимальные значения для определения возраста. Классифицирует данные по четырём статусам свежести: Свежие (< 4 часов), Устаревшие (4–24 часа), Очень устаревшие (> 24 часов) или Неизвестно (временная метка не найдена). Предоставляет SQL-шаблоны для проверки времени последнего обновления и тенденций количества строк за последние дни, чтобы...
official