cosmos-dbt-core

โดย astronomer

แปลงโปรเจกต์ dbt Core เป็น Airflow DAGs หรือ TaskGroups โดยใช้ Astronomer Cosmos รองรับรูปแบบการประกอบสามแบบ: DbtDag แบบสแตนด์อโลน, DbtTaskGroup ภายใน DAG ที่มีอยู่ และตัวดำเนินการ Cosmos แต่ละตัวเพื่อการควบคุมแบบละเอียด เลือกจากโหมดการทำงานแปดโหมด (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC และอื่นๆ) ตามความต้องการด้านการแยกส่วนและประสิทธิภาพ มีกลยุทธ์การแยกวิเคราะห์สามแบบ (dbt_manifest, dbt_ls, dbt_ls_file, automatic) เพื่อปรับสมดุลความเร็วและความซับซ้อนของตัวเลือก...

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core

Cosmos + dbt Core: Implementation Checklist

Execute steps in order. Prefer the simplest configuration that meets the user's constraints.

Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).

Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/

Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.


1. Configure Project (ProjectConfig)

ApproachWhen to useRequired param
Project pathFiles available locallydbt_project_path
Manifest onlydbt_manifest loadmanifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. Choose Parsing Strategy (RenderConfig)

Pick ONE load mode based on constraints:

Load modeWhen to useRequired inputsConstraints
dbt_manifestLarge projects; containerized execution; fastestProjectConfig.manifest_pathRemote manifest needs manifest_conn_id
dbt_lsComplex selectors; need dbt-native selectiondbt installed OR dbt_executable_pathCan also be used with containerized execution
dbt_ls_filedbt_ls selection without running dbt_ls every parseRenderConfig.dbt_ls_pathselect/exclude won't work
automatic (default)Simple setups; let Cosmos pick(none)Falls back: manifest → dbt_ls → custom

CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. Choose Execution Mode (ExecutionConfig)

Reference: See reference/cosmos-config.md for detailed configuration examples per mode.

Pick ONE execution mode:

Execution modeWhen to useSpeedRequired setup
WATCHERFastest; single dbt build visibilityFastestdbt adapter in env OR dbt_executable_path or dbt Fusion
WATCHER_KUBERNETESFastest isolated method; single dbt build visibilityFastdbt installed in container
LOCAL + DBT_RUNNERdbt + adapter in the same Python installation as AirflowFastdbt 1.5+ in requirements.txt
LOCAL + SUBPROCESSdbt + adapter available in the Airflow deployment, in an isolated Python installationMediumdbt_executable_path
AIRFLOW_ASYNCBigQuery + long-running transformsFastAirflow ≥2.8; provider deps
KUBERNETESIsolation between Airflow and dbtMediumAirflow ≥2.8; provider deps
VIRTUALENVCan't modify image; runtime venvSlowerpy_requirements in operator_args
Other containerized approachesSupport Airflow and dbt isolationMediumcontainer config
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.

Option A: Airflow Connection + ProfileMapping (Recommended)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

Option B: Existing profiles.yml

CRITICAL: Do not hardcode secrets; use environment variables.

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. Configure Testing Behavior (RenderConfig)

Reference: See reference/cosmos-config.md for detailed testing options.

TestBehaviorBehavior
AFTER_EACH (default)Tests run immediately after each model (default)
BUILDCombine run + test into single dbt build
AFTER_ALLAll tests after all models complete
NONESkip tests
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. Configure operator_args

Reference: See reference/cosmos-config.md for detailed operator_args options.

_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

Option C: Use Cosmos operators directly

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

Setting Dependencies on Individual Cosmos Tasks

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. Safety Checks

Before finalizing, verify:

  • Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only)
  • Warehouse adapter installed for chosen execution mode
  • Secrets via Airflow connections or env vars, NOT plaintext
  • Load mode matches execution (complex selectors → dbt_ls)
  • Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)

Appendix A: Airflow 3 Compatibility

Import Differences

Airflow 3.xAirflow 2.x
from airflow.sdk import dag, taskfrom airflow.decorators import dag, task
from airflow.sdk import chainfrom airflow.models.baseoperator import chain

Asset/Dataset URI Format Change

Cosmos ≤1.9 (Airflow 2 Datasets):

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets):

postgres://0.0.0.0:5434/postgres/public/orders

CRITICAL: Update asset URIs when upgrading to Airflow 3.


Appendix B: Operational Extras

Caching

Cosmos caches artifacts to speed up parsing. Enabled by default.

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

Memory-Optimized Imports

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

When enabled:

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

Artifact Upload to Object Storage

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

dbt Docs Hosting

Cosmos serves dbt docs in the Airflow UI. The config depends on your Airflow major version (each uses a different UI plugin system) — it is not a free single-vs-multi choice:

AirflowConfigScopeSince
2 (FAB plugin)DBT_DOCS_DIR (+ DBT_DOCS_CONN_ID, DBT_DOCS_INDEX_FILE_NAME)Single projectCosmos 1.4.0+
3.1+ (FastAPI)DBT_DOCS_PROJECTS (JSON)One or more projectsCosmos 1.11.0+

Airflow 2:

AIRFLOW__COSMOS__DBT_DOCS_DIR="path/to/docs"                   # local path or S3/GCS/Azure/HTTP URI; defaults to the dbt target/ folder
AIRFLOW__COSMOS__DBT_DOCS_CONN_ID="my_conn_id"                 # optional; for cloud storage
AIRFLOW__COSMOS__DBT_DOCS_INDEX_FILE_NAME="static_index.html"  # optional; only if docs built with --static

Airflow 3.1+:

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

Pick by Airflow version, not project count. The single-project settings are the Airflow 2 path; Cosmos publishes no deprecation notice for them — do not describe them as "legacy" or "deprecated."

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


Related Skills

  • cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

Skills เพิ่มเติมจาก astronomer

airflow
astronomer
สอบถาม จัดการ และแก้ไขปัญหา DAGs, การรัน, งาน และการกำหนดค่าระบบของ Apache Airflow รองรับคำสั่งมากกว่า 30 คำสั่งสำหรับการตรวจสอบ DAG, การจัดการการรัน, การบันทึกงาน, การสอบถามการกำหนดค่า และการเข้าถึง REST API โดยตรง จัดการอินสแตนซ์ Airflow หลายตัวพร้อมการกำหนดค่าถาวร ค้นหาการปรับใช้ในเครื่องและ Astro โดยอัตโนมัติ เรียกใช้ DAG แบบซิงโครนัส (รอให้เสร็จ) หรือแบบอะซิงโครนัส วินิจฉัยข้อผิดพลาด ล้างการรันเพื่อลองใหม่ และเข้าถึงบันทึกงานพร้อมการกรอง retry/map-index ผลลัพธ์...
official
airflow-hitl
astronomer
ประตูการอนุมัติของมนุษย์, การป้อนข้อมูลฟอร์ม, และการแตกกิ่งใน Airflow DAGs โดยใช้ตัวดำเนินการที่สามารถเลื่อนได้ ตัวดำเนินการสี่ประเภท: ApprovalOperator สำหรับการตัดสินใจอนุมัติ/ปฏิเสธ, HITLOperator สำหรับการเลือกหลายตัวเลือกพร้อมฟอร์ม, HITLBranchOperator สำหรับการกำหนดเส้นทางงานที่ขับเคลื่อนโดยมนุษย์, และ HITLEntryOperator สำหรับการรวบรวมข้อมูลฟอร์ม ตัวดำเนินการทั้งหมดสามารถเลื่อนได้ โดยปล่อยช่อง worker ขณะรอการตอบสนองจากมนุษย์ผ่านแท็บ Required Actions ของ Airflow UI หรือ REST API รองรับคุณสมบัติเสริมรวมถึงแบบกำหนดเอง...
official
airflow-plugins
astronomer
สร้างปลั๊กอิน Airflow 3.1+ ที่ฝังแอป FastAPI, หน้า UI แบบกำหนดเอง, คอมโพเนนต์ React, มิดเดิลแวร์, มาโคร และลิงก์โอเปอเรเตอร์ลงใน UI ของ Airflow โดยตรง ใช้…
official
analyzing-data
astronomer
สอบถามคลังข้อมูลของคุณเพื่อตอบคำถามทางธุรกิจด้วยรูปแบบที่แคชไว้และการแมปแนวคิด รองรับการค้นหารูปแบบและการแคชสำหรับประเภทคำถามที่เกิดซ้ำ พร้อมบันทึกผลลัพธ์เพื่อปรับปรุงการสอบถามในอนาคต รวมถึงแคชการแมปแนวคิดไปยังตารางและการค้นพบสคีมาตารางผ่าน INFORMATION_SCHEMA หรือการ grep โค้ดเบส มีฟังก์ชันเคอร์เนล run_sql() และ run_sql_pandas() ที่ส่งคืน Polars หรือ Pandas DataFrames สำหรับการวิเคราะห์ คำสั่ง CLI สำหรับจัดการแคชแนวคิด รูปแบบ และตาราง รวมถึง...
official
annotating-task-lineage
astronomer
ใส่คำอธิบาย Airflow tasks ด้วย data lineage โดยใช้ inlets และ outlets รองรับ OpenLineage Dataset objects, Airflow Assets และ Airflow Datasets สำหรับกำหนด inputs และ outputs ครอบคลุมฐานข้อมูล, data warehouses และ cloud storage ใช้เป็นทางเลือกสำรองเมื่อ operators ไม่มี OpenLineage extractors ในตัว; ทำงานตามระบบลำดับความสำคัญสี่ระดับที่ custom extractors และ OpenLineage methods มีสิทธิ์優先 รวมถึงตัวช่วยตั้งชื่อ dataset สำหรับ Snowflake, BigQuery, S3 และ PostgreSQL เพื่อให้มั่นใจถึงความสอดคล้อง...
official
authoring-dags
astronomer
เวิร์กโฟลว์แบบมีคำแนะนำสำหรับสร้าง Apache Airflow DAGs พร้อมการตรวจสอบความถูกต้องและการผสานการทดสอบ แนวทางแบบหกขั้นตอน: ค้นพบสภาพแวดล้อมและรูปแบบที่มีอยู่ วางแผนโครงสร้าง DAG ดำเนินการตามแนวทางปฏิบัติที่ดีที่สุด ตรวจสอบความถูกต้องด้วยคำสั่ง CLI ของ af ทดสอบโดยได้รับความยินยอมจากผู้ใช้ และปรับปรุงแก้ไขซ้ำ คำสั่ง CLI สำหรับการค้นพบ (af config connections, af config providers, af dags list) และการตรวจสอบความถูกต้อง (af dags errors, af dags get, af dags explore) ให้ข้อเสนอแนะทันทีเกี่ยวกับ DAG...
official
blueprint
astronomer
กำหนดเทมเพลตกลุ่มงาน Airflow ที่ใช้ซ้ำได้พร้อมการตรวจสอบความถูกต้องด้วย Pydantic และประกอบ DAG จาก YAML ใช้เมื่อสร้างเทมเพลต blueprint หรือประกอบ DAG จาก…
official
checking-freshness
astronomer
ตรวจสอบความสดใหม่ของข้อมูลโดยการตรวจสอบเวลาปรับปรุงของตารางและรูปแบบการอัปเดตเทียบกับระดับความเก่า ระบุคอลัมน์เวลาปรับปรุงโดยใช้รูปแบบการตั้งชื่อ ETL ทั่วไป (เช่น _loaded_at, _updated_at, created_at) และสอบถามค่าสูงสุดเพื่อกำหนดอายุ จัดประเภทข้อมูลเป็นสถานะความสดใหม่สี่สถานะ: สดใหม่ (น้อยกว่า 4 ชั่วโมง), เก่า (4–24 ชั่วโมง), เก่ามาก (มากกว่า 24 ชั่วโมง) หรือไม่ทราบ (ไม่พบเวลาปรับปรุง) มีเทมเพลต SQL สำหรับตรวจสอบเวลาปรับปรุงล่าสุดและแนวโน้มจำนวนแถวในช่วงไม่กี่วันที่ผ่านมาเพื่อ...
official