airflow-plugins

Author: astronomer

Build Airflow 3.1+ plugins that embed FastAPI apps, custom UI pages, React components, middleware, macros, and operator links directly into the Airflow UI. Use…

npx skills add https://github.com/astronomer/agents --skill airflow-plugins

Airflow 3 Plugins

Airflow 3 plugins let you embed FastAPI apps, React UIs, middleware, macros, operator buttons, and custom timetables directly into the Airflow process. No sidecar, no extra server.

CRITICAL: Plugin components (fastapi_apps, react_apps, external_views) require Airflow 3.1+. NEVER import flask, flask_appbuilder, or use appbuilder_views / flask_blueprints — these are Airflow 2 patterns and will not work in Airflow 3. If existing code uses them, rewrite the entire registration block using FastAPI.

Security: FastAPI plugin endpoints are not automatically protected by Airflow auth. If your endpoints need to be private, implement authentication explicitly using FastAPI's security utilities.

Restart required: Changes to Python plugin files require restarting the API server. Static file changes (HTML, JS, CSS) are picked up immediately. Set AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False during development to load plugins at startup rather than lazily.

Relative paths always: In external_views, href must have no leading slash. In HTML and JavaScript, use relative paths for all assets and fetch() calls. Absolute paths break behind reverse proxies.

Before writing any code, verify:

  1. Am I using fastapi_apps / FastAPI — not appbuilder_views / Flask?
  2. Are all HTML/JS asset paths and fetch() calls relative (no leading slash)?
  3. Are all synchronous SDK or SQLAlchemy calls wrapped in asyncio.to_thread()?
  4. Do the static/ and assets/ directories exist before the FastAPI app mounts them?
  5. If the endpoint must be private, did I add explicit FastAPI authentication?

Step 1: Choose plugin components

A single plugin class can register multiple component types at once.

Component | What it does | Field
Custom API endpoints | FastAPI app mounted in the Airflow process | fastapi_apps
Nav / page link | Embeds a URL as an iframe or links out | external_views
React component | Custom React app embedded in the Airflow UI | react_apps
API middleware | Intercepts all Airflow API requests/responses | fastapi_root_middlewares
Jinja macros | Reusable Python functions in DAG templates | macros
Task instance button | Extra link button in the task Detail view | operator_extra_links / global_operator_extra_links
Custom timetable | Custom scheduling logic | timetables
Event hooks | Listener callbacks for Airflow events | listeners
Event hooksListener callbacks for Airflow eventslisteners

Step 2: Plugin registration skeleton

Project file structure

Give each plugin its own subdirectory under plugins/ — this keeps the Python file, static assets, and templates together and makes multi-plugin projects manageable:

plugins/
  my-plugin/
    plugin.py       # AirflowPlugin subclass — auto-discovered by Airflow
    static/
      index.html
      app.js
    assets/
      icon.svg

BASE_DIR = Path(__file__).parent in plugin.py resolves to plugins/my-plugin/ — static and asset paths will be correct relative to that. Create the subdirectory and any static/assets folders before starting Airflow, or StaticFiles will raise on import.

from pathlib import Path
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse

BASE_DIR = Path(__file__).parent

app = FastAPI(title="My Plugin")

# Both directories must exist before Airflow starts or FastAPI raises on import
app.mount("/static", StaticFiles(directory=BASE_DIR / "static"), name="static")
app.mount("/assets", StaticFiles(directory=BASE_DIR / "assets"), name="assets")


class MyPlugin(AirflowPlugin):
    name = "my_plugin"

    fastapi_apps = [
        {
            "app": app,
            "url_prefix": "/my-plugin",   # plugin available at {AIRFLOW_HOST}/my-plugin/
            "name": "My Plugin",
        }
    ]

    external_views = [
        {
            "name": "My Plugin",
            "href": "my-plugin/ui",                # NO leading slash: breaks on Astro and reverse proxies
            "destination": "nav",                  # see locations table below
            "category": "browse",                  # nav bar category (nav destination only)
            "url_route": "my-plugin",              # unique route name (required for React apps)
            "icon": "/my-plugin/assets/icon.svg",  # DOES use a leading slash: served by the /assets mount
        }
    ]
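If you would rather have the plugin create its asset folders than rely on someone creating them by hand, a small helper can run just before the app.mount(...) calls. This is a sketch that assumes the plugin directory is writable when Airflow imports the module, which is often not the case for read-only container images; ensure_dirs is a hypothetical helper, not an Airflow or FastAPI API.

```python
from pathlib import Path


def ensure_dirs(base: Path, *names: str) -> list:
    """Create each named subdirectory under `base` if it is missing.

    Hypothetical helper: call it before StaticFiles(...) so the mount
    does not fail on a fresh checkout. Returns the subdirectory paths.
    """
    paths = []
    for name in names:
        path = base / name
        path.mkdir(parents=True, exist_ok=True)  # no-op when it already exists
        paths.append(path)
    return paths


# In plugin.py, before the app.mount(...) calls:
# static_dir, assets_dir = ensure_dirs(BASE_DIR, "static", "assets")
```

Because mkdir(exist_ok=True) is idempotent, the call is safe on every import and only does work on the first run.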

External view locations

destination | Where it appears
"nav" | Left navigation bar (also set category)
"dag" | Extra tab on every Dag page
"dag_run" | Extra tab on every Dag run page
"task" | Extra tab on every task page
"task_instance" | Extra tab on every task instance page

Nav bar categories (destination: "nav")

Set "category" to place the link under a specific nav group ("browse" or "admin"), or omit it for a top-level link.

External URLs and minimal plugins

href can be a relative path to an internal endpoint ("my-plugin/ui") or a full external URL. A plugin with only external_views and no fastapi_apps is valid — no backend needed for a simple link or tab:

from airflow.plugins_manager import AirflowPlugin

class LearnViewPlugin(AirflowPlugin):
    name = "learn_view_plugin"

    external_views = [
        {
            "name": "Learn Airflow 3",
            "href": "https://www.astronomer.io/docs/learn",
            "destination": "dag",   # adds a tab to every Dag page
            "url_route": "learn"
        }
    ]

The no-leading-slash rule applies to internal paths only — full https:// URLs are fine.
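The path rules above (no leading slash for an internal href, a leading slash for icon, a unique url_route per entry) are easy to break during edits, so a quick check at import time can catch them early. This is a hypothetical helper, not part of Airflow; the allowed destination set mirrors the locations table above and may not list every value Airflow accepts.

```python
VALID_DESTINATIONS = {"nav", "dag", "dag_run", "task", "task_instance"}
EXTERNAL_SCHEMES = ("http://", "https://")


def check_external_views(views: list) -> list:
    """Return human-readable problems found in an external_views list."""
    problems = []
    seen_routes = set()
    for view in views:
        name = view.get("name", "<unnamed>")
        href = view.get("href", "")
        # Full URLs may contain slashes anywhere; internal paths must be relative
        if not href.startswith(EXTERNAL_SCHEMES) and href.startswith("/"):
            problems.append(f"{name}: internal href must not start with '/': {href!r}")
        icon = view.get("icon")
        if icon and not icon.startswith(("/",) + EXTERNAL_SCHEMES):
            problems.append(f"{name}: icon should be an absolute path: {icon!r}")
        destination = view.get("destination")
        if destination is not None and destination not in VALID_DESTINATIONS:
            problems.append(f"{name}: unknown destination {destination!r}")
        route = view.get("url_route")
        if route in seen_routes:
            problems.append(f"{name}: duplicate url_route {route!r}")
        if route:
            seen_routes.add(route)
    return problems


# Usage: print(check_external_views(MyPlugin.external_views)) should print []
```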


Step 3: Serve the UI entry point

@app.get("/ui", response_class=FileResponse)
async def serve_ui():
    return FileResponse(BASE_DIR / "static" / "index.html")

In HTML, always use relative paths. Absolute paths break when Airflow is mounted at a sub-path:

<!-- correct -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>

<!-- breaks behind a reverse proxy -->
<script src="/my-plugin/static/app.js"></script>

Same rule in JavaScript:

fetch('api/dags')           // correct — relative to current page
fetch('/my-plugin/api/dags') // breaks on Astro and sub-path deploys
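Browsers resolve relative URLs against the current page URL the same way Python's urllib.parse.urljoin does, which makes it easy to see why relative paths survive a sub-path deployment. The "/team-a" prefix below is an assumed reverse-proxy sub-path, not an Airflow or Astro default.

```python
from urllib.parse import urljoin

# Assumed page URLs: served at the host root, and behind a proxy sub-path "/team-a"
direct_page = "https://airflow.example.com/my-plugin/ui"
proxied_page = "https://example.com/team-a/my-plugin/ui"

# Relative paths resolve against the page's directory, so the prefix survives
print(urljoin(direct_page, "api/dags"))   # https://airflow.example.com/my-plugin/api/dags
print(urljoin(proxied_page, "api/dags"))  # https://example.com/team-a/my-plugin/api/dags

# A leading slash resets to the host root and silently drops "/team-a"
print(urljoin(proxied_page, "/my-plugin/api/dags"))  # https://example.com/my-plugin/api/dags
```

This also shows why the UI route should not end in a slash: if the page were served at my-plugin/ui/, the same relative "api/dags" would resolve to my-plugin/ui/api/dags.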

Step 4: Call the Airflow API from your plugin

Only needed if your plugin calls the Airflow REST API. Plugins that only serve static files or register external_views can skip to Step 6. For direct DB access, see "Alternative: Direct database access" at the end of this step.

Add the dependency

Only if you are implementing REST API calls, add apache-airflow-client to the project's dependencies. Check which file exists and act accordingly:

File found | Action
requirements.txt | Append apache-airflow-client
pyproject.toml (uv / poetry) | uv add apache-airflow-client or poetry add apache-airflow-client
None of the above | Tell the user: "Add apache-airflow-client to your dependencies before running the plugin."

Use apache-airflow-client to talk to Airflow's own REST API. The SDK is synchronous but FastAPI routes are async — never call blocking SDK methods directly inside async def or you will stall the event loop and freeze all concurrent requests.

JWT token management

Cache one token per process. Refresh 5 minutes before the 1-hour expiry. Use double-checked locking so multiple concurrent requests don't all race to refresh simultaneously:

Replace MYPLUGIN_ with a short uppercase prefix derived from the plugin name (e.g. if the plugin is called "Trip Analyzer", use TRIP_ANALYZER_). If no plugin name has been given yet, ask the user before writing env var names.
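Deriving the prefix mechanically keeps env var names consistent across files. A minimal sketch, assuming the convention described above (uppercase, runs of non-alphanumeric characters collapsed to one underscore, trailing underscore); env_prefix is a hypothetical helper, not an Airflow API.

```python
import re


def env_prefix(plugin_name: str) -> str:
    """Turn a display name like "Trip Analyzer" into "TRIP_ANALYZER_"."""
    # Collapse every run of non-alphanumeric characters into a single underscore
    slug = re.sub(r"[^A-Za-z0-9]+", "_", plugin_name).strip("_")
    if not slug:
        raise ValueError("plugin name must contain at least one letter or digit")
    return slug.upper() + "_"


print(env_prefix("Trip Analyzer"))  # TRIP_ANALYZER_
```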

import asyncio
import os
import threading
import time
import airflow_client.client as airflow_sdk
import requests

AIRFLOW_HOST  = os.environ.get("MYPLUGIN_HOST",     "http://localhost:8080")
AIRFLOW_USER  = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS  = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN")    # Astronomer Astro: Deployment API token

_cached_token: str | None = None
_token_expires_at: float  = 0.0
_token_lock = threading.Lock()


def _fetch_fresh_token() -> str:
    """Exchange username/password for a JWT via Airflow's auth endpoint."""
    response = requests.post(
        f"{AIRFLOW_HOST}/auth/token",
        json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()["access_token"]


def _get_token() -> str:
    # Astronomer Astro production: use static Deployment API token directly
    if AIRFLOW_TOKEN:
        return AIRFLOW_TOKEN
    global _cached_token, _token_expires_at
    now = time.monotonic()
    # Fast path — no lock if still valid
    if _cached_token and now < _token_expires_at:
        return _cached_token
    # Slow path — one thread refreshes, others wait
    with _token_lock:
        if _cached_token and now < _token_expires_at:
            return _cached_token
        _cached_token = _fetch_fresh_token()
        _token_expires_at = now + 55 * 60  # refresh 5 min before 1-hour expiry
    return _cached_token


def _make_config() -> airflow_sdk.Configuration:
    config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
    config.access_token = _get_token()
    return config
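To check that the double-checked locking really limits refreshes to one, you can exercise the same logic with a stub in place of the HTTP call. This sketch packages it as a hypothetical TokenCache class with an injectable fetch function and clock (both assumptions made for testability); it is not part of apache-airflow-client.

```python
import threading
import time
from typing import Callable, Optional


class TokenCache:
    """Per-process token cache with double-checked locking."""

    def __init__(self, fetch: Callable[[], str], ttl_seconds: float = 55 * 60,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._fetch = fetch
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        # Fast path: no lock while the cached token is still valid
        token = self._token
        if token and self._clock() < self._expires_at:
            return token
        # Slow path: re-check under the lock so only one thread refreshes
        with self._lock:
            if self._token and self._clock() < self._expires_at:
                return self._token
            self._token = self._fetch()
            self._expires_at = self._clock() + self._ttl
            return self._token


# Usage: a slow stub that counts how often it is called
calls = 0


def slow_fetch() -> str:
    global calls
    calls += 1
    time.sleep(0.05)  # simulate network latency so the threads overlap
    return f"token-{calls}"


cache = TokenCache(slow_fetch)
threads = [threading.Thread(target=cache.get) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(calls)  # 1: all ten threads share a single refresh
```

Injecting the clock also makes expiry testable without waiting 55 minutes: advance a fake clock past the TTL and the next get() triggers exactly one new fetch.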

After implementing auth, tell the user:

  • Local development: set MYPLUGIN_USERNAME and MYPLUGIN_PASSWORD in .env — JWT exchange happens automatically.

  • Astronomer Astro (production): create a Deployment API token and set it as MYPLUGIN_TOKEN — the JWT exchange is skipped entirely:

    1. Astro UI → open the Deployment → Access → API Tokens → + Deployment API Token
    2. Copy the token value (shown only once)
    3. astro deployment variable create MYPLUGIN_TOKEN=<token>

    MYPLUGIN_USERNAME and MYPLUGIN_PASSWORD are not needed on Astro.

Wrapping SDK calls with asyncio.to_thread

from fastapi import HTTPException
from airflow_client.client.api import DAGApi

@app.get("/api/dags")
async def list_dags():
    try:
        def _fetch():
            with airflow_sdk.ApiClient(_make_config()) as client:
                return DAGApi(client).get_dags(limit=100).dags
        dags = await asyncio.to_thread(_fetch)
        return [{"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary} for d in dags]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

API field names: Never guess response field names — verify against the REST API reference. Key DAGResponse fields: dag_id, dag_display_name, description, is_paused, timetable_summary, timetable_description, fileloc, owners, tags.

The pattern is always: define a plain inner def _fetch() with all SDK logic, then await asyncio.to_thread(_fetch).
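The effect is easy to measure with only the standard library: a heartbeat coroutine keeps ticking while a blocking call runs in a worker thread, and stalls when the same call runs directly on the event loop. Here time.sleep stands in for a blocking SDK or SQLAlchemy call; the function names are illustrative.

```python
import asyncio
import time


def blocking_call() -> str:
    time.sleep(0.3)  # stand-in for a synchronous SDK or SQLAlchemy call
    return "done"


async def heartbeat(ticks: list, stop: asyncio.Event) -> None:
    # Appends a timestamp about every 50 ms, but only while the loop is free
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.05)


async def count_ticks(offload: bool) -> int:
    ticks: list = []
    stop = asyncio.Event()
    beat = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # give the heartbeat its first turn
    if offload:
        await asyncio.to_thread(blocking_call)  # loop keeps serving other tasks
    else:
        blocking_call()  # freezes the loop for the full 0.3 s
    stop.set()
    await beat
    return len(ticks)


print("direct:", asyncio.run(count_ticks(offload=False)))  # direct: 1
print("to_thread:", asyncio.run(count_ticks(offload=True)))
```

The direct run records only the first tick because nothing else can run until blocking_call returns; the to_thread run keeps ticking for the whole 0.3 s (the exact count depends on timing). In a FastAPI route, the "heartbeat" is every other concurrent request.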

Alternative: Direct database access

Warning — use with caution and tell the user. The Airflow metadb is not a public interface. Direct writes or poorly-formed queries can corrupt scheduler state. Whenever you use this pattern, explicitly tell the user: "This accesses Airflow's internal database directly. The internal models are not part of the public API, can change between Airflow versions, and incorrect queries can cause issues in the metadb. Prefer apache-airflow-client unless the operation is not exposed via the REST API."

Since FastAPI plugin endpoints run inside the API server process (not in a task worker), they have direct access to Airflow's internal SQLAlchemy models — no HTTP round-trip or JWT needed. Use only for read operations not exposed via the REST API, or when the extra HTTP overhead genuinely matters. Always wrap DB calls in asyncio.to_thread() — SQLAlchemy queries are blocking.

from sqlalchemy import func, select
from airflow.models import DagModel
from airflow.utils.session import provide_session

@app.get("/api/dags/status")
async def dag_status():
    @provide_session
    def _query(session=None):
        # Read-only counts from the dag table; no DAG files are parsed.
        # Note: this counts every DagModel row, which can include stale DAGs.
        total = session.scalar(select(func.count()).select_from(DagModel))
        paused = session.scalar(
            select(func.count()).select_from(DagModel).where(DagModel.is_paused.is_(True))
        )
        return {"total": total, "paused": paused}
    # SQLAlchemy calls block, so run the whole query off the event loop
    return await asyncio.to_thread(_query)

Step 5: Common API endpoint patterns

If you need an SDK method or field not shown in the examples below, verify it before generating code — do not guess. Either run python3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])" in any environment where the SDK is installed, or search the apache/airflow-client-python repo for the class definition.
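If you also want parameter names, inspect.signature works on the generated SDK classes the same way it works on any Python callable. A sketch; describe_methods is a hypothetical helper, and the commented usage assumes apache-airflow-client is installed.

```python
import inspect


def describe_methods(cls: type, contains: str = "") -> dict:
    """Map each public method name (optionally filtered) to its signature."""
    result = {}
    for name, member in inspect.getmembers(cls, callable):
        if name.startswith("_") or contains not in name:
            continue
        try:
            result[name] = str(inspect.signature(member))
        except (TypeError, ValueError):  # some callables have no introspectable signature
            result[name] = "(...)"
    return result


# With the SDK installed, for example:
# from airflow_client.client.api import DagRunApi
# for name, sig in describe_methods(DagRunApi, "dag_run").items():
#     print(name, sig)
```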

from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody


@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
    result = await asyncio.to_thread(_run)
    return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}


@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
    await asyncio.to_thread(_run)
    return {"dag_id": dag_id, "is_paused": is_paused}


@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
    def _run():
        with airflow_sdk.ApiClient(_make_config()) as client:
            DAGApi(client).delete_dag(dag_id)
    await asyncio.to_thread(_run)
    return {"deleted": dag_id}


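def normalize_state(raw) -> str:
    """Convert SDK enum objects to plain strings before sending to the frontend."""
    if raw is None:
        return "never_run"
    # str() on a str-backed Enum member yields "DagRunState.SUCCESS"; .value yields "success"
    return str(getattr(raw, "value", raw)).lower()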

DAG runs, task instances, and logs

These are the most common calls beyond basic DAG CRUD. For anything not shown here, consult the REST API reference for available endpoints and the matching Python SDK class/method names.

from airflow_client.client.api import DagRunApi, TaskInstanceApi

# Latest run for a DAG
@app.get("/api/dags/{dag_id}/runs/latest")
async def latest_run(dag_id: str):
    def _fetch():
        with airflow_sdk.ApiClient(_make_config()) as client:
            runs = DagRunApi(client).get_dag_runs(dag_id, limit=1, order_by="-start_date").dag_runs
            return runs[0] if runs else None
    run = await asyncio.to_thread(_fetch)
    if not run:
        return {"state": "never_run"}
    return {"run_id": run.dag_run_id, "state": normalize_state(run.state)}


# Task instances for a specific run
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks")
async def task_instances(dag_id: str, run_id: str):
    def _fetch():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return TaskInstanceApi(client).get_task_instances(dag_id, run_id).task_instances
    tasks = await asyncio.to_thread(_fetch)
    return [{"task_id": t.task_id, "state": normalize_state(t.state)} for t in tasks]


# Task log (try_number starts at 1)
@app.get("/api/dags/{dag_id}/runs/{run_id}/tasks/{task_id}/logs/{try_number}")
async def task_log(dag_id: str, run_id: str, task_id: str, try_number: int):
    def _fetch():
        with airflow_sdk.ApiClient(_make_config()) as client:
            return TaskInstanceApi(client).get_log(
                dag_id, run_id, task_id, try_number, map_index=-1
            )
    result = await asyncio.to_thread(_fetch)
    return {"log": result.content if hasattr(result, "content") else str(result)}

Streaming proxy

Use StreamingResponse to proxy binary content from an external URL through the plugin — useful when the browser can't fetch the resource directly (CORS, auth, etc.):

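import requests
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse

@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
    def _open():
        # Blocking: connects and reads headers, so it runs in a worker thread
        r = requests.get(f"https://files.example.com/{filename}", stream=True, timeout=30)
        r.raise_for_status()
        return r
    upstream = await asyncio.to_thread(_open)
    return StreamingResponse(
        upstream.iter_content(chunk_size=8192),  # sync iterator; Starlette drains it in a threadpool
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
        background=BackgroundTask(upstream.close),  # release the upstream connection afterwards
    )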

Note that requests.get() is blocking — fetch in asyncio.to_thread so the event loop isn't stalled while waiting for the remote server.
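The Content-Disposition header built with a plain f-string breaks on filenames containing quotes, and non-ASCII names cannot be encoded into an HTTP header at all. A sketch of a safer builder using the RFC 6266 / RFC 5987 extended filename* form alongside an ASCII fallback; content_disposition is a hypothetical helper name.

```python
from urllib.parse import quote


def content_disposition(filename: str) -> str:
    """Build an attachment header that survives quotes and non-ASCII names."""
    # ASCII fallback for older clients: drop non-ASCII, quotes, backslashes, control chars
    fallback = "".join(
        ch for ch in filename if ch.isascii() and ch.isprintable() and ch not in '"\\'
    ) or "download"
    # Extended form carries the exact UTF-8 name, percent-encoded
    encoded = quote(filename, safe="")
    return f'attachment; filename="{fallback}"; ' + f"filename*=UTF-8''{encoded}"


print(content_disposition("report.csv"))
# attachment; filename="report.csv"; filename*=UTF-8''report.csv
```

In the proxy route it would replace the inline f-string: headers={"Content-Disposition": content_disposition(filename)}. The result is always pure ASCII, so it encodes cleanly as a header value.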


Step 6: Other plugin component types

Macros

Macros are loaded by the scheduler (and DAG processor), not the API server. Restart the scheduler after changes.

from airflow.plugins_manager import AirflowPlugin

def format_confidence(confidence: float) -> str:
    return f"{confidence * 100:.2f}%"

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    macros = [format_confidence]

Use in any templated field — including with XCom:

{{ macros.my_plugin.format_confidence(0.95) }}

{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}

The naming pattern is always macros.{plugin_name}.{function_name}.
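ti.xcom_pull(...) returns None when the upstream task pushed nothing, so the subscripted XCom template above fails to render in that case. A macro that accepts the whole payload can absorb it. A sketch: confidence_from_xcom and its "n/a" default are hypothetical, and the function would be registered next to format_confidence in macros.

```python
def format_confidence(confidence: float) -> str:
    return f"{confidence * 100:.2f}%"


def confidence_from_xcom(payload, key: str = "confidence", default: str = "n/a") -> str:
    """Format payload[key] as a percentage, tolerating a missing XCom or key."""
    if not isinstance(payload, dict) or payload.get(key) is None:
        return default
    return format_confidence(float(payload[key]))


# Registration: macros = [format_confidence, confidence_from_xcom]
# Template:     {{ macros.my_plugin.confidence_from_xcom(ti.xcom_pull(task_ids='score_task')) }}
print(confidence_from_xcom({"confidence": 0.953}))  # 95.30%
print(confidence_from_xcom(None))                   # n/a
```

Keeping macros as plain functions like this also means they can be unit-tested without a running Airflow.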

Middleware

Middleware applies to all Airflow API requests, including the built-in REST API and any FastAPI plugins. Use sparingly and filter requests explicitly if needed:

from airflow.plugins_manager import AirflowPlugin
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response

class AuditMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next) -> Response:
        # runs before every request to the Airflow API server
        response = await call_next(request)
        return response

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_root_middlewares = [
        {"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
    ]

Operator extra links

from airflow.plugins_manager import AirflowPlugin
from airflow.sdk.bases.operatorlink import BaseOperatorLink

class MyDashboardLink(BaseOperatorLink):
    name = "Open in Dashboard"

    def get_link(self, operator, *, ti_key, **context) -> str:
        return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    global_operator_extra_links = [MyDashboardLink()]  # appears on every task
    # operator_extra_links = [MyDashboardLink()]       # attach to specific operator instead
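If the link needs more than the task ID, note that run IDs such as manual__2024-01-01T00:00:00+00:00 contain characters (":" and "+") that must be percent-encoded in a URL. A sketch of a pure helper that get_link could call with fields from ti_key; the dashboard URL layout and dashboard_url itself are hypothetical.

```python
from urllib.parse import quote, urlencode

DASHBOARD_BASE = "https://my-dashboard.example.com"  # hypothetical dashboard


def dashboard_url(dag_id: str, run_id: str, task_id: str, map_index: int = -1) -> str:
    """Build a dashboard deep link with every dynamic part safely encoded."""
    path = "/".join(quote(part, safe="") for part in ("dags", dag_id, "tasks", task_id))
    query = {"run_id": run_id}
    if map_index >= 0:  # -1 means the task instance is not mapped
        query["map_index"] = str(map_index)
    return f"{DASHBOARD_BASE}/{path}?{urlencode(query)}"


# Inside MyDashboardLink.get_link:
#     return dashboard_url(ti_key.dag_id, ti_key.run_id, ti_key.task_id, ti_key.map_index)
print(dashboard_url("etl", "manual__2024-01-01T00:00:00+00:00", "load"))
# https://my-dashboard.example.com/dags/etl/tasks/load?run_id=manual__2024-01-01T00%3A00%3A00%2B00%3A00
```

Keeping the URL logic in a plain function means it can be tested without constructing a TaskInstanceKey.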

React apps

React apps are embedded as JavaScript bundles served via FastAPI. The bundle must expose its component as a global variable whose name matches the "name" of the react_apps entry:

// In your bundle (e.g. my-app.js)
globalThis['My Plugin'] = MyComponent;   // matches the react_apps "name" below
globalThis.AirflowPlugin = MyComponent;  // fallback Airflow looks for

from airflow.plugins_manager import AirflowPlugin

class MyPlugin(AirflowPlugin):
    name = "my_plugin"
    fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
    react_apps = [
        {
            "name": "My Plugin",
            "bundle_url": "/my-plugin/static/my-app.js",  # served by the /static mount in the skeleton
            "destination": "nav",
            "category": "browse",
            "url_route": "my-plugin",
        }
    ]

The same bundle can be registered to multiple destinations by adding multiple entries — each needs a unique url_route:

react_apps = [
    {"name": "My Widget", "bundle_url": "/my-plugin/static/widget.js", "destination": "nav",  "url_route": "my-widget-nav"},
    {"name": "My Widget", "bundle_url": "/my-plugin/static/widget.js", "destination": "dag",  "url_route": "my-widget-dag"},
]

React app integration is experimental in Airflow 3.1. Interfaces may change in future releases.


Step 7: Environment variables and deployment

Never hardcode credentials:

AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST",     "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")

Local Astro CLI:

# .env
MYPLUGIN_HOST=http://localhost:8080
MYPLUGIN_USERNAME=admin
MYPLUGIN_PASSWORD=admin

# Shell
astro dev restart              # required after any Python plugin change

# Check logs by component (Astro CLI):
astro dev logs --api-server    # FastAPI apps, external_views — plugin import errors show here
astro dev logs --scheduler     # macros, timetables, listeners, operator links
astro dev logs --dag-processor # DAG parsing errors

# Non-Astro:
airflow plugins                # CLI — lists all loaded plugins

Production Astronomer:

astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.com

Load plugins eagerly at startup during development (disables lazy loading; this does not auto-reload, so Python changes still need a restart):

AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False

Cache busting for static files after deploy:

<script src="static/app.js?v=20240315-1"></script>
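Instead of bumping the version string by hand, you can derive it from the file contents so it changes exactly when the asset does. A minimal sketch, assuming the plugin renders or templates its HTML rather than returning index.html unchanged with FileResponse; asset_version is a hypothetical helper.

```python
import hashlib
from pathlib import Path


def asset_version(path: Path, length: int = 10) -> str:
    """Short content hash to append as ?v=... for cache busting."""
    digest = hashlib.sha256(path.read_bytes()).hexdigest()
    return digest[:length]


# e.g. when building the HTML in a route:
# version = asset_version(BASE_DIR / "static" / "app.js")
# script_tag = f'<script src="static/app.js?v={version}"></script>'
```

A content hash never changes for an unchanged file, so browsers keep their cached copy across deploys that do not touch the asset.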

Verify the plugin loaded: open Admin > Plugins in the Airflow UI.

OpenAPI docs are auto-generated for FastAPI plugins:

  • Swagger UI: {AIRFLOW_HOST}{url_prefix}/docs (url_prefix already starts with "/", e.g. /my-plugin/docs)
  • OpenAPI JSON: {AIRFLOW_HOST}{url_prefix}/openapi.json

Common pitfalls

Problem | Cause | Fix
Nav link goes to 404 | Leading / in href | "my-plugin/ui" not "/my-plugin/ui"
Nav icon not showing | Missing / in icon | icon takes an absolute path: "/my-plugin/assets/icon.svg"
Event loop freezes under load | Sync SDK called directly in async def | Wrap with asyncio.to_thread()
401 errors after 1 hour | JWT expires with no refresh | Use the 5-minute pre-expiry refresh pattern
StaticFiles raises on startup | Directory missing | Create assets/ and static/ before starting
Plugin not showing up | Python file changed without restart | astro dev restart
Endpoints accessible without login | FastAPI apps are not auto-authenticated | Add FastAPI security (e.g. OAuth2, API key) if endpoints must be private
Middleware affecting wrong routes | Middleware applies to all API traffic | Filter by request.url.path inside dispatch()
JS fetch() breaks on Astro | Absolute path in fetch() | Always use relative paths: fetch('api/dags')
