cosmos-dbt-core

tarafından astronomer

dbt Core projelerini Astronomer Cosmos kullanarak Airflow DAG'lerine veya TaskGroup'larına dönüştürür. Üç montaj desenini destekler: bağımsız DbtDag, mevcut DAG'ler içinde DbtTaskGroup ve ince ayar kontrol için bireysel Cosmos operatörleri. İzolasyon ve performans ihtiyaçlarına göre sekiz yürütme modu (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC ve diğerleri) arasından seçim yapın. Hız ve seçici karmaşıklığını dengelemek için üç ayrıştırma stratejisi (dbt_manifest, dbt_ls, dbt_ls_file, otomatik) sunar...

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core

Cosmos + dbt Core: Implementation Checklist

Execute steps in order. Prefer the simplest configuration that meets the user's constraints.

Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).

Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/

Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.


1. Configure Project (ProjectConfig)

ApproachWhen to useRequired param
Project pathFiles available locallydbt_project_path
Manifest onlydbt_manifest loadmanifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. Choose Parsing Strategy (RenderConfig)

Pick ONE load mode based on constraints:

Load modeWhen to useRequired inputsConstraints
dbt_manifestLarge projects; containerized execution; fastestProjectConfig.manifest_pathRemote manifest needs manifest_conn_id
dbt_lsComplex selectors; need dbt-native selectiondbt installed OR dbt_executable_pathCan also be used with containerized execution
dbt_ls_filedbt_ls selection without running dbt_ls every parseRenderConfig.dbt_ls_pathselect/exclude won't work
automatic (default)Simple setups; let Cosmos pick(none)Falls back: manifest → dbt_ls → custom

CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. Choose Execution Mode (ExecutionConfig)

Reference: See reference/cosmos-config.md for detailed configuration examples per mode.

Pick ONE execution mode:

Execution modeWhen to useSpeedRequired setup
WATCHERFastest; single dbt build visibilityFastestdbt adapter in env OR dbt_executable_path or dbt Fusion
WATCHER_KUBERNETESFastest isolated method; single dbt build visibilityFastdbt installed in container
LOCAL + DBT_RUNNERdbt + adapter in the same Python installation as AirflowFastdbt 1.5+ in requirements.txt
LOCAL + SUBPROCESSdbt + adapter available in the Airflow deployment, in an isolated Python installationMediumdbt_executable_path
AIRFLOW_ASYNCBigQuery + long-running transformsFastAirflow ≥2.8; provider deps
KUBERNETESIsolation between Airflow and dbtMediumAirflow ≥2.8; provider deps
VIRTUALENVCan't modify image; runtime venvSlowerpy_requirements in operator_args
Other containerized approachesSupport Airflow and dbt isolationMediumcontainer config
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.

Option A: Airflow Connection + ProfileMapping (Recommended)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

Option B: Existing profiles.yml

CRITICAL: Do not hardcode secrets; use environment variables.

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. Configure Testing Behavior (RenderConfig)

Reference: See reference/cosmos-config.md for detailed testing options.

TestBehaviorBehavior
AFTER_EACH (default)Tests run immediately after each model (default)
BUILDCombine run + test into single dbt build
AFTER_ALLAll tests after all models complete
NONESkip tests
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. Configure operator_args

Reference: See reference/cosmos-config.md for detailed operator_args options.

_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

Option C: Use Cosmos operators directly

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

Setting Dependencies on Individual Cosmos Tasks

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. Safety Checks

Before finalizing, verify:

  • Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only)
  • Warehouse adapter installed for chosen execution mode
  • Secrets via Airflow connections or env vars, NOT plaintext
  • Load mode matches execution (complex selectors → dbt_ls)
  • Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)

Appendix A: Airflow 3 Compatibility

Import Differences

Airflow 3.xAirflow 2.x
from airflow.sdk import dag, taskfrom airflow.decorators import dag, task
from airflow.sdk import chainfrom airflow.models.baseoperator import chain

Asset/Dataset URI Format Change

Cosmos ≤1.9 (Airflow 2 Datasets):

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets):

postgres://0.0.0.0:5434/postgres/public/orders

CRITICAL: Update asset URIs when upgrading to Airflow 3.


Appendix B: Operational Extras

Caching

Cosmos caches artifacts to speed up parsing. Enabled by default.

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

Memory-Optimized Imports

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

When enabled:

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

Artifact Upload to Object Storage

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

dbt Docs Hosting

Cosmos serves dbt docs in the Airflow UI. The config depends on your Airflow major version (each uses a different UI plugin system) — it is not a free single-vs-multi choice:

AirflowConfigScopeSince
2 (FAB plugin)DBT_DOCS_DIR (+ DBT_DOCS_CONN_ID, DBT_DOCS_INDEX_FILE_NAME)Single projectCosmos 1.4.0+
3.1+ (FastAPI)DBT_DOCS_PROJECTS (JSON)One or more projectsCosmos 1.11.0+

Airflow 2:

AIRFLOW__COSMOS__DBT_DOCS_DIR="path/to/docs"                   # local path or S3/GCS/Azure/HTTP URI; defaults to the dbt target/ folder
AIRFLOW__COSMOS__DBT_DOCS_CONN_ID="my_conn_id"                 # optional; for cloud storage
AIRFLOW__COSMOS__DBT_DOCS_INDEX_FILE_NAME="static_index.html"  # optional; only if docs built with --static

Airflow 3.1+:

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

Pick by Airflow version, not project count. The single-project settings are the Airflow 2 path; Cosmos publishes no deprecation notice for them — do not describe them as "legacy" or "deprecated."

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


Related Skills

  • cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

astronomer tarafından daha fazla skill

airflow
astronomer
Apache Airflow DAG'larını, çalıştırmalarını, görevlerini ve sistem yapılandırmasını sorgulayın, yönetin ve sorun giderin. DAG inceleme, çalıştırma yönetimi, görev günlüğü, yapılandırma sorguları ve doğrudan REST API erişimi dahil olmak üzere 30'dan fazla komutu destekler. Kalıcı yapılandırma ile birden çok Airflow örneğini yönetin; yerel ve Astro dağıtımlarını otomatik olarak keşfedin. DAG çalıştırmalarını eşzamanlı (tamamlanmayı bekleme) veya eşzamansız olarak tetikleyin, hataları teşhis edin, yeniden deneme için çalıştırm
official
airflow-hitl
astronomer
İnsan onay kapıları, form girdileri ve ertelenebilir operatörler kullanarak Airflow DAG'lerinde dallanma. Dört operatör türü: onay/red kararları için ApprovalOperator, formlarla çok seçenekli seçim için HITLOperator, insan odaklı görev yönlendirmesi için HITLBranchOperator ve form verisi toplama için HITLEntryOperator. Tüm operatörler ertelenebilir olup, Airflow UI'nin Gerekli İşlemler sekmesi veya REST API aracılığıyla insan yanıtı beklenirken işçi slotlarını serbest bırakır. Özel... dahil isteğe bağlı özellikleri destekler.
official
airflow-plugins
astronomer
Airflow 3.1+ eklentileri oluşturun; FastAPI uygulamaları, özel UI sayfaları, React bileşenleri, middleware, makrolar ve operatör bağlantılarını doğrudan Airflow arayüzüne yerleştirin. Kullanın…
official
analyzing-data
astronomer
Veri ambarınıza sorgu yaparak, önbelleğe alınmış desenler ve kavram eşlemeleriyle iş sorularını yanıtlayın. Tekrarlanan soru türleri için desen arama ve önbelleğe alma desteği sunar, gelecekteki sorguları iyileştirmek için sonuç kaydı yapar. Kavram-tablo eşleme önbelleği ve INFORMATION_SCHEMA veya kod tabanı grep aracılığıyla tablo şeması keşfi içerir. Analiz için Polars veya Pandas DataFrame'leri döndüren run_sql() ve run_sql_pandas() çekirdek fonksiyonlarını sağlar. Kavram, desen ve tablo önbelleklerini yönetmek için CLI komutları ve ayrıca...
official
annotating-task-lineage
astronomer
Airflow görevlerini, giriş ve çıkış noktalarını kullanarak veri soy ağacı ile açıklayın. Veritabanları, veri ambarları ve bulut depolama arasında girdi ve çıktıları tanımlamak için OpenLineage Dataset nesnelerini, Airflow Varlıklarını ve Airflow Veri Kümelerini destekler. Operatörlerde yerleşik OpenLineage çıkarıcılar bulunmadığında yedek olarak kullanın; özel çıkarıcıların ve OpenLineage yöntemlerinin öncelikli olduğu dört katmanlı bir öncelik sistemini izler. Snowflake, BigQuery, S3 ve PostgreSQL için tutarlı veri kümesi adlandırma yardımcıları içerir...
official
authoring-dags
astronomer
Apache Airflow DAG'ları oluşturmak için doğrulama ve test entegrasyonu içeren rehberli iş akışı. Yapılandırılmış altı aşamalı yaklaşım: ortamı ve mevcut kalıpları keşfetme, DAG yapısını planlama, en iyi uygulamaları takip ederek uygulama, af CLI komutlarıyla doğrulama, kullanıcı onayıyla test etme ve düzeltmeler üzerinde yineleme. Keşif (af config connections, af config providers, af dags list) ve doğrulama (af dags errors, af dags get, af dags explore) için CLI komutları, DAG hakkında anında geri bildirim sağlar...
official
blueprint
astronomer
Pydantic doğrulaması ile yeniden kullanılabilir Airflow görev grubu şablonları tanımlayın ve YAML’dan DAG’ler oluşturun. Blueprint şablonları oluştururken, YAML’dan DAG’ler oluştururken kullanın…
official
checking-freshness
astronomer
Tablo zaman damgalarını ve güncelleme desenlerini bir bayatlık ölçeğine göre kontrol ederek veri tazeliğini doğrular. Yaygın ETL adlandırma desenlerini (_loaded_at, _updated_at, created_at vb.) kullanarak zaman damgası sütunlarını tanımlar ve yaşı belirlemek için maksimum değerlerini sorgular. Verileri dört tazelik durumuna ayırır: Taze (< 4 saat), Bayat (4–24 saat), Çok Bayat (> 24 saat) veya Bilinmiyor (zaman damgası bulunamadı). Son güncelleme zamanını ve son günlerdeki satır sayısı eğilimlerini kontrol etmek için SQL şablonları sağlar...
official