cosmos-dbt-core

Chuyển đổi các dự án dbt Core thành DAGs hoặc TaskGroups của Airflow bằng Astronomer Cosmos. Hỗ trợ ba mẫu lắp ráp: DbtDag độc lập, DbtTaskGroup trong các DAG hiện có và các toán tử Cosmos riêng lẻ để kiểm soát chi tiết. Chọn từ tám chế độ thực thi (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC và các chế độ khác) dựa trên nhu cầu cách ly và hiệu suất. Cung cấp ba chiến lược phân tích cú pháp (dbt_manifest, dbt_ls, dbt_ls_file, tự động) để cân bằng giữa tốc độ và độ phức tạp của bộ chọn...

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core

Cosmos + dbt Core: Implementation Checklist

Execute steps in order. Prefer the simplest configuration that meets the user's constraints.

Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).

Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/

Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.


1. Configure Project (ProjectConfig)

ApproachWhen to useRequired param
Project pathFiles available locallydbt_project_path
Manifest onlydbt_manifest loadmanifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. Choose Parsing Strategy (RenderConfig)

Pick ONE load mode based on constraints:

Load modeWhen to useRequired inputsConstraints
dbt_manifestLarge projects; containerized execution; fastestProjectConfig.manifest_pathRemote manifest needs manifest_conn_id
dbt_lsComplex selectors; need dbt-native selectiondbt installed OR dbt_executable_pathCan also be used with containerized execution
dbt_ls_filedbt_ls selection without running dbt_ls every parseRenderConfig.dbt_ls_pathselect/exclude won't work
automatic (default)Simple setups; let Cosmos pick(none)Falls back: manifest → dbt_ls → custom

CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. Choose Execution Mode (ExecutionConfig)

Reference: See reference/cosmos-config.md for detailed configuration examples per mode.

Pick ONE execution mode:

Execution modeWhen to useSpeedRequired setup
WATCHERFastest; single dbt build visibilityFastestdbt adapter in env OR dbt_executable_path or dbt Fusion
WATCHER_KUBERNETESFastest isolated method; single dbt build visibilityFastdbt installed in container
LOCAL + DBT_RUNNERdbt + adapter in the same Python installation as AirflowFastdbt 1.5+ in requirements.txt
LOCAL + SUBPROCESSdbt + adapter available in the Airflow deployment, in an isolated Python installationMediumdbt_executable_path
AIRFLOW_ASYNCBigQuery + long-running transformsFastAirflow ≥2.8; provider deps
KUBERNETESIsolation between Airflow and dbtMediumAirflow ≥2.8; provider deps
VIRTUALENVCan't modify image; runtime venvSlowerpy_requirements in operator_args
Other containerized approachesSupport Airflow and dbt isolationMediumcontainer config
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.

Option A: Airflow Connection + ProfileMapping (Recommended)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

Option B: Existing profiles.yml

CRITICAL: Do not hardcode secrets; use environment variables.

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. Configure Testing Behavior (RenderConfig)

Reference: See reference/cosmos-config.md for detailed testing options.

TestBehaviorBehavior
AFTER_EACH (default)Tests run immediately after each model (default)
BUILDCombine run + test into single dbt build
AFTER_ALLAll tests after all models complete
NONESkip tests
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. Configure operator_args

Reference: See reference/cosmos-config.md for detailed operator_args options.

_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

Option C: Use Cosmos operators directly

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

Setting Dependencies on Individual Cosmos Tasks

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. Safety Checks

Before finalizing, verify:

  • Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only)
  • Warehouse adapter installed for chosen execution mode
  • Secrets via Airflow connections or env vars, NOT plaintext
  • Load mode matches execution (complex selectors → dbt_ls)
  • Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)

Appendix A: Airflow 3 Compatibility

Import Differences

Airflow 3.xAirflow 2.x
from airflow.sdk import dag, taskfrom airflow.decorators import dag, task
from airflow.sdk import chainfrom airflow.models.baseoperator import chain

Asset/Dataset URI Format Change

Cosmos ≤1.9 (Airflow 2 Datasets):

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets):

postgres://0.0.0.0:5434/postgres/public/orders

CRITICAL: Update asset URIs when upgrading to Airflow 3.


Appendix B: Operational Extras

Caching

Cosmos caches artifacts to speed up parsing. Enabled by default.

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

Memory-Optimized Imports

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

When enabled:

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

Artifact Upload to Object Storage

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

dbt Docs Hosting

Cosmos serves dbt docs in the Airflow UI. The config depends on your Airflow major version (each uses a different UI plugin system) — it is not a free single-vs-multi choice:

AirflowConfigScopeSince
2 (FAB plugin)DBT_DOCS_DIR (+ DBT_DOCS_CONN_ID, DBT_DOCS_INDEX_FILE_NAME)Single projectCosmos 1.4.0+
3.1+ (FastAPI)DBT_DOCS_PROJECTS (JSON)One or more projectsCosmos 1.11.0+

Airflow 2:

AIRFLOW__COSMOS__DBT_DOCS_DIR="path/to/docs"                   # local path or S3/GCS/Azure/HTTP URI; defaults to the dbt target/ folder
AIRFLOW__COSMOS__DBT_DOCS_CONN_ID="my_conn_id"                 # optional; for cloud storage
AIRFLOW__COSMOS__DBT_DOCS_INDEX_FILE_NAME="static_index.html"  # optional; only if docs built with --static

Airflow 3.1+:

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

Pick by Airflow version, not project count. The single-project settings are the Airflow 2 path; Cosmos publishes no deprecation notice for them — do not describe them as "legacy" or "deprecated."

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


Related Skills

  • cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

Thêm skills từ astronomer

airflow
astronomer
Truy vấn, quản lý và khắc phục sự cố DAG, lần chạy, tác vụ và cấu hình hệ thống Apache Airflow. Hỗ trợ hơn 30 lệnh bao gồm kiểm tra DAG, quản lý lần chạy, ghi nhật ký tác vụ, truy vấn cấu hình và truy cập trực tiếp REST API. Quản lý nhiều phiên bản Airflow với cấu hình liên tục; tự động phát hiện triển khai cục bộ và Astro. Kích hoạt chạy DAG đồng bộ (chờ hoàn thành) hoặc không đồng bộ, chẩn đoán lỗi, xóa lần chạy để thử lại, và truy cập nhật ký tác vụ với bộ lọc thử lại/ch
official
airflow-hitl
astronomer
Cổng phê duyệt của con người, đầu vào biểu mẫu và phân nhánh trong DAG Airflow sử dụng các toán tử có thể trì hoãn. Bốn loại toán tử: ApprovalOperator cho quyết định phê duyệt/từ chối, HITLOperator cho lựa chọn nhiều tùy chọn với biểu mẫu, HITLBranchOperator cho định tuyến tác vụ do con người điều khiển và HITLEntryOperator cho thu thập dữ liệu biểu mẫu. Tất cả các toán tử đều có thể trì hoãn, giải phóng slot worker trong khi chờ phản hồi của con người qua tab Required Actions của giao diện Airflow hoặc REST API. Hỗ trợ các tính năng tùy chọn bao gồm tùy chỉnh...
official
airflow-plugins
astronomer
Xây dựng plugin Airflow 3.1+ nhúng ứng dụng FastAPI, trang UI tùy chỉnh, thành phần React, middleware, macro và liên kết toán tử trực tiếp vào giao diện Airflow. Sử dụng…
official
analyzing-data
astronomer
Truy vấn kho dữ liệu của bạn để trả lời các câu hỏi kinh doanh với các mẫu đã lưu trong bộ nhớ đệm và ánh xạ khái niệm. Hỗ trợ tra cứu mẫu và lưu vào bộ nhớ đệm cho các loại câu hỏi lặp lại, với ghi nhận kết quả để cải thiện các truy vấn trong tương lai. Bao gồm bộ nhớ đệm ánh xạ khái niệm sang bảng và khám phá lược đồ bảng qua INFORMATION_SCHEMA hoặc tìm kiếm trong mã nguồn. Cung cấp các hàm kernel run_sql() và run_sql_pandas() trả về DataFrame Polars hoặc Pandas để phân tích. Các lệnh CLI để quản lý bộ nhớ đệm khái
official
annotating-task-lineage
astronomer
Chú thích các tác vụ Airflow với dòng dữ liệu (data lineage) bằng cách sử dụng inlets và outlets. Hỗ trợ các đối tượng Dataset của OpenLineage, Assets của Airflow và Datasets của Airflow để xác định đầu vào và đầu ra trên các cơ sở dữ liệu, kho dữ liệu và lưu trữ đám mây. Sử dụng như phương án dự phòng khi các toán tử thiếu bộ trích xuất OpenLineage tích hợp sẵn; tuân theo hệ thống ưu tiên bốn cấp, trong đó các bộ trích xuất tùy chỉnh và phương thức OpenLineage được ưu tiên. Bao gồm các trình trợ giúp đặt tên dataset cho Snowflake, BigQuery, S3 và PostgreSQL để đảm bảo tính nhất quán...
official
authoring-dags
astronomer
Quy trình làm việc có hướng dẫn để tạo DAG Apache Airflow với tích hợp xác thực và kiểm thử. Phương pháp sáu giai đoạn có cấu trúc: khám phá môi trường và các mẫu hiện có, lập kế hoạch cấu trúc DAG, triển khai theo các phương pháp tốt nhất, xác thực bằng lệnh CLI af, kiểm thử với sự đồng ý của người dùng, và lặp lại các bước sửa lỗi. Các lệnh CLI để khám phá (af config connections, af config providers, af dags list) và xác thực (af dags errors, af dags get, af dags explore) cung cấp phản hồi tức thì về DAG...
official
blueprint
astronomer
Xác định các mẫu nhóm tác vụ Airflow có thể tái sử dụng với xác thực Pydantic và soạn DAG từ YAML. Sử dụng khi tạo mẫu blueprint, soạn DAG từ…
official
checking-freshness
astronomer
Kiểm tra độ tươi mới của dữ liệu bằng cách đối chiếu dấu thời gian bảng và mẫu cập nhật với thang đo độ cũ. Xác định các cột dấu thời gian sử dụng các mẫu đặt tên ETL phổ biến (_loaded_at, _updated_at, created_at, v.v.) và truy vấn giá trị tối đa của chúng để xác định tuổi. Phân loại dữ liệu thành bốn trạng thái độ tươi mới: Tươi (< 4 giờ), Cũ (4–24 giờ), Rất cũ (> 24 giờ) hoặc Không xác định (không tìm thấy dấu thời gian). Cung cấp các mẫu SQL để kiểm tra thời gian cập nhật cuối cùng và xu hướng số lượng hàng trong những ngày gần đây để...
official