warehouse-init

Инициализация обнаружения схемы хранилища. Создаёт .astro/warehouse.md со всеми метаданными таблиц для мгновенного поиска. Запускается один раз на проект, обновляется при изменении схемы…

npx skills add https://github.com/astronomer/agents --skill warehouse-init

Initialize Warehouse Schema

Generate a comprehensive, user-editable schema reference file for the data warehouse.

Scripts: ../analyzing-data/scripts/ — All CLI commands below are relative to the analyzing-data skill's directory. Before running any scripts/cli.py command, cd to ../analyzing-data/ relative to this file.

What This Does

  1. Discovers all databases, schemas, tables, and columns from the warehouse
  2. Enriches with codebase context (dbt models, gusty SQL, schema docs)
  3. Records row counts and identifies large tables
  4. Generates .astro/warehouse.md - a version-controllable, team-shareable reference
  5. Enables instant concept→table lookups without warehouse queries

Process

Step 1: Read Warehouse Configuration

cat ~/.astro/agents/warehouse.yml

Get the list of databases to discover (e.g., databases: [HQ, ANALYTICS, RAW]).

Step 2: Search Codebase for Context (Parallel)

Launch a subagent to find business context in code:

Task(
    subagent_type="Explore",
    prompt="""
    Search for data model documentation in the codebase:

    1. dbt models: **/models/**/*.yml, **/schema.yml
       - Extract table descriptions, column descriptions
       - Note primary keys and tests

    2. Gusty/declarative SQL: **/dags/**/*.sql with YAML frontmatter
       - Parse frontmatter for: description, primary_key, tests
       - Note schema mappings

    3. AGENTS.md or CLAUDE.md files with data layer documentation

    Return a mapping of:
      table_name -> {description, primary_key, important_columns, layer}
    """
)

Step 3: Parallel Warehouse Discovery

Launch one subagent per database using the Task tool:

For each database in configured_databases:
    Task(
        subagent_type="general-purpose",
        prompt="""
        Discover all metadata for database {DATABASE}.

        Use the CLI to run SQL queries:
        # Scripts are relative to ../analyzing-data/
        uv run scripts/cli.py exec "df = run_sql('...')"
        uv run scripts/cli.py exec "print(df)"

        1. Query schemas:
           SELECT SCHEMA_NAME FROM {DATABASE}.INFORMATION_SCHEMA.SCHEMATA

        2. Query tables with row counts:
           SELECT TABLE_SCHEMA, TABLE_NAME, ROW_COUNT, COMMENT
           FROM {DATABASE}.INFORMATION_SCHEMA.TABLES
           ORDER BY TABLE_SCHEMA, TABLE_NAME

        3. For important schemas (MODEL_*, METRICS_*, MART_*), query columns:
           SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, COMMENT
           FROM {DATABASE}.INFORMATION_SCHEMA.COLUMNS
           WHERE TABLE_SCHEMA = 'X'

        Return a structured summary:
        - Database name
        - List of schemas with table counts
        - For each table: name, row_count, key columns
        - Flag any tables with >100M rows as "large"
        """
    )

Run all subagents in parallel (single message with multiple Task calls).

Step 4: Discover Categorical Value Families

For key categorical columns (like OPERATOR, STATUS, TYPE, FEATURE), discover value families:

uv run cli.py exec "df = run_sql('''
SELECT DISTINCT column_name, COUNT(*) as occurrences
FROM table
WHERE column_name IS NOT NULL
GROUP BY column_name
ORDER BY occurrences DESC
LIMIT 50
''')"
uv run cli.py exec "print(df)"

Group related values into families by common prefix/suffix (e.g., Export* for ExportCSV, ExportJSON, ExportParquet).

Step 5: Merge Results

Combine warehouse metadata + codebase context:

  1. Quick Reference table - concept → table mappings (pre-populated from code if found)
  2. Categorical Columns - value families for key filter columns
  3. Database sections - one per database
  4. Schema subsections - tables grouped by schema
  5. Table details - columns, row counts, descriptions from code, warnings

Step 6: Generate warehouse.md

Write the file to:

  • .astro/warehouse.md (default - project-specific, version-controllable)
  • ~/.astro/agents/warehouse.md (if --global flag)

Output Format

# Warehouse Schema

> Generated by `/astronomer-data:warehouse-init` on {DATE}. Edit freely to add business context.

## Quick Reference

| Concept | Table | Key Column | Date Column |
|---------|-------|------------|-------------|
| customers | HQ.MODEL_ASTRO.ORGANIZATIONS | ORG_ID | CREATED_AT |
<!-- Add your concept mappings here -->

## Categorical Columns

When filtering on these columns, explore value families first (values often have variants):

| Table | Column | Value Families |
|-------|--------|----------------|
| {TABLE} | {COLUMN} | `{PREFIX}*` ({VALUE1}, {VALUE2}, ...) |
<!-- Populated by /astronomer-data:warehouse-init from actual warehouse data -->

## Data Layer Hierarchy

Query downstream first: `reporting` > `mart_*` > `metric_*` > `model_*` > `IN_*`

| Layer | Prefix | Purpose |
|-------|--------|---------|
| Reporting | `reporting.*` | Dashboard-optimized |
| Mart | `mart_*` | Combined analytics |
| Metric | `metric_*` | KPIs at various grains |
| Model | `model_*` | Cleansed sources of truth |
| Raw | `IN_*` | Source data - avoid |

## {DATABASE} Database

### {SCHEMA} Schema

#### {TABLE_NAME}
{DESCRIPTION from code if found}

| Column | Type | Description |
|--------|------|-------------|
| COL1 | VARCHAR | {from code or inferred} |

- **Rows:** {ROW_COUNT}
- **Key column:** {PRIMARY_KEY from code or inferred}
{IF ROW_COUNT > 100M: - **⚠️ WARNING:** Large table - always add date filters}

## Relationships

{Inferred relationships based on column names like *_ID}

Command Options

OptionEffect
/astronomer-data:warehouse-initGenerate .astro/warehouse.md
/astronomer-data:warehouse-init --refreshRegenerate, preserving user edits
/astronomer-data:warehouse-init --database HQOnly discover specific database
/astronomer-data:warehouse-init --globalWrite to ~/.astro/agents/ instead

Step 7: Pre-populate Cache

After generating warehouse.md, populate the concept cache:

# Scripts are relative to ../analyzing-data/
uv run cli.py concept import -p .astro/warehouse.md
uv run cli.py concept learn customers HQ.MART_CUST.CURRENT_ASTRO_CUSTS -k ACCT_ID

Step 8: Offer CLAUDE.md Integration (Ask User)

Ask the user:

Would you like to add the Quick Reference table to your CLAUDE.md file?

This ensures the schema mappings are always in context for data queries, improving accuracy from ~25% to ~100% for complex queries.

Options:

  1. Yes, add to CLAUDE.md (Recommended) - Append Quick Reference section
  2. No, skip - Use warehouse.md and cache only

If user chooses Yes:

  1. Check if .claude/CLAUDE.md or CLAUDE.md exists
  2. If exists, append the Quick Reference section (avoid duplicates)
  3. If not exists, create .claude/CLAUDE.md with just the Quick Reference

Quick Reference section to add:

## Data Warehouse Quick Reference

When querying the warehouse, use these table mappings:

| Concept | Table | Key Column | Date Column |
|---------|-------|------------|-------------|
{rows from warehouse.md Quick Reference}

**Large tables (always filter by date):** {list tables with >100M rows}

> Auto-generated by `/astronomer-data:warehouse-init`. Run `/astronomer-data:warehouse-init --refresh` to update.

If yes: Append the Quick Reference section to .claude/CLAUDE.md or CLAUDE.md.

After Generation

Tell the user:

Generated .astro/warehouse.md

Summary:
  - {N} databases, {N} schemas, {N} tables
  - {N} tables enriched with code descriptions
  - {N} concepts cached for instant lookup

Next steps:
  1. Edit .astro/warehouse.md to add business context
  2. Commit to version control
  3. Run /astronomer-data:warehouse-init --refresh when schema changes

Refresh Behavior

When --refresh is specified:

  1. Read existing warehouse.md
  2. Preserve all HTML comments (<!-- ... -->)
  3. Preserve Quick Reference table entries (user-added)
  4. Preserve user-added descriptions
  5. Update row counts and add new tables
  6. Mark removed tables with <!-- REMOVED --> comment

Cache Staleness & Schema Drift

The runtime cache has a 7-day TTL by default. After 7 days, cached entries expire and will be re-discovered on next use.

When to Refresh

Run /astronomer-data:warehouse-init --refresh when:

  • Schema changes: Tables added, renamed, or removed
  • Column changes: New columns added or types changed
  • After deployments: If your data pipeline deploys schema migrations
  • Weekly: As a good practice, even if no known changes

Signs of Stale Cache

Watch for these indicators:

  • Queries fail with "table not found" errors
  • Results seem wrong or outdated
  • New tables aren't being discovered

Manual Cache Reset

If you suspect cache issues:

# Scripts are relative to ../analyzing-data/
uv run scripts/cli.py cache status
uv run scripts/cli.py cache clear --stale-only
uv run scripts/cli.py cache clear

Codebase Patterns Recognized

PatternSourceWhat We Extract
**/models/**/*.ymldbttable/column descriptions, tests
**/dags/**/*.sqlgustyYAML frontmatter (description, primary_key)
AGENTS.md, CLAUDE.mddocsdata layer hierarchy, conventions
**/docs/**/*.mddocsbusiness context

Example Session

User: /astronomer-data:warehouse-init

Agent:
→ Reading warehouse configuration...
→ Found 1 warehouse with databases: HQ, PRODUCT

→ Searching codebase for data documentation...
  Found: AGENTS.md with data layer hierarchy
  Found: 45 SQL files with YAML frontmatter in dags/declarative/

→ Launching parallel warehouse discovery...
  [Database: HQ] Discovering schemas...
  [Database: PRODUCT] Discovering schemas...

→ HQ: Found 29 schemas, 401 tables
→ PRODUCT: Found 1 schema, 0 tables

→ Merging warehouse metadata with code context...
  Enriched 45 tables with descriptions from code

→ Generated .astro/warehouse.md

Summary:
  - 2 databases
  - 30 schemas
  - 401 tables
  - 45 tables enriched with code descriptions
  - 8 large tables flagged (>100M rows)

Next steps:
  1. Review .astro/warehouse.md
  2. Add concept mappings to Quick Reference
  3. Commit to version control
  4. Run /astronomer-data:warehouse-init --refresh when schema changes

Больше skills от astronomer

airflow
astronomer
Запрос, управление и устранение неполадок DAG, запусков, задач и системной конфигурации Apache Airflow. Поддерживает более 30 команд для проверки DAG, управления запусками, ведения журналов задач, запросов конфигурации и прямого доступа к REST API. Управление несколькими экземплярами Airflow с постоянной конфигурацией; автоматическое обнаружение локальных и Astro развертываний. Синхронный (с ожиданием завершения) или асинхронный запуск DAG, диагностика сбоев, очистка запусков для повторного выполнения, доступ к журналам задач с фильтрацией по повторным попыткам и индексу карты. Вывод...
official
airflow-hitl
astronomer
Шлюзы утверждения человеком, ввод форм и ветвление в DAG Airflow с использованием отложенных операторов. Четыре типа операторов: ApprovalOperator для решений утвердить/отклонить, HITLOperator для выбора нескольких вариантов с формами, HITLBranchOperator для маршрутизации задач на основе решений человека и HITLEntryOperator для сбора данных из форм. Все операторы являются отложенными, освобождая слоты рабочих узлов в ожидании ответа человека через вкладку Required Actions в интерфейсе Airflow или REST API. Поддерживает дополнительные функции, включая пользовательские...
official
airflow-plugins
astronomer
Создавайте плагины для Airflow 3.1+, которые встраивают приложения FastAPI, пользовательские страницы интерфейса, компоненты React, промежуточное ПО, макросы и ссылки на операторы непосредственно в интерфейс Airflow. Используйте…
official
analyzing-data
astronomer
Запрашивайте данные из вашего хранилища данных для ответа на бизнес-вопросы с использованием кэшированных шаблонов и сопоставлений понятий. Поддерживает поиск по шаблонам и кэширование для повторяющихся типов вопросов с записью результатов для улучшения будущих запросов. Включает кэш сопоставлений понятий и таблиц, а также обнаружение схем таблиц через INFORMATION_SCHEMA или grep кодовой базы. Предоставляет функции ядра run_sql() и run_sql_pandas(), возвращающие DataFrames Polars или Pandas для анализа. Команды CLI для управления кэшами понятий, шаблонов и таблиц, а также...
official
annotating-task-lineage
astronomer
Аннотирование задач Airflow с помощью data lineage с использованием inlets и outlets. Поддерживает объекты Dataset OpenLineage, Airflow Assets и Airflow Datasets для определения входных и выходных данных в базах данных, хранилищах данных и облачных хранилищах. Используется как запасной вариант, когда операторам не хватает встроенных экстракторов OpenLineage; следует четырехуровневой системе приоритетов, где пользовательские экстракторы и методы OpenLineage имеют приоритет. Включает вспомогательные функции для именования наборов данных для Snowflake, BigQuery, S3 и PostgreSQL для обеспечения согласованности...
official
authoring-dags
astronomer
Пошаговый процесс создания DAG Apache Airflow с интеграцией валидации и тестирования. Структурированный шестифазный подход: обнаружение среды и существующих шаблонов, планирование структуры DAG, реализация с соблюдением лучших практик, валидация с помощью команд af CLI, тестирование с согласия пользователя и итеративное исправление. Команды CLI для обнаружения (af config connections, af config providers, af dags list) и валидации (af dags errors, af dags get, af dags explore) обеспечивают немедленную обратную связь по DAG...
official
blueprint
astronomer
Определяйте переиспользуемые шаблоны групп задач Airflow с валидацией Pydantic и составляйте DAG из YAML. Используйте при создании шаблонов blueprint, составлении DAG из…
official
checking-freshness
astronomer
Проверяет свежесть данных, сравнивая временные метки таблиц и шаблоны обновлений со шкалой устаревания. Определяет столбцы с временными метками, используя распространённые шаблоны именования ETL (_loaded_at, _updated_at, created_at и т.д.), и запрашивает их максимальные значения для определения возраста. Классифицирует данные по четырём статусам свежести: Свежие (< 4 часов), Устаревшие (4–24 часа), Очень устаревшие (> 24 часов) или Неизвестно (временная метка не найдена). Предоставляет SQL-шаблоны для проверки времени последнего обновления и тенденций количества строк за последние дни, чтобы...
official