cosmos-dbt-fusion

作成者: astronomer

Astronomer CosmosをSnowflake、Databricks、BigQuery、Redshift上のdbt Fusionプロジェクト向けにローカル実行で設定します。Cosmos 1.11.0以上、Airflowランタイムに別途インストールされたdbt Fusionバイナリ、およびサブプロセス呼び出しによるExecutionMode.LOCALが必要です。3つの解析戦略をサポート:dbt_manifest(大規模プロジェクトに最速)、dbt_ls(複雑なセレクター向け)、またはautomatic(シンプルな設定)。ウェアハウス接続用のProfileConfig、dbtプロジェクトパス用のProjectConfigの設定などをカバーします。

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-fusion

Cosmos + dbt Fusion: Implementation Checklist

Execute steps in order. This skill covers Fusion-specific constraints only.

Version note: dbt Fusion support was introduced in Cosmos 1.11.0. Requires Cosmos ≥1.11.

Reference: See reference/cosmos-config.md for ProfileConfig, operator_args, and Airflow 3 compatibility details.

Before starting, confirm: (1) dbt engine = Fusion (not Core → use cosmos-dbt-core), (2) warehouse = Snowflake, Databricks, Bigquery and Redshift only.

Fusion-Specific Constraints

ConstraintDetails
No asyncAIRFLOW_ASYNC not supported
No virtualenvFusion is a binary, not a Python package
Warehouse supportSnowflake, Databricks, Bigquery and Redshift support while in preview

1. Confirm Cosmos Version

CRITICAL: Cosmos 1.11.0 introduced dbt Fusion compatibility.

# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"

Validate: pip show astronomer-cosmos reports version ≥ 1.11.0


2. Install the dbt Fusion Binary (REQUIRED)

dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.

Determine where to install the Fusion binary (Dockerfile / base image / runtime).

Example Dockerfile Install

USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro

Common Install Paths

EnvironmentTypical path
Astro Runtime/home/astro/.local/bin/dbt
System-wide/usr/local/bin/dbt

Validate: The dbt binary exists at the chosen path and dbt --version succeeds.


3. Choose Parsing Strategy (RenderConfig)

Parsing strategy is the same as dbt Core. Pick ONE:

Load modeWhen to useRequired inputs
dbt_manifestLarge projects; fastest parsingProjectConfig.manifest_path
dbt_lsComplex selectors; need dbt-native selectionFusion binary accessible to scheduler
automaticSimple setups; let Cosmos pick(none)
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for full ProfileConfig options and examples.

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

5. Configure ExecutionConfig (LOCAL Only)

CRITICAL: dbt Fusion with Cosmos requires ExecutionMode.LOCAL with dbt_executable_path pointing to the Fusion binary.

from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)

6. Configure Project (ProjectConfig)

from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

8. Final Validation

Before finalizing, verify:

  • Cosmos version: ≥1.11.0
  • Fusion binary installed: Path exists and is executable
  • Warehouse supported: Snowflake, Databricks, Bigquery or Redshift only
  • Secrets handling: Airflow connections or env vars, NOT plaintext

Troubleshooting

If user reports dbt Core regressions after enabling Fusion:

AIRFLOW__COSMOS__PRE_DBT_FUSION=1

User Must Test

  • The DAG parses in the Airflow UI (no import/parse-time errors)
  • A manual run succeeds against the target warehouse (at least one model)

Reference


Related Skills

  • cosmos-dbt-core: For dbt Core projects (not Fusion)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

astronomerのその他のスキル

airflow
astronomer
Apache AirflowのDAG、実行、タスク、システム設定をクエリ、管理、トラブルシューティングします。DAG検査、実行管理、タスクログ、設定クエリ、REST API直接アクセスを含む30以上のコマンドをサポート。複数のAirflowインスタンスを永続的な設定で管理し、ローカルおよびAstroデプロイメントを自動検出。DAG実行を同期的(完了待機)または非同期的にトリガーし、障害を診断、再試行のために実行をクリア、リトライ/マップインデックスフィルタリング付きでタスクログにアクセス。出力...
official
airflow-hitl
astronomer
人間による承認ゲート、フォーム入力、およびAirflow DAG内での分岐を、遅延可能オペレーターを使用して実現。4種類のオペレーター:承認/却下の判断を行うApprovalOperator、フォームによる複数選択肢の選択を行うHITLOperator、人間主導のタスクルーティングを行うHITLBranchOperator、フォームデータ収集を行うHITLEntryOperator。すべてのオペレーターは遅延可能であり、Airflow UIのRequired ActionsタブまたはREST APIを介して人間の応答を待つ間、ワーカースロットを解放します。カスタム...を含むオプション機能をサポート。
official
airflow-plugins
astronomer
Airflow 3.1+のプラグインを構築し、FastAPIアプリ、カスタムUIページ、Reactコンポーネント、ミドルウェア、マクロ、オペレーターリンクをAirflow UIに直接埋め込みます。使用…
official
analyzing-data
astronomer
データウェアハウスにクエリを実行し、キャッシュされたパターンと概念マッピングを使用してビジネス上の質問に回答します。繰り返し発生する質問タイプのパターン検索とキャッシュをサポートし、結果を記録して将来のクエリを改善します。概念からテーブルへのマッピングキャッシュと、INFORMATION_SCHEMAまたはコードベースのgrepによるテーブルスキーマ検出を含みます。分析用にPolarsまたはPandas DataFrameを返すrun_sql()およびrun_sql_pandas()カーネル関数を提供します。概念、パターン、テーブルキャッシュを管理するCLIコマンド、さらに...
official
annotating-task-lineage
astronomer
Airflowタスクにデータ系列を注釈付けし、インレットとアウトレットを使用します。OpenLineage Datasetオブジェクト、Airflow Assets、Airflow Datasetsをサポートし、データベース、データウェアハウス、クラウドストレージ間での入出力を定義します。オペレーターに組み込みのOpenLineage抽出機能がない場合のフォールバックとして使用し、カスタム抽出機能とOpenLineageメソッドが優先される4段階の優先順位システムに従います。Snowflake、BigQuery、S3、PostgreSQL向けのデータセット命名ヘルパーを含み、一貫性を確保します。
official
authoring-dags
astronomer
Apache Airflow DAGを作成するためのガイド付きワークフローで、検証とテストの統合を備えています。構造化された6フェーズのアプローチ:環境と既存のパターンを発見し、DAG構造を計画し、ベストプラクティスに従って実装し、af CLIコマンドで検証し、ユーザーの同意を得てテストし、修正を繰り返します。発見用のCLIコマンド(af config connections、af config providers、af dags list)と検証用のCLIコマンド(af dags errors、af dags get、af dags explore)は、DAGに関する即時フィードバックを提供します。
official
blueprint
astronomer
Pydanticバリデーションを使用して再利用可能なAirflowタスクグループテンプレートを定義し、YAMLからDAGを構成します。ブループリントテンプレートの作成時や、DAGの構成時に使用します。
official
checking-freshness
astronomer
テーブルのタイムスタンプと更新パターンを陳腐化スケールに照らして確認し、データの鮮度を検証します。一般的なETL命名パターン(_loaded_at、_updated_at、created_atなど)を使用してタイムスタンプカラムを特定し、その最大値をクエリして経過時間を判定します。データを4つの鮮度ステータスに分類します:Fresh(4時間未満)、Stale(4~24時間)、Very Stale(24時間超)、またはUnknown(タイムスタンプなし)。最近の日数における最終更新時刻と行数トレンドを確認するためのSQLテンプレートを提供します...
official