cosmos-dbt-fusion

Author: astronomer

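Configure Astronomer Cosmos for dbt Fusion projects on Snowflake, Databricks, BigQuery, or Redshift, with local execution. Requires Cosmos 1.11.0 or later, the dbt Fusion binary installed separately in the Airflow runtime, and ExecutionMode.LOCAL with subprocess invocation. Supports three parsing strategies: dbt_manifest (fastest; for large projects), dbt_ls (for complex selectors), or automatic (for simple setups). Covers ProfileConfig for the warehouse connection, ProjectConfig for the dbt project path, and...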

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-fusion

Cosmos + dbt Fusion: Implementation Checklist

Execute steps in order. This skill covers Fusion-specific constraints only.

Version note: dbt Fusion support was introduced in Cosmos 1.11.0, so this skill requires Cosmos ≥ 1.11.0.

Reference: See reference/cosmos-config.md for ProfileConfig, operator_args, and Airflow 3 compatibility details.

Before starting, confirm: (1) dbt engine = Fusion (not Core → use cosmos-dbt-core), (2) warehouse = Snowflake, Databricks, BigQuery, or Redshift only.

Fusion-Specific Constraints

Constraint        | Details
No async          | AIRFLOW_ASYNC not supported
No virtualenv     | Fusion is a binary, not a Python package
Warehouse support | Snowflake, Databricks, BigQuery, and Redshift are supported while in preview
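
The constraints above can be checked up front before any DAG code is written. The following is a minimal sketch, not part of Cosmos: `fusion_blockers` and `SUPPORTED_WAREHOUSES` are hypothetical names, and the execution-mode strings ("local", "airflow_async", "virtualenv") are assumed to mirror the Cosmos ExecutionMode names.

```python
# Hypothetical pre-check for the Fusion constraints in this checklist.
# Warehouse names and mode strings are illustrative assumptions.
SUPPORTED_WAREHOUSES = {"snowflake", "databricks", "bigquery", "redshift"}


def fusion_blockers(warehouse: str, execution_mode: str = "local") -> list:
    """Return human-readable reasons why a setup cannot use dbt Fusion."""
    blockers = []
    if warehouse.strip().lower() not in SUPPORTED_WAREHOUSES:
        blockers.append(f"warehouse {warehouse!r} is not supported by Fusion in preview")
    if execution_mode.strip().lower() != "local":
        # Covers async and virtualenv: Fusion is a binary run in LOCAL mode only.
        blockers.append(f"execution mode {execution_mode!r} is not supported; use local")
    return blockers


if __name__ == "__main__":
    for reason in fusion_blockers("postgres", "virtualenv"):
        print("BLOCKER:", reason)
```

An empty list means none of the listed constraints is violated; it does not prove the setup will work.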

1. Confirm Cosmos Version

CRITICAL: Cosmos 1.11.0 introduced dbt Fusion compatibility.

# Check installed version
pip show astronomer-cosmos

# Install/upgrade if needed
pip install "astronomer-cosmos>=1.11.0"

Validate: pip show astronomer-cosmos reports version ≥ 1.11.0
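
The same validation can be scripted, for example as a CI step. This is a minimal sketch using only the standard library; `parse_version` and `cosmos_supports_fusion` are hypothetical helper names, and pre-release suffixes such as `a1` are deliberately ignored for simplicity.

```python
import re
from importlib import metadata

MIN_COSMOS = (1, 11, 0)  # minimum version stated in this checklist


def parse_version(version: str) -> tuple:
    """Return the leading numeric components, e.g. '1.11' -> (1, 11, 0)."""
    match = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", version)
    if match is None:
        raise ValueError(f"Unrecognized version string: {version!r}")
    # A missing patch component is treated as 0; suffixes are ignored.
    return tuple(int(part or 0) for part in match.groups())


def cosmos_supports_fusion(version: str) -> bool:
    """True when the given version is at least MIN_COSMOS."""
    return parse_version(version) >= MIN_COSMOS


if __name__ == "__main__":
    try:
        installed = metadata.version("astronomer-cosmos")
    except metadata.PackageNotFoundError:
        print("astronomer-cosmos is not installed")
    else:
        status = "OK" if cosmos_supports_fusion(installed) else "too old"
        print(f"astronomer-cosmos {installed}: {status}")
```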


2. Install the dbt Fusion Binary (REQUIRED)

dbt Fusion is NOT bundled with Cosmos or dbt Core. Install it into the Airflow runtime/image.

Determine where to install the Fusion binary (Dockerfile / base image / runtime).

Example Dockerfile Install

USER root
RUN apt-get update && apt-get install -y curl
ENV SHELL=/bin/bash
RUN curl -fsSL https://public.cdn.getdbt.com/fs/install/install.sh | sh -s -- --update
USER astro

Common Install Paths

Environment   | Typical path
Astro Runtime | /home/astro/.local/bin/dbt
System-wide   | /usr/local/bin/dbt

Validate: The dbt binary exists at the chosen path and dbt --version succeeds.
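
One way to automate this check is a small script run inside the Airflow image. The sketch below is an illustration, not a Cosmos feature: `check_dbt_binary` is a hypothetical helper, and the path in the usage example is the Astro Runtime path listed above.

```python
import os
import subprocess


def check_dbt_binary(path: str, timeout: int = 30) -> tuple:
    """Return (ok, detail) for the dbt executable expected at `path`."""
    if not os.path.isfile(path):
        return False, f"{path} does not exist"
    if not os.access(path, os.X_OK):
        return False, f"{path} is not executable"
    try:
        result = subprocess.run(
            [path, "--version"], capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.TimeoutExpired) as exc:
        return False, f"could not run {path}: {exc}"
    # Report whichever stream carried the version text or the error message.
    detail = (result.stdout or result.stderr).strip()
    return result.returncode == 0, detail


if __name__ == "__main__":
    # Adjust the path to match where the binary was installed in your image.
    ok, detail = check_dbt_binary("/home/astro/.local/bin/dbt")
    print("OK" if ok else "FAILED", "-", detail)
```

The script only confirms that a `dbt` executable runs; reading the printed version output is still needed to confirm it is the Fusion binary rather than dbt Core.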


3. Choose Parsing Strategy (RenderConfig)

Parsing strategy is the same as dbt Core. Pick ONE:

Load mode    | When to use                                 | Required inputs
dbt_manifest | Large projects; fastest parsing             | ProjectConfig.manifest_path
dbt_ls       | Complex selectors; need dbt-native selection | Fusion binary accessible to scheduler
automatic    | Simple setups; let Cosmos pick              | (none)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.AUTOMATIC,  # or DBT_MANIFEST, DBT_LS
)
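
The choice between the three load modes can be expressed as a simple rule. The helper below is a hypothetical heuristic based only on the table above, not Cosmos behavior; it returns the load mode name as a string.

```python
from pathlib import Path


def choose_load_mode(manifest_path=None, needs_dbt_selectors=False) -> str:
    """Return 'dbt_ls', 'dbt_manifest', or 'automatic' per the table above."""
    if needs_dbt_selectors:
        # dbt-native selection requires the Fusion binary on the scheduler.
        return "dbt_ls"
    if manifest_path is not None and Path(manifest_path).is_file():
        # A precomputed manifest gives the fastest parsing for large projects.
        return "dbt_manifest"
    # No special requirements: let Cosmos pick.
    return "automatic"


if __name__ == "__main__":
    print(choose_load_mode())                          # automatic
    print(choose_load_mode(needs_dbt_selectors=True))  # dbt_ls
```

Assuming the LoadMode enum values match these strings, the result could be passed on as `RenderConfig(load_method=LoadMode(choose_load_mode(...)))`; verify this against the installed Cosmos version before relying on it.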

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for full ProfileConfig options and examples.

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

5. Configure ExecutionConfig (LOCAL Only)

CRITICAL: dbt Fusion with Cosmos requires ExecutionMode.LOCAL with dbt_executable_path pointing to the Fusion binary.

from cosmos import ExecutionConfig
from cosmos.constants import InvocationMode

_execution_config = ExecutionConfig(
    invocation_mode=InvocationMode.SUBPROCESS,
    dbt_executable_path="/home/astro/.local/bin/dbt",  # REQUIRED: path to Fusion binary
    # execution_mode is LOCAL by default - do not change
)

6. Configure Project (ProjectConfig)

from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # install_dbt_deps=False,  # if deps precomputed in CI
)
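
Wrong paths in ProjectConfig tend to surface only when the DAG is parsed, so it can help to check them beforehand. A minimal sketch, assuming a standard dbt layout with `dbt_project.yml` at the project root; `check_project` is a hypothetical helper.

```python
from pathlib import Path


def check_project(dbt_project_path, manifest_path=None) -> list:
    """Return a list of problems; an empty list means the paths look usable."""
    problems = []
    project_dir = Path(dbt_project_path)
    if not project_dir.is_dir():
        problems.append(f"project directory not found: {project_dir}")
    elif not (project_dir / "dbt_project.yml").is_file():
        problems.append(f"no dbt_project.yml in {project_dir}")
    # The manifest is only needed for the dbt_manifest load mode.
    if manifest_path is not None and not Path(manifest_path).is_file():
        problems.append(f"manifest not found: {manifest_path}")
    return problems


if __name__ == "__main__":
    for problem in check_project("/path/to/dbt/project"):
        print("PROBLEM:", problem)
```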

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig(
    dbt_executable_path="/home/astro/.local/bin/dbt",  # Fusion binary
)

_render_config = RenderConfig()

my_fusion_dag = DbtDag(
    dag_id="my_fusion_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
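from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    # ProfileConfig needs either a profile_mapping or a profiles_yml_filepath
    profile_mapping=SnowflakeUserPasswordProfileMapping(conn_id="snowflake_default"),
)
_execution_config = ExecutionConfig(dbt_executable_path="/home/astro/.local/bin/dbt")  # Fusion binary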

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_fusion_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

8. Final Validation

Before finalizing, verify:

  • Cosmos version: ≥1.11.0
  • Fusion binary installed: Path exists and is executable
  • Warehouse supported: Snowflake, Databricks, BigQuery, or Redshift only
  • Secrets handling: Airflow connections or env vars, NOT plaintext

Troubleshooting

If the user reports dbt Core regressions after enabling Fusion, set the following environment variable:

AIRFLOW__COSMOS__PRE_DBT_FUSION=1

User Must Test

  • The DAG parses in the Airflow UI (no import/parse-time errors)
  • A manual run succeeds against the target warehouse (at least one model)

Related Skills

  • cosmos-dbt-core: For dbt Core projects (not Fusion)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation
