cosmos-dbt-fusion

द्वारा astronomer

Astronomer Cosmos को Snowflake, Databricks, BigQuery या Redshift पर dbt Fusion प्रोजेक्ट्स के लिए स्थानीय निष्पादन के साथ कॉन्फ़िगर करें। Cosmos 1.11.0+, Airflow रनटाइम में अलग से स्थापित dbt Fusion बाइनरी, और सबप्रोसेस आह्वान के साथ ExecutionMode.LOCAL की आवश्यकता है। तीन पार्सिंग रणनीतियों का समर्थन करता है: dbt_manifest (बड़े प्रोजेक्ट्स के लिए सबसे तेज़), dbt_ls (जटिल चयनकर्ताओं के लिए), या स्वचालित (सरल सेटअप)। वेयरहाउस कनेक्शन के ल

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-fusion

Cosmos + dbt Fusion: Implementation Checklist

Execute steps in order. This skill covers Fusion-specific constraints only.

Version note: dbt Fusion support was introduced in Cosmos 1.11.0. Requires Cosmos ≥1.11.

Reference: See reference/cosmos-config.md for ProfileConfig, operator_args, and Airflow 3 compatibility details.

Before starting, confirm: (1) dbt engine = Fusion (not Core → use cosmos-dbt-core), (2) warehouse = Snowflake, Databricks, Bigquery and Redshift only.

Fusion-Specific Constraints

ConstraintDetails
No asyncAIRFLOW_ASYNC not supported
No virtualenvFusion is a binary, not a Python package
Warehouse supportSnowflake, Databricks, Bigquery and Redshift support while in preview

1. Confirm Cosmos Version

CRITICAL: Cosmos 1.11.0 introduced dbt Fusion compatibility.

# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"

Validate: pip show astronomer-cosmos reports version ≥ 1.11.0


2. Install the dbt Fusion Binary (REQUIRED)

dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.

Determine where to install the Fusion binary (Dockerfile / base image / runtime).

Example Dockerfile Install

USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro

Common Install Paths

EnvironmentTypical path
Astro Runtime/home/astro/.local/bin/dbt
System-wide/usr/local/bin/dbt

Validate: The dbt binary exists at the chosen path and dbt --version succeeds.


3. Choose Parsing Strategy (RenderConfig)

Parsing strategy is the same as dbt Core. Pick ONE:

Load modeWhen to useRequired inputs
dbt_manifestLarge projects; fastest parsingProjectConfig.manifest_path
dbt_lsComplex selectors; need dbt-native selectionFusion binary accessible to scheduler
automaticSimple setups; let Cosmos pick(none)
from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for full ProfileConfig options and examples.

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

5. Configure ExecutionConfig (LOCAL Only)

CRITICAL: dbt Fusion with Cosmos requires ExecutionMode.LOCAL with dbt_executable_path pointing to the Fusion binary.

from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)

6. Configure Project (ProjectConfig)

from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

8. Final Validation

Before finalizing, verify:

  • Cosmos version: ≥1.11.0
  • Fusion binary installed: Path exists and is executable
  • Warehouse supported: Snowflake, Databricks, Bigquery or Redshift only
  • Secrets handling: Airflow connections or env vars, NOT plaintext

Troubleshooting

If user reports dbt Core regressions after enabling Fusion:

AIRFLOW__COSMOS__PRE_DBT_FUSION=1

User Must Test

  • The DAG parses in the Airflow UI (no import/parse-time errors)
  • A manual run succeeds against the target warehouse (at least one model)

Reference


Related Skills

  • cosmos-dbt-core: For dbt Core projects (not Fusion)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

astronomer की और Skills

airflow
astronomer
Apache Airflow DAGs, रन, टास्क और सिस्टम कॉन्फ़िगरेशन को क्वेरी, प्रबंधित और समस्या निवारण करें। DAG निरीक्षण, रन प्रबंधन, टास्क लॉगिंग, कॉन्फ़िगरेशन क्वेरी और सीधे REST API एक्सेस में 30+ कमांड का समर्थन करता है। स्थायी कॉन्फ़िगरेशन के साथ कई Airflow इंस्टेंस प्रबंधित करें; स्थानीय और Astro डिप्लॉयमेंट को स्वचालित रूप से खोजें। DAG रन को सिंक्रोनस (पूर्णता की प्रतीक्षा करें) या एसिंक्रोनस रूप से
official
airflow-hitl
astronomer
एयरफ्लो डीएजी में डिफरेबल ऑपरेटरों का उपयोग करके मानव अनुमोदन गेट, फॉर्म इनपुट और ब्रांचिंग। चार ऑपरेटर प्रकार: अनुमोदन/अस्वीकृति निर्णयों के लिए ApprovalOperator, फॉर्म के साथ बहु-विकल्प चयन के लिए HITLOperator, मानव-संचालित कार्य रूटिंग के लिए HITLBranchOperator, और फॉर्म डेटा संग्रह के लिए HITLEntryOperator। सभी ऑपरेटर डिफरेबल हैं, जो एयरफ्लो यूआई के आवश्यक कार्रवाई टैब या REST API के माध्यम
official
airflow-plugins
astronomer
Airflow 3.1+ प्लगइन्स बनाएँ जो FastAPI ऐप्स, कस्टम UI पेज, React कम्पोनेंट्स, मिडलवेयर, मैक्रोज़ और ऑपरेटर लिंक्स को सीधे Airflow UI में एम्बेड करते हैं। उपयोग करें…
official
analyzing-data
astronomer
अपने डेटा वेयरहाउस से कैश्ड पैटर्न और कॉन्सेप्ट मैपिंग के साथ व्यावसायिक प्रश्नों के उत्तर प्राप्त करें। बार-बार पूछे जाने वाले प्रश्नों के लिए पैटर्न लुकअप और कैशिंग का समर्थन करता है, जिसमें भविष्य के प्रश्नों को बेहतर बनाने के लिए परिणाम रिकॉर्डिंग शामिल है। इसमें कॉन्सेप्ट-टू-टेबल मैपिंग कैश और INFORMATION_SCHEMA या कोडबेस grep के माध्यम से टेबल स्कीमा डिस्कवरी शामिल है। विश्लेषण के लिए
official
annotating-task-lineage
astronomer
Airflow कार्यों को इनलेट और आउटलेट का उपयोग करके डेटा लाइनेज के साथ एनोटेट करें। डेटाबेस, डेटा वेयरहाउस और क्लाउड स्टोरेज में इनपुट और आउटपुट परिभाषित करने के लिए OpenLineage Dataset ऑब्जेक्ट, Airflow Assets और Airflow Datasets का समर्थन करता है। जब ऑपरेटरों में बिल्ट-इन OpenLineage एक्सट्रैक्टर न हों तो फ़ॉलबैक के रूप में उपयोग करें; चार-स्तरीय प्राथमिकता प्रणाली का पालन करता है जहाँ कस्टम एक्सट्रैक्टर और OpenLineage
official
authoring-dags
astronomer
Apache Airflow DAGs बनाने के लिए निर्देशित कार्यप्रवाह, जिसमें सत्यापन और परीक्षण एकीकरण शामिल है। संरचित छह-चरणीय दृष्टिकोण: वातावरण और मौजूदा पैटर्न की खोज करें, DAG संरचना की योजना बनाएं, सर्वोत्तम प्रथाओं का पालन करते हुए कार्यान्वित करें, af CLI कमांड से सत्यापित करें, उपयोगकर्ता की सहमति से परीक्षण करें, और सुधारों पर पुनरावृत्ति करें। खोज के लिए CLI कमांड (af config connections, af config providers, af dags list) और सत्यापन के लिए (af d
official
blueprint
astronomer
Pydantic सत्यापन के साथ पुन: प्रयोज्य Airflow कार्य समूह टेम्पलेट परिभाषित करें और YAML से DAGs संकलित करें। blueprint टेम्पलेट बनाते समय, DAGs संकलित करते समय उपयोग करें…
official
checking-freshness
astronomer
तालिका टाइमस्टैम्प और अद्यतन पैटर्न की जांच करके एक स्टेलनेस स्केल के विरुद्ध डेटा ताजगी सत्यापित करता है। सामान्य ETL नामकरण पैटर्न ( _loaded_at , _updated_at , created_at , आदि) का उपयोग करके टाइमस्टैम्प कॉलम की पहचान करता है और आयु निर्धारित करने के लिए उनके अधिकतम मानों को क्वेरी करता है। डेटा को चार ताजगी स्थितियों में वर्गीकृत करता है: ताजा (< 4 घंटे), बासी (4–24 घंटे), बहुत बासी (> 24 घंटे), य
official