warehouse-init

Author: astronomer

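Initialize warehouse schema discovery. Generates .astro/warehouse.md with metadata for all tables, for instant lookups. Run once per project and refresh when the schema changes…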

npx skills add https://github.com/astronomer/agents --skill warehouse-init

Initialize Warehouse Schema

Generate a comprehensive, user-editable schema reference file for the data warehouse.

Scripts: ../analyzing-data/scripts/ — All CLI commands below are relative to the analyzing-data skill's directory. Before running any scripts/cli.py command, cd to ../analyzing-data/ relative to this file.

What This Does

  1. Discovers all databases, schemas, tables, and columns from the warehouse
  2. Enriches with codebase context (dbt models, gusty SQL, schema docs)
  3. Records row counts and identifies large tables
  4. Generates .astro/warehouse.md - a version-controllable, team-shareable reference
  5. Enables instant concept→table lookups without warehouse queries

Process

Step 1: Read Warehouse Configuration

cat ~/.astro/agents/warehouse.yml

Get the list of databases to discover (e.g., databases: [HQ, ANALYTICS, RAW]).

Step 2: Search Codebase for Context (Parallel)

Launch a subagent to find business context in code:

Task(
    subagent_type="Explore",
    prompt="""
    Search for data model documentation in the codebase:

    1. dbt models: **/models/**/*.yml, **/schema.yml
       - Extract table descriptions, column descriptions
       - Note primary keys and tests

    2. Gusty/declarative SQL: **/dags/**/*.sql with YAML frontmatter
       - Parse frontmatter for: description, primary_key, tests
       - Note schema mappings

    3. AGENTS.md or CLAUDE.md files with data layer documentation

    Return a mapping of:
      table_name -> {description, primary_key, important_columns, layer}
    """
)

Step 3: Parallel Warehouse Discovery

Launch one subagent per database using the Task tool:

For each database in configured_databases:
    Task(
        subagent_type="general-purpose",
        prompt="""
        Discover all metadata for database {DATABASE}.

        Use the CLI to run SQL queries:
        # Scripts are relative to ../analyzing-data/
        uv run scripts/cli.py exec "df = run_sql('...')"
        uv run scripts/cli.py exec "print(df)"

        1. Query schemas:
           SELECT SCHEMA_NAME FROM {DATABASE}.INFORMATION_SCHEMA.SCHEMATA

        2. Query tables with row counts:
           SELECT TABLE_SCHEMA, TABLE_NAME, ROW_COUNT, COMMENT
           FROM {DATABASE}.INFORMATION_SCHEMA.TABLES
           ORDER BY TABLE_SCHEMA, TABLE_NAME

        3. For important schemas (MODEL_*, METRICS_*, MART_*), query columns:
           SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, COMMENT
           FROM {DATABASE}.INFORMATION_SCHEMA.COLUMNS
           WHERE TABLE_SCHEMA = 'X'

        Return a structured summary:
        - Database name
        - List of schemas with table counts
        - For each table: name, row_count, key columns
        - Flag any tables with >100M rows as "large"
        """
    )

Run all subagents in parallel (single message with multiple Task calls).
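
The structured summary each subagent returns could be assembled roughly as follows. This is a minimal Python sketch, not part of the skill: `summarize_tables` is a hypothetical helper, and the input is assumed to be a list of dicts keyed by the column names selected from INFORMATION_SCHEMA.TABLES above.

```python
from collections import defaultdict

LARGE_TABLE_ROWS = 100_000_000  # ">100M rows" threshold from the prompt above


def summarize_tables(rows):
    """Group table rows by schema and flag large tables.

    `rows` is assumed to be a list of dicts with TABLE_SCHEMA,
    TABLE_NAME and ROW_COUNT keys (hypothetical input shape).
    """
    schemas = defaultdict(list)
    for row in rows:
        # Treat a missing or NULL ROW_COUNT as 0 so the comparison is safe.
        row_count = row.get("ROW_COUNT") or 0
        schemas[row["TABLE_SCHEMA"]].append({
            "name": row["TABLE_NAME"],
            "row_count": row_count,
            "large": row_count > LARGE_TABLE_ROWS,
        })
    return {
        schema: {"table_count": len(tables), "tables": tables}
        for schema, tables in schemas.items()
    }
```

The result maps each schema to its table count and table list, which lines up with the "list of schemas with table counts" item in the summary.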

Step 4: Discover Categorical Value Families

For key categorical columns (like OPERATOR, STATUS, TYPE, FEATURE), discover value families:

uv run scripts/cli.py exec "df = run_sql('''
SELECT column_name, COUNT(*) AS occurrences
FROM table
WHERE column_name IS NOT NULL
GROUP BY column_name
ORDER BY occurrences DESC
LIMIT 50
''')"
uv run scripts/cli.py exec "print(df)"

Group related values into families by common prefix/suffix (e.g., Export* for ExportCSV, ExportJSON, ExportParquet).
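
One way to do the prefix grouping is sketched below in Python. It assumes values are CamelCase or lowercase-led and uses the leading word as the family prefix; `group_value_families` is a hypothetical helper, and suffix-based families would need a separate pass.

```python
import re
from collections import defaultdict

# Leading word of a value: an optional capital followed by lowercase letters.
LEADING_WORD = re.compile(r"[A-Z]?[a-z]+")


def group_value_families(values):
    """Group categorical values by their leading word (assumed convention)."""
    families = defaultdict(list)
    for value in values:
        match = LEADING_WORD.match(value)
        # Values with no recognizable leading word form their own family.
        prefix = match.group(0) if match else value
        families[prefix].append(value)
    # Only families with more than one member get the `Prefix*` label.
    return {
        (f"{prefix}*" if len(members) > 1 else members[0]): sorted(members)
        for prefix, members in families.items()
    }
```

For the example above, ExportCSV, ExportJSON and ExportParquet end up together under `Export*`, while a lone value such as Login keeps its own name.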

Step 5: Merge Results

Combine warehouse metadata + codebase context:

  1. Quick Reference table - concept → table mappings (pre-populated from code if found)
  2. Categorical Columns - value families for key filter columns
  3. Database sections - one per database
  4. Schema subsections - tables grouped by schema
  5. Table details - columns, row counts, descriptions from code, warnings
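
The merge itself might look like the Python sketch below. Both inputs are assumed to be dicts keyed by fully qualified table name, and the rule that descriptions from code take precedence over the warehouse COMMENT is an assumption, not something the skill specifies; `merge_table_metadata` is a hypothetical name.

```python
def merge_table_metadata(warehouse_tables, code_context):
    """Overlay codebase context onto warehouse metadata, table by table."""
    merged = {}
    for name, meta in warehouse_tables.items():
        context = code_context.get(name, {})
        merged[name] = {
            **meta,
            # Assumed precedence: code description, then warehouse comment.
            "description": context.get("description") or meta.get("comment") or "",
            "primary_key": context.get("primary_key"),
            # Tracks the "tables enriched with code descriptions" count.
            "enriched": name in code_context,
        }
    return merged
```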

Step 6: Generate warehouse.md

Write the file to:

  • .astro/warehouse.md (default - project-specific, version-controllable)
  • ~/.astro/agents/warehouse.md (if --global flag)

Output Format

# Warehouse Schema

> Generated by `/astronomer-data:warehouse-init` on {DATE}. Edit freely to add business context.

## Quick Reference

| Concept | Table | Key Column | Date Column |
|---------|-------|------------|-------------|
| customers | HQ.MODEL_ASTRO.ORGANIZATIONS | ORG_ID | CREATED_AT |
<!-- Add your concept mappings here -->

## Categorical Columns

When filtering on these columns, explore value families first (values often have variants):

| Table | Column | Value Families |
|-------|--------|----------------|
| {TABLE} | {COLUMN} | `{PREFIX}*` ({VALUE1}, {VALUE2}, ...) |
<!-- Populated by /astronomer-data:warehouse-init from actual warehouse data -->

## Data Layer Hierarchy

Query downstream first: `reporting` > `mart_*` > `metric_*` > `model_*` > `IN_*`

| Layer | Prefix | Purpose |
|-------|--------|---------|
| Reporting | `reporting.*` | Dashboard-optimized |
| Mart | `mart_*` | Combined analytics |
| Metric | `metric_*` | KPIs at various grains |
| Model | `model_*` | Cleansed sources of truth |
| Raw | `IN_*` | Source data - avoid |

## {DATABASE} Database

### {SCHEMA} Schema

#### {TABLE_NAME}
{DESCRIPTION from code if found}

| Column | Type | Description |
|--------|------|-------------|
| COL1 | VARCHAR | {from code or inferred} |

- **Rows:** {ROW_COUNT}
- **Key column:** {PRIMARY_KEY from code or inferred}
{IF ROW_COUNT > 100M: - **⚠️ WARNING:** Large table - always add date filters}

## Relationships

{Inferred relationships based on column names like *_ID}
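
The Relationships section of the template relies on name-based inference. A minimal Python sketch of one such heuristic follows, assuming each table carries a `key` column and a `columns` list (a hypothetical shape): a column ending in `_ID` that matches another table's key is treated as a reference to that table.

```python
def infer_relationships(tables):
    """Return (table, column, referenced_table) triples from *_ID column names."""
    # Map each key column name to the table that owns it. If two tables
    # share a key name, the last one wins; a real pass should flag that.
    key_owner = {
        meta["key"]: name for name, meta in tables.items() if meta.get("key")
    }
    relationships = []
    for name, meta in tables.items():
        for column in meta["columns"]:
            owner = key_owner.get(column)
            if column.endswith("_ID") and owner and owner != name:
                relationships.append((name, column, owner))
    return relationships
```

Matches found this way are only candidates and should be reviewed before they are written into warehouse.md.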

Command Options

| Option | Effect |
|--------|--------|
| `/astronomer-data:warehouse-init` | Generate .astro/warehouse.md |
| `/astronomer-data:warehouse-init --refresh` | Regenerate, preserving user edits |
| `/astronomer-data:warehouse-init --database HQ` | Only discover specific database |
| `/astronomer-data:warehouse-init --global` | Write to ~/.astro/agents/ instead |

Step 7: Pre-populate Cache

After generating warehouse.md, populate the concept cache:

# Scripts are relative to ../analyzing-data/
uv run scripts/cli.py concept import -p .astro/warehouse.md
uv run scripts/cli.py concept learn customers HQ.MART_CUST.CURRENT_ASTRO_CUSTS -k ACCT_ID

Step 8: Offer CLAUDE.md Integration (Ask User)

Ask the user:

Would you like to add the Quick Reference table to your CLAUDE.md file?

This ensures the schema mappings are always in context for data queries, improving accuracy from ~25% to ~100% for complex queries.

Options:

  1. Yes, add to CLAUDE.md (Recommended) - Append Quick Reference section
  2. No, skip - Use warehouse.md and cache only

If user chooses Yes:

  1. Check if .claude/CLAUDE.md or CLAUDE.md exists
  2. If exists, append the Quick Reference section (avoid duplicates)
  3. If not exists, create .claude/CLAUDE.md with just the Quick Reference
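
The three steps above can be sketched in Python as follows. `add_quick_reference` is a hypothetical helper. Detecting a duplicate by looking for the section heading is an assumption, and this sketch skips the write when the heading is already present instead of updating the section in place.

```python
from pathlib import Path

SECTION_HEADING = "## Data Warehouse Quick Reference"


def add_quick_reference(project_root, section_text):
    """Append the Quick Reference section once; return (path, changed)."""
    root = Path(project_root)
    candidates = [root / ".claude" / "CLAUDE.md", root / "CLAUDE.md"]
    # Use the first existing file, else fall back to .claude/CLAUDE.md.
    target = next((p for p in candidates if p.exists()), candidates[0])
    existing = target.read_text(encoding="utf-8") if target.exists() else ""
    if SECTION_HEADING in existing:
        return target, False  # section already present: avoid a duplicate
    target.parent.mkdir(parents=True, exist_ok=True)
    separator = "\n\n" if existing.strip() else ""
    target.write_text(
        existing.rstrip("\n") + separator + section_text.rstrip("\n") + "\n",
        encoding="utf-8",
    )
    return target, True
```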

Quick Reference section to add:

## Data Warehouse Quick Reference

When querying the warehouse, use these table mappings:

| Concept | Table | Key Column | Date Column |
|---------|-------|------------|-------------|
{rows from warehouse.md Quick Reference}

**Large tables (always filter by date):** {list tables with >100M rows}

> Auto-generated by `/astronomer-data:warehouse-init`. Run `/astronomer-data:warehouse-init --refresh` to update.

If yes: Append the Quick Reference section to .claude/CLAUDE.md or CLAUDE.md.

After Generation

Tell the user:

Generated .astro/warehouse.md

Summary:
  - {N} databases, {N} schemas, {N} tables
  - {N} tables enriched with code descriptions
  - {N} concepts cached for instant lookup

Next steps:
  1. Edit .astro/warehouse.md to add business context
  2. Commit to version control
  3. Run /astronomer-data:warehouse-init --refresh when schema changes

Refresh Behavior

When --refresh is specified:

  1. Read existing warehouse.md
  2. Preserve all HTML comments (<!-- ... -->)
  3. Preserve Quick Reference table entries (user-added)
  4. Preserve user-added descriptions
  5. Update row counts and add new tables
  6. Mark removed tables with <!-- REMOVED --> comment
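
A minimal Python sketch of the table-level part of this merge is shown below. It assumes the existing file and the fresh discovery have both been parsed into dicts keyed by table name; that parsing, and the name `refresh_tables`, are hypothetical. Preserving HTML comments and Quick Reference rows is not covered here.

```python
def refresh_tables(existing, discovered):
    """Merge fresh discovery into existing entries, keeping user edits."""
    refreshed = {}
    for name, new_meta in discovered.items():
        old_meta = existing.get(name, {})
        refreshed[name] = {
            # Keep any description already in warehouse.md.
            "description": old_meta.get("description", ""),
            # Row counts always come from the fresh discovery.
            "row_count": new_meta["row_count"],
            "removed": False,
        }
    for name, old_meta in existing.items():
        if name not in discovered:
            # Keep the entry; the renderer would add a <!-- REMOVED --> comment.
            refreshed[name] = {**old_meta, "removed": True}
    return refreshed
```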

Cache Staleness & Schema Drift

The runtime cache has a 7-day TTL by default. After 7 days, cached entries expire and will be re-discovered on next use.
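
The expiry rule amounts to a timestamp comparison. The Python sketch below illustrates it under the assumption that each cache entry records a timezone-aware creation time; how the actual cache stores entries is not described here, and `is_stale` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone

CACHE_TTL = timedelta(days=7)  # default TTL stated above


def is_stale(cached_at, now=None, ttl=CACHE_TTL):
    """Return True once an entry is older than the TTL."""
    now = now or datetime.now(timezone.utc)
    return now - cached_at > ttl
```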

When to Refresh

Run /astronomer-data:warehouse-init --refresh when:

  • Schema changes: Tables added, renamed, or removed
  • Column changes: New columns added or types changed
  • After deployments: If your data pipeline deploys schema migrations
  • Weekly: As a good practice, even if no known changes

Signs of Stale Cache

Watch for these indicators:

  • Queries fail with "table not found" errors
  • Results seem wrong or outdated
  • New tables aren't being discovered

Manual Cache Reset

If you suspect cache issues:

# Scripts are relative to ../analyzing-data/
uv run scripts/cli.py cache status
uv run scripts/cli.py cache clear --stale-only
uv run scripts/cli.py cache clear

Codebase Patterns Recognized

| Pattern | Source | What We Extract |
|---------|--------|-----------------|
| `**/models/**/*.yml` | dbt | table/column descriptions, tests |
| `**/dags/**/*.sql` | gusty | YAML frontmatter (description, primary_key) |
| `AGENTS.md`, `CLAUDE.md` | docs | data layer hierarchy, conventions |
| `**/docs/**/*.md` | docs | business context |
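
These patterns map directly onto `pathlib` globbing. The Python sketch below only locates candidate files and does not parse them. `find_context_files` is a hypothetical helper, and looking for AGENTS.md and CLAUDE.md only at the project root is an assumption.

```python
from pathlib import Path

# Glob patterns from the table above, grouped by source.
PATTERNS = {
    "dbt": ["**/models/**/*.yml"],
    "gusty": ["**/dags/**/*.sql"],
    "docs": ["AGENTS.md", "CLAUDE.md", "**/docs/**/*.md"],
}


def find_context_files(project_root):
    """Return {source: sorted list of matching file paths}."""
    root = Path(project_root)
    return {
        source: sorted(
            {p for pattern in patterns for p in root.glob(pattern) if p.is_file()}
        )
        for source, patterns in PATTERNS.items()
    }
```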

Example Session

User: /astronomer-data:warehouse-init

Agent:
→ Reading warehouse configuration...
→ Found 1 warehouse with databases: HQ, PRODUCT

→ Searching codebase for data documentation...
  Found: AGENTS.md with data layer hierarchy
  Found: 45 SQL files with YAML frontmatter in dags/declarative/

→ Launching parallel warehouse discovery...
  [Database: HQ] Discovering schemas...
  [Database: PRODUCT] Discovering schemas...

→ HQ: Found 29 schemas, 401 tables
→ PRODUCT: Found 1 schema, 0 tables

→ Merging warehouse metadata with code context...
  Enriched 45 tables with descriptions from code

→ Generated .astro/warehouse.md

Summary:
  - 2 databases
  - 30 schemas
  - 401 tables
  - 45 tables enriched with code descriptions
  - 8 large tables flagged (>100M rows)

Next steps:
  1. Review .astro/warehouse.md
  2. Add concept mappings to Quick Reference
  3. Commit to version control
  4. Run /astronomer-data:warehouse-init --refresh when schema changes
