cosmos-dbt-core

Convertissez les projets dbt Core en DAGs ou TaskGroups Airflow à l'aide d'Astronomer Cosmos. Prend en charge trois modèles d'assemblage : DbtDag autonome, DbtTaskGroup au sein de DAGs existants et opérateurs Cosmos individuels pour un contrôle précis. Choisissez parmi huit modes d'exécution (WATCHER, LOCAL, VIRTUALENV, KUBERNETES, AIRFLOW_ASYNC, et autres) en fonction des besoins d'isolation et de performance. Propose trois stratégies d'analyse (dbt_manifest, dbt_ls, dbt_ls_file, automatique) pour équilibrer vitesse et complexité des sélecteurs...

npx skills add https://github.com/astronomer/agents --skill cosmos-dbt-core

Cosmos + dbt Core: Implementation Checklist

Execute steps in order. Prefer the simplest configuration that meets the user's constraints.

Version note: This skill targets Cosmos 1.11+ and Airflow 3.x. If the user is on Airflow 2.x, adjust imports accordingly (see Appendix A).

Reference: Latest stable: https://pypi.org/project/astronomer-cosmos/

Before starting, confirm: (1) dbt engine = Core (not Fusion → use cosmos-dbt-fusion), (2) warehouse type, (3) Airflow version, (4) execution environment (Airflow env / venv / container), (5) DbtDag vs DbtTaskGroup vs individual operators, (6) manifest availability.


1. Configure Project (ProjectConfig)

ApproachWhen to useRequired param
Project pathFiles available locallydbt_project_path
Manifest onlydbt_manifest loadmanifest_path + project_name
from cosmos import ProjectConfig

_project_config = ProjectConfig(
    dbt_project_path="/path/to/dbt/project",
    # manifest_path="/path/to/manifest.json",  # for dbt_manifest load mode
    # project_name="my_project",  # if using manifest_path without dbt_project_path
    # install_dbt_deps=False,  # if deps precomputed in CI
)

2. Choose Parsing Strategy (RenderConfig)

Pick ONE load mode based on constraints:

Load modeWhen to useRequired inputsConstraints
dbt_manifestLarge projects; containerized execution; fastestProjectConfig.manifest_pathRemote manifest needs manifest_conn_id
dbt_lsComplex selectors; need dbt-native selectiondbt installed OR dbt_executable_pathCan also be used with containerized execution
dbt_ls_filedbt_ls selection without running dbt_ls every parseRenderConfig.dbt_ls_pathselect/exclude won't work
automatic (default)Simple setups; let Cosmos pick(none)Falls back: manifest → dbt_ls → custom

CRITICAL: Containerized execution (DOCKER/KUBERNETES/etc.)

from cosmos import RenderConfig, LoadMode

_render_config = RenderConfig(
    load_method=LoadMode.DBT_MANIFEST,  # or DBT_LS, DBT_LS_FILE, AUTOMATIC
)

3. Choose Execution Mode (ExecutionConfig)

Reference: See reference/cosmos-config.md for detailed configuration examples per mode.

Pick ONE execution mode:

Execution modeWhen to useSpeedRequired setup
WATCHERFastest; single dbt build visibilityFastestdbt adapter in env OR dbt_executable_path or dbt Fusion
WATCHER_KUBERNETESFastest isolated method; single dbt build visibilityFastdbt installed in container
LOCAL + DBT_RUNNERdbt + adapter in the same Python installation as AirflowFastdbt 1.5+ in requirements.txt
LOCAL + SUBPROCESSdbt + adapter available in the Airflow deployment, in an isolated Python installationMediumdbt_executable_path
AIRFLOW_ASYNCBigQuery + long-running transformsFastAirflow ≥2.8; provider deps
KUBERNETESIsolation between Airflow and dbtMediumAirflow ≥2.8; provider deps
VIRTUALENVCan't modify image; runtime venvSlowerpy_requirements in operator_args
Other containerized approachesSupport Airflow and dbt isolationMediumcontainer config
from cosmos import ExecutionConfig, ExecutionMode

_execution_config = ExecutionConfig(
    execution_mode=ExecutionMode.WATCHER,  # or LOCAL, VIRTUALENV, AIRFLOW_ASYNC, KUBERNETES, etc.
)

4. Configure Warehouse Connection (ProfileConfig)

Reference: See reference/cosmos-config.md for detailed ProfileConfig options and all ProfileMapping classes.

Option A: Airflow Connection + ProfileMapping (Recommended)

from cosmos import ProfileConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
        profile_args={"schema": "my_schema"},
    ),
)

Option B: Existing profiles.yml

CRITICAL: Do not hardcode secrets; use environment variables.

from cosmos import ProfileConfig

_profile_config = ProfileConfig(
    profile_name="my_profile",
    target_name="dev",
    profiles_yml_filepath="/path/to/profiles.yml",
)

5. Configure Testing Behavior (RenderConfig)

Reference: See reference/cosmos-config.md for detailed testing options.

TestBehaviorBehavior
AFTER_EACH (default)Tests run immediately after each model (default)
BUILDCombine run + test into single dbt build
AFTER_ALLAll tests after all models complete
NONESkip tests
from cosmos import RenderConfig, TestBehavior

_render_config = RenderConfig(
    test_behavior=TestBehavior.AFTER_EACH,
)

6. Configure operator_args

Reference: See reference/cosmos-config.md for detailed operator_args options.

_operator_args = {
    # BaseOperator params
    "retries": 3,

    # Cosmos-specific params
    "install_deps": False,
    "full_refresh": False,
    "quiet": True,

    # Runtime dbt vars (XCom / params)
    "vars": '{"my_var": "{{ ti.xcom_pull(task_ids=\'pre_dbt\') }}"}',
}

7. Assemble DAG / TaskGroup

Option A: DbtDag (Standalone)

from cosmos import DbtDag, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from cosmos.profiles import SnowflakeUserPasswordProfileMapping
from pendulum import datetime

_project_config = ProjectConfig(
    dbt_project_path="/usr/local/airflow/dbt/my_project",
)

_profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profile_mapping=SnowflakeUserPasswordProfileMapping(
        conn_id="snowflake_default",
    ),
)

_execution_config = ExecutionConfig()
_render_config = RenderConfig()

my_cosmos_dag = DbtDag(
    dag_id="my_cosmos_dag",
    project_config=_project_config,
    profile_config=_profile_config,
    execution_config=_execution_config,
    render_config=_render_config,
    operator_args={},
    start_date=datetime(2025, 1, 1),
    schedule="@daily",
)

Option B: DbtTaskGroup (Inside Existing DAG)

from airflow.sdk import dag, task  # Airflow 3.x
# from airflow.decorators import dag, task  # Airflow 2.x
from airflow.models.baseoperator import chain
from cosmos import DbtTaskGroup, ProjectConfig, ProfileConfig, ExecutionConfig, RenderConfig
from pendulum import datetime

_project_config = ProjectConfig(dbt_project_path="/usr/local/airflow/dbt/my_project")
_profile_config = ProfileConfig(profile_name="default", target_name="dev")
_execution_config = ExecutionConfig()
_render_config = RenderConfig()

@dag(start_date=datetime(2025, 1, 1), schedule="@daily")
def my_dag():
    @task
    def pre_dbt():
        return "some_value"

    dbt = DbtTaskGroup(
        group_id="dbt_project",
        project_config=_project_config,
        profile_config=_profile_config,
        execution_config=_execution_config,
        render_config=_render_config,
    )

    @task
    def post_dbt():
        pass

    chain(pre_dbt(), dbt, post_dbt())

my_dag()

Option C: Use Cosmos operators directly

import os
from datetime import datetime
from pathlib import Path
from typing import Any

from airflow import DAG

try:
    from airflow.providers.standard.operators.python import PythonOperator
except ImportError:
    from airflow.operators.python import PythonOperator

from cosmos import DbtCloneLocalOperator, DbtRunLocalOperator, DbtSeedLocalOperator, ProfileConfig
from cosmos.io import upload_to_aws_s3

DEFAULT_DBT_ROOT_PATH = Path(__file__).parent / "dbt"
DBT_ROOT_PATH = Path(os.getenv("DBT_ROOT_PATH", DEFAULT_DBT_ROOT_PATH))
DBT_PROJ_DIR = DBT_ROOT_PATH / "jaffle_shop"
DBT_PROFILE_PATH = DBT_PROJ_DIR / "profiles.yml"
DBT_ARTIFACT = DBT_PROJ_DIR / "target"

profile_config = ProfileConfig(
    profile_name="default",
    target_name="dev",
    profiles_yml_filepath=DBT_PROFILE_PATH,
)


def check_s3_file(bucket_name: str, file_key: str, aws_conn_id: str = "aws_default", **context: Any) -> bool:
    """Check if a file exists in the given S3 bucket."""
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    s3_key = f"{context['dag'].dag_id}/{context['run_id']}/seed/0/{file_key}"
    print(f"Checking if file {s3_key} exists in S3 bucket...")
    hook = S3Hook(aws_conn_id=aws_conn_id)
    return hook.check_for_key(key=s3_key, bucket_name=bucket_name)


with DAG("example_operators", start_date=datetime(2024, 1, 1), catchup=False) as dag:
    seed_operator = DbtSeedLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="seed",
        dbt_cmd_flags=["--select", "raw_customers"],
        install_deps=True,
        append_env=True,
    )

    check_file_uploaded_task = PythonOperator(
        task_id="check_file_uploaded_task",
        python_callable=check_s3_file,
        op_kwargs={
            "aws_conn_id": "aws_s3_conn",
            "bucket_name": "cosmos-artifacts-upload",
            "file_key": "target/run_results.json",
        },
    )

    run_operator = DbtRunLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="run",
        dbt_cmd_flags=["--models", "stg_customers"],
        install_deps=True,
        append_env=True,
    )

    clone_operator = DbtCloneLocalOperator(
        profile_config=profile_config,
        project_dir=DBT_PROJ_DIR,
        task_id="clone",
        dbt_cmd_flags=["--models", "stg_customers", "--state", DBT_ARTIFACT],
        install_deps=True,
        append_env=True,
    )

    seed_operator >> run_operator >> clone_operator
    seed_operator >> check_file_uploaded_task

Setting Dependencies on Individual Cosmos Tasks

from cosmos import DbtDag, DbtResourceType
from airflow.sdk import task, chain

with DbtDag(...) as dag:
    @task
    def upstream_task():
        pass

    _upstream = upstream_task()

    for unique_id, dbt_node in dag.dbt_graph.filtered_nodes.items():
        if dbt_node.resource_type == DbtResourceType.SEED:
            my_dbt_task = dag.tasks_map[unique_id]
            chain(_upstream, my_dbt_task)

8. Safety Checks

Before finalizing, verify:

  • Execution mode matches constraints (AIRFLOW_ASYNC → BigQuery only)
  • Warehouse adapter installed for chosen execution mode
  • Secrets via Airflow connections or env vars, NOT plaintext
  • Load mode matches execution (complex selectors → dbt_ls)
  • Airflow 3 asset URIs if downstream DAGs scheduled on Cosmos assets (see Appendix A)

Appendix A: Airflow 3 Compatibility

Import Differences

Airflow 3.xAirflow 2.x
from airflow.sdk import dag, taskfrom airflow.decorators import dag, task
from airflow.sdk import chainfrom airflow.models.baseoperator import chain

Asset/Dataset URI Format Change

Cosmos ≤1.9 (Airflow 2 Datasets):

postgres://0.0.0.0:5434/postgres.public.orders

Cosmos ≥1.10 (Airflow 3 Assets):

postgres://0.0.0.0:5434/postgres/public/orders

CRITICAL: Update asset URIs when upgrading to Airflow 3.


Appendix B: Operational Extras

Caching

Cosmos caches artifacts to speed up parsing. Enabled by default.

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/caching.html

Memory-Optimized Imports

AIRFLOW__COSMOS__ENABLE_MEMORY_OPTIMISED_IMPORTS=True

When enabled:

from cosmos.airflow.dag import DbtDag  # instead of: from cosmos import DbtDag

Artifact Upload to Object Storage

AIRFLOW__COSMOS__REMOTE_TARGET_PATH=s3://bucket/target_dir/
AIRFLOW__COSMOS__REMOTE_TARGET_PATH_CONN_ID=aws_default
from cosmos.io import upload_to_cloud_storage

my_dag = DbtDag(
    # ...
    operator_args={"callback": upload_to_cloud_storage},
)

dbt Docs Hosting

Cosmos serves dbt docs in the Airflow UI. The config depends on your Airflow major version (each uses a different UI plugin system) — it is not a free single-vs-multi choice:

AirflowConfigScopeSince
2 (FAB plugin)DBT_DOCS_DIR (+ DBT_DOCS_CONN_ID, DBT_DOCS_INDEX_FILE_NAME)Single projectCosmos 1.4.0+
3.1+ (FastAPI)DBT_DOCS_PROJECTS (JSON)One or more projectsCosmos 1.11.0+

Airflow 2:

AIRFLOW__COSMOS__DBT_DOCS_DIR="path/to/docs"                   # local path or S3/GCS/Azure/HTTP URI; defaults to the dbt target/ folder
AIRFLOW__COSMOS__DBT_DOCS_CONN_ID="my_conn_id"                 # optional; for cloud storage
AIRFLOW__COSMOS__DBT_DOCS_INDEX_FILE_NAME="static_index.html"  # optional; only if docs built with --static

Airflow 3.1+:

AIRFLOW__COSMOS__DBT_DOCS_PROJECTS='{
    "my_project": {
        "dir": "s3://bucket/docs/",
        "index": "index.html",
        "conn_id": "aws_default",
        "name": "My Project"
    }
}'

Pick by Airflow version, not project count. The single-project settings are the Airflow 2 path; Cosmos publishes no deprecation notice for them — do not describe them as "legacy" or "deprecated."

Reference: https://astronomer.github.io/astronomer-cosmos/configuration/hosting-docs.html


Related Skills

  • cosmos-dbt-fusion: For dbt Fusion projects (not dbt Core)
  • authoring-dags: General DAG authoring patterns
  • testing-dags: Testing DAGs after creation

Plus de skills de astronomer

airflow
astronomer
Interroger, gérer et dépanner les DAGs, exécutions, tâches et configurations système d'Apache Airflow. Prend en charge plus de 30 commandes pour l'inspection des DAGs, la gestion des exécutions, la journalisation des tâches, les requêtes de configuration et l'accès direct à l'API REST. Gérez plusieurs instances Airflow avec une configuration persistante ; découvrez automatiquement les déploiements locaux et Astro. Déclenchez des exécutions de DAG de manière synchrone (attente de fin) ou asynchrone, diagnostiquez les échecs, effacez les exécutions pour réessayer, et accédez aux journaux de tâches avec filtrage par tentative et index de carte. Sortie...
official
airflow-hitl
astronomer
Portes d'approbation humaine, entrées de formulaire et branchement dans les DAG Airflow à l'aide d'opérateurs différés. Quatre types d'opérateurs : ApprovalOperator pour les décisions d'approbation/rejet, HITLOperator pour la sélection multi-options avec formulaires, HITLBranchOperator pour le routage des tâches piloté par l'humain, et HITLEntryOperator pour la collecte de données de formulaire. Tous les opérateurs sont différés, libérant les emplacements de travail en attendant une réponse humaine via l'onglet Actions requises de l'interface utilisateur Airflow ou l'API REST. Prend en charge des fonctionnalités optionnelles, y compris personnalisées...
official
airflow-plugins
astronomer
Créez des plugins Airflow 3.1+ qui intègrent des applications FastAPI, des pages d'interface utilisateur personnalisées, des composants React, des intergiciels, des macros et des liens d'opérateur directement dans l'interface utilisateur d'Airflow. Utilisez…
official
analyzing-data
astronomer
Interrogez votre entrepôt de données pour répondre à des questions métier à l'aide de motifs mis en cache et de correspondances de concepts. Prend en charge la recherche de motifs et la mise en cache pour les types de questions récurrentes, avec enregistrement des résultats pour améliorer les requêtes futures. Inclut un cache de correspondance concept-table et la découverte de schémas de tables via INFORMATION_SCHEMA ou grep du code source. Fournit les fonctions noyau run_sql() et run_sql_pandas() renvoyant des DataFrames Polars ou Pandas pour l'analyse. Commandes CLI pour gérer les caches de concepts, motifs et tables, plus...
official
annotating-task-lineage
astronomer
Annotez les tâches Airflow avec la traçabilité des données à l'aide d'inlets et d'outlets. Prend en charge les objets Dataset OpenLineage, les Assets Airflow et les Datasets Airflow pour définir les entrées et sorties entre bases de données, entrepôts de données et stockage cloud. Utilisez-le comme solution de repli lorsque les opérateurs ne disposent pas d'extracteurs OpenLineage intégrés ; suit un système de priorité à quatre niveaux où les extracteurs personnalisés et les méthodes OpenLineage ont la priorité. Inclut des assistants de nommage de datasets pour Snowflake, BigQuery, S3 et PostgreSQL afin de garantir une cohérence...
official
authoring-dags
astronomer
Workflow guidé pour créer des DAGs Apache Airflow avec validation et intégration de tests. Approche structurée en six phases : découvrir l'environnement et les modèles existants, planifier la structure du DAG, implémenter en suivant les bonnes pratiques, valider avec les commandes CLI af, tester avec le consentement de l'utilisateur, et itérer sur les correctifs. Les commandes CLI pour la découverte (af config connections, af config providers, af dags list) et la validation (af dags errors, af dags get, af dags explore) fournissent un retour immédiat sur le DAG...
official
blueprint
astronomer
Définir des modèles réutilisables de groupes de tâches Airflow avec validation Pydantic et composer des DAGs à partir de YAML. Utiliser lors de la création de modèles blueprint, de la composition de DAGs à partir de…
official
checking-freshness
astronomer
Vérifier la fraîcheur des données en consultant les horodatages des tables et les modèles de mise à jour par rapport à une échelle d'obsolescence. Identifie les colonnes d'horodatage à l'aide de modèles de nommage ETL courants (_loaded_at, _updated_at, created_at, etc.) et interroge leurs valeurs maximales pour déterminer l'âge. Classe les données en quatre statuts de fraîcheur : Fraîches (< 4 heures), Obsolètes (4–24 heures), Très obsolètes (> 24 heures) ou Inconnues (aucun horodatage trouvé). Fournit des modèles SQL pour vérifier l'heure de la dernière mise à jour et les tendances du nombre de lignes sur les derniers jours afin de...
official