cosmos-dbt-core

作成者: astronomer

Astronomer Cosmosを使用して、dbt CoreプロジェクトをAirflow DAGまたはTaskGroupに変換します。3つのアセンブリパターン(スタンドアロンのDbtDag、既存のDAG内のDbtTaskGroup、細かい制御が可能な個別のCosmosオペレーター)をサポートします。分離とパフォーマンスのニーズに基づいて、8つの実行モード(WATCHER、LOCAL、VIRTUALENV、KUBERNETES、AIRFLOW_ASYNCなど)から選択できます。速度とセレクターの複雑さのバランスを取るために、3つの解析戦略(dbt_manifest、dbt_ls、dbt_ls_file、automatic)を提供します...

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core

Cosmos + dbt Core: Implementation Checklist

Execute steps in order. Prefer the simplest configuration that meets the user's constraints.

Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).

Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/

Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.


1. Configure Project (ProjectConfig)

ApproachWhen to useRequired param
Project pathFiles available locallydbt_project_path
Manifest onlydbt_manifest loadmanifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. Choose Parsing Strategy (RenderConfig)

Pick ONE load mode based on constraints:

Load modeWhen to useRequired inputsConstraints
dbt_manifestLarge projects; containerized execution; fastestProjectConfig.manifest_pathRemote manifest needs manifest_conn_id
dbt_lsComplex selectors; need dbt-native selectiondbt installed OR dbt_executable_pathCan also be used with containerized execution
dbt_ls_filedbt_ls selection without running dbt_ls every parseRenderConfig.dbt_ls_pathselect/exclude won't work
automatic (default)Simple setups; let Cosmos pick(none)Falls back: manifest → dbt_ls → custom

CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. Choose Execution Mode (ExecutionConfig)

Reference: See reference/cosmos-config.md for detailed configuration examples per mode.

Pick ONE execution mode:

Execution modeWhen to useSpeedRequired setup
WATCHERFastest; single dbt build visibilityFastestdbt adapter in env OR dbt_executable_path or dbt Fusion
WATCHER_KUBERNETESFastest isolated method; single dbt build visibilityFastdbt installed in container
LOCAL + DBT_RUNNERdbt + adapter in the same Python installation as AirflowFastdbt 1.5+ in requirements.txt
LOCAL + SUBPROCESSdbt + adapter available in the Airflow deployment, in an isolated Python installationMediumdbt_executable_path
AIRFLOW_ASYNCBigQuery + long-running transformsFastAirflow ≥2.8; provider deps
KUBERNETESIsolation between Airflow and dbtMediumAirflow ≥2.8; provider deps
VIRTUALENVCan't modify image; runtime venvSlowerpy_requirements in operator_args
Other containerized approachesSupport Airflow and dbt isolationMediumcontainer config
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.

Option A: Airflow Connection + ProfileMapping (Recommended)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

Option B: Existing profiles.yml

CRITICAL: Do not hardcode secrets; use environment variables.

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. Configure Testing Behavior (RenderConfig)

Reference: See reference/cosmos-config.md for detailed testing options.

TestBehaviorBehavior
AFTER_EACH (default)Tests run immediately after each model (default)
BUILDCombine run + test into single dbt build
AFTER_ALLAll tests after all models complete
NONESkip tests
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. Configure operator_args

Reference: See reference/cosmos-config.md for detailed operator_args options.

_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

Option C: Use Cosmos operators directly

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

Setting Dependencies on Individual Cosmos Tasks

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. Safety Checks

Before finalizing, verify:

  • Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only)
  • Warehouse adapter installed for chosen execution mode
  • Secrets via Airflow connections or env vars, NOT plaintext
  • Load mode matches execution (complex selectors → dbt_ls)
  • Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)

Appendix A: Airflow 3 Compatibility

Import Differences

Airflow 3.xAirflow 2.x
from airflow.sdk import dag, taskfrom airflow.decorators import dag, task
from airflow.sdk import chainfrom airflow.models.baseoperator import chain

Asset/Dataset URI Format Change

Cosmos ≤1.9 (Airflow 2 Datasets):

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets):

postgres://0.0.0.0:5434/postgres/public/orders

CRITICAL: Update asset URIs when upgrading to Airflow 3.


Appendix B: Operational Extras

Caching

Cosmos caches artifacts to speed up parsing. Enabled by default.

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

Memory-Optimized Imports

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

When enabled:

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

Artifact Upload to Object Storage

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

dbt Docs Hosting

Cosmos serves dbt docs in the Airflow UI. The config depends on your Airflow major version (each uses a different UI plugin system) — it is not a free single-vs-multi choice:

AirflowConfigScopeSince
2 (FAB plugin)DBT_DOCS_DIR (+ DBT_DOCS_CONN_ID, DBT_DOCS_INDEX_FILE_NAME)Single projectCosmos 1.4.0+
3.1+ (FastAPI)DBT_DOCS_PROJECTS (JSON)One or more projectsCosmos 1.11.0+

Airflow 2:

AIRFLOW__COSMOS__DBT_DOCS_DIR="path/to/docs"                   # local path or S3/GCS/Azure/HTTP URI; defaults to the dbt target/ folder
AIRFLOW__COSMOS__DBT_DOCS_CONN_ID="my_conn_id"                 # optional; for cloud storage
AIRFLOW__COSMOS__DBT_DOCS_INDEX_FILE_NAME="static_index.html"  # optional; only if docs built with --static

Airflow 3.1+:

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

Pick by Airflow version, not project count. The single-project settings are the Airflow 2 path; Cosmos publishes no deprecation notice for them — do not describe them as "legacy" or "deprecated."

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


Related Skills

  • cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

astronomerのその他のスキル

airflow
astronomer
Apache AirflowのDAG、実行、タスク、システム設定をクエリ、管理、トラブルシューティングします。DAG検査、実行管理、タスクログ、設定クエリ、REST API直接アクセスを含む30以上のコマンドをサポート。複数のAirflowインスタンスを永続的な設定で管理し、ローカルおよびAstroデプロイメントを自動検出。DAG実行を同期的(完了待機)または非同期的にトリガーし、障害を診断、再試行のために実行をクリア、リトライ/マップインデックスフィルタリング付きでタスクログにアクセス。出力...
official
airflow-hitl
astronomer
人間による承認ゲート、フォーム入力、およびAirflow DAG内での分岐を、遅延可能オペレーターを使用して実現。4種類のオペレーター:承認/却下の判断を行うApprovalOperator、フォームによる複数選択肢の選択を行うHITLOperator、人間主導のタスクルーティングを行うHITLBranchOperator、フォームデータ収集を行うHITLEntryOperator。すべてのオペレーターは遅延可能であり、Airflow UIのRequired ActionsタブまたはREST APIを介して人間の応答を待つ間、ワーカースロットを解放します。カスタム...を含むオプション機能をサポート。
official
airflow-plugins
astronomer
Airflow 3.1+のプラグインを構築し、FastAPIアプリ、カスタムUIページ、Reactコンポーネント、ミドルウェア、マクロ、オペレーターリンクをAirflow UIに直接埋め込みます。使用…
official
analyzing-data
astronomer
データウェアハウスにクエリを実行し、キャッシュされたパターンと概念マッピングを使用してビジネス上の質問に回答します。繰り返し発生する質問タイプのパターン検索とキャッシュをサポートし、結果を記録して将来のクエリを改善します。概念からテーブルへのマッピングキャッシュと、INFORMATION_SCHEMAまたはコードベースのgrepによるテーブルスキーマ検出を含みます。分析用にPolarsまたはPandas DataFrameを返すrun_sql()およびrun_sql_pandas()カーネル関数を提供します。概念、パターン、テーブルキャッシュを管理するCLIコマンド、さらに...
official
annotating-task-lineage
astronomer
Airflowタスクにデータ系列を注釈付けし、インレットとアウトレットを使用します。OpenLineage Datasetオブジェクト、Airflow Assets、Airflow Datasetsをサポートし、データベース、データウェアハウス、クラウドストレージ間での入出力を定義します。オペレーターに組み込みのOpenLineage抽出機能がない場合のフォールバックとして使用し、カスタム抽出機能とOpenLineageメソッドが優先される4段階の優先順位システムに従います。Snowflake、BigQuery、S3、PostgreSQL向けのデータセット命名ヘルパーを含み、一貫性を確保します。
official
authoring-dags
astronomer
Apache Airflow DAGを作成するためのガイド付きワークフローで、検証とテストの統合を備えています。構造化された6フェーズのアプローチ:環境と既存のパターンを発見し、DAG構造を計画し、ベストプラクティスに従って実装し、af CLIコマンドで検証し、ユーザーの同意を得てテストし、修正を繰り返します。発見用のCLIコマンド(af config connections、af config providers、af dags list)と検証用のCLIコマンド(af dags errors、af dags get、af dags explore)は、DAGに関する即時フィードバックを提供します。
official
blueprint
astronomer
Pydanticバリデーションを使用して再利用可能なAirflowタスクグループテンプレートを定義し、YAMLからDAGを構成します。ブループリントテンプレートの作成時や、DAGの構成時に使用します。
official
checking-freshness
astronomer
テーブルのタイムスタンプと更新パターンを陳腐化スケールに照らして確認し、データの鮮度を検証します。一般的なETL命名パターン(_loaded_at、_updated_at、created_atなど)を使用してタイムスタンプカラムを特定し、その最大値をクエリして経過時間を判定します。データを4つの鮮度ステータスに分類します:Fresh(4時間未満)、Stale(4~24時間)、Very Stale(24時間超)、またはUnknown(タイムスタンプなし)。最近の日数における最終更新時刻と行数トレンドを確認するためのSQLテンプレートを提供します...
official