cosmos-dbt-fusion

Konfigurasikan Astronomer Cosmos untuk proyek dbt Fusion di Snowflake, Databricks, BigQuery, atau Redshift dengan eksekusi lokal. Membutuhkan Cosmos 1.11.0+, biner dbt Fusion yang diinstal secara terpisah di runtime Airflow, dan ExecutionMode.LOCAL dengan pemanggilan subproses. Mendukung tiga strategi penguraian: dbt_manifest (tercepat untuk proyek besar), dbt_ls (untuk pemilih kompleks), atau otomatis (pengaturan sederhana). Mencakup pengaturan ProfileConfig untuk koneksi warehouse, ProjectConfig untuk jalur proyek dbt, dan...

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-fusion

Cosmos + dbt Fusion: Implementation Checklist

Execute steps in order. This skill covers Fusion-specific constraints only.

Version note: dbt Fusion support was introduced in Cosmos 1.11.0. Requires Cosmos ≥1.11.

Reference: See reference/cosmos-config.md for ProfileConfig, operator_args, and Airflow 3 compatibility details.

Before starting, confirm: (1) dbt engine = Fusion (not Core → use cosmos-dbt-core), (2) warehouse = Snowflake, Databricks, Bigquery and Redshift only.

Fusion-Specific Constraints

ConstraintDetails
No asyncAIRFLOW_ASYNC not supported
No virtualenvFusion is a binary, not a Python package
Warehouse supportSnowflake, Databricks, Bigquery and Redshift support while in preview

1. Confirm Cosmos Version

CRITICAL: Cosmos 1.11.0 introduced dbt Fusion compatibility.

# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"

Validate: pip show astronomer-cosmos reports version ≥ 1.11.0


2. Install the dbt Fusion Binary (REQUIRED)

dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.

Determine where to install the Fusion binary (Dockerfile / base image / runtime).

Example Dockerfile Install

USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro

Common Install Paths

EnvironmentTypical path
Astro Runtime/home/astro/.local/bin/dbt
System-wide/usr/local/bin/dbt

Validate: The dbt binary exists at the chosen path and dbt --version succeeds.


3. Choose Parsing Strategy (RenderConfig)

Parsing strategy is the same as dbt Core. Pick ONE:

Load modeWhen to useRequired inputs
dbt_manifestLarge projects; fastest parsingProjectConfig.manifest_path
dbt_lsComplex selectors; need dbt-native selectionFusion binary accessible to scheduler
automaticSimple setups; let Cosmos pick(none)
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for full ProfileConfig options and examples.

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

5. Configure ExecutionConfig (LOCAL Only)

CRITICAL: dbt Fusion with Cosmos requires ExecutionMode.LOCAL with dbt_executable_path pointing to the Fusion binary.

from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)

6. Configure Project (ProjectConfig)

from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

8. Final Validation

Before finalizing, verify:

  • Cosmos version: ≥1.11.0
  • Fusion binary installed: Path exists and is executable
  • Warehouse supported: Snowflake, Databricks, Bigquery or Redshift only
  • Secrets handling: Airflow connections or env vars, NOT plaintext

Troubleshooting

If user reports dbt Core regressions after enabling Fusion:

AIRFLOW__COSMOS__PRE_DBT_FUSION=1

User Must Test

  • The DAG parses in the Airflow UI (no import/parse-time errors)
  • A manual run succeeds against the target warehouse (at least one model)

Reference


Related Skills

  • cosmos-dbt-core: For dbt Core projects (not Fusion)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

Lebih banyak skill dari astronomer

airflow
astronomer
Kueri, kelola, dan pecahkan masalah DAG, proses, tugas, serta konfigurasi sistem Apache Airflow. Mendukung 30+ perintah untuk inspeksi DAG, manajemen proses, pencatatan tugas, kueri konfigurasi, dan akses langsung REST API. Kelola beberapa instance Airflow dengan konfigurasi persisten; temukan secara otomatis deployment lokal dan Astro. Jalankan proses DAG secara sinkron (tunggu hingga selesai) atau asinkron, diagnosis kegagalan, hapus proses untuk percobaan ulang, dan akses log tugas dengan filter percobaan ulang/indeks peta. Keluaran...
official
airflow-hitl
astronomer
Gerbang persetujuan manusia, input formulir, dan percabangan dalam DAG Airflow menggunakan operator yang dapat ditunda. Empat jenis operator: ApprovalOperator untuk keputusan setuju/tolak, HITLOperator untuk pemilihan multi-opsi dengan formulir, HITLBranchOperator untuk perutean tugas yang digerakkan manusia, dan HITLEntryOperator untuk pengumpulan data formulir. Semua operator dapat ditunda, membebaskan slot pekerja sambil menunggu respons manusia melalui tab Required Actions di UI Airflow atau REST API. Mendukung fitur opsional termasuk kustom...
official
airflow-plugins
astronomer
Bangun plugin Airflow 3.1+ yang menyematkan aplikasi FastAPI, halaman UI kustom, komponen React, middleware, makro, dan tautan operator langsung ke dalam UI Airflow. Gunakan…
official
analyzing-data
astronomer
Kueri gudang data Anda untuk menjawab pertanyaan bisnis dengan pola yang di-cache dan pemetaan konsep. Mendukung pencarian pola dan caching untuk jenis pertanyaan berulang, dengan pencatatan hasil untuk meningkatkan kueri di masa mendatang. Menyertakan cache pemetaan konsep-ke-tabel dan penemuan skema tabel melalui INFORMATION_SCHEMA atau grep basis kode. Menyediakan fungsi kernel run_sql() dan run_sql_pandas() yang mengembalikan DataFrame Polars atau Pandas untuk analisis. Perintah CLI untuk mengelola cache konsep, pola, dan tabel, plus...
official
annotating-task-lineage
astronomer
Anotasi tugas Airflow dengan lineage data menggunakan inlet dan outlet. Mendukung objek Dataset OpenLineage, Aset Airflow, dan Dataset Airflow untuk mendefinisikan input dan output di seluruh basis data, gudang data, dan penyimpanan cloud. Digunakan sebagai cadangan ketika operator tidak memiliki ekstraktor OpenLineage bawaan; mengikuti sistem prioritas empat tingkat di mana ekstraktor kustom dan metode OpenLineage diutamakan. Menyertakan pembantu penamaan dataset untuk Snowflake, BigQuery, S3, dan PostgreSQL guna memastikan konsistensi...
official
authoring-dags
astronomer
Panduan kerja untuk membuat DAG Apache Airflow dengan integrasi validasi dan pengujian. Pendekatan enam fase terstruktur: temukan lingkungan dan pola yang ada, rencanakan struktur DAG, implementasikan sesuai praktik terbaik, validasi dengan perintah CLI af, uji dengan persetujuan pengguna, dan lakukan iterasi perbaikan. Perintah CLI untuk penemuan (af config connections, af config providers, af dags list) dan validasi (af dags errors, af dags get, af dags explore) memberikan umpan balik langsung pada DAG...
official
blueprint
astronomer
Definisikan templat grup tugas Airflow yang dapat digunakan kembali dengan validasi Pydantic dan susun DAG dari YAML. Gunakan saat membuat templat blueprint, menyusun DAG dari…
official
checking-freshness
astronomer
Verifikasi kesegaran data dengan memeriksa timestamp tabel dan pola pembaruan terhadap skala ketidaksegaran. Mengidentifikasi kolom timestamp menggunakan pola penamaan ETL umum (_loaded_at, _updated_at, created_at, dll.) dan menanyakan nilai maksimumnya untuk menentukan usia. Mengklasifikasikan data ke dalam empat status kesegaran: Segar (< 4 jam), Agak Basi (4–24 jam), Sangat Basi (> 24 jam), atau Tidak Diketahui (tidak ada timestamp ditemukan). Menyediakan template SQL untuk memeriksa waktu pembaruan terakhir dan tren jumlah baris selama beberapa hari terakhir hingga...
official