warehouse-init

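Author: astronomer

Initializes warehouse schema discovery. Generates .astro/warehouse.md with metadata for all tables to support instant lookups. Run once per project, and refresh when the schema changes…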

npx skills add https://github.com/astronomer/agents --skill warehouse-init

Initialize Warehouse Schema

Generate a comprehensive, user-editable schema reference file for the data warehouse.

Scripts: ../analyzing-data/scripts/ — All CLI commands below are relative to the analyzing-data skill's directory. Before running any scripts/cli.py command, cd to ../analyzing-data/ relative to this file.

What This Does

  1. Discovers all databases, schemas, tables, and columns from the warehouse
  2. Enriches with codebase context (dbt models, gusty SQL, schema docs)
  3. Records row counts and identifies large tables
  4. Generates .astro/warehouse.md - a version-controllable, team-shareable reference
  5. Enables instant concept→table lookups without warehouse queries

Process

Step 1: Read Warehouse Configuration

cat ~/.astro/agents/warehouse.yml

Get the list of databases to discover (e.g., databases: [HQ, ANALYTICS, RAW]).
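The layout of warehouse.yml is not documented here, so the following is only a minimal sketch. It assumes the file contains a flow-style `databases: [...]` line like the example above; the helper name `read_databases` is hypothetical, and a real YAML parser would be more robust.

```python
import re


def read_databases(config_text: str) -> list[str]:
    """Return names from a flow-style `databases: [A, B]` line, or [] if absent.

    Assumption: the list is written inline on one line, as in the example.
    """
    match = re.search(r"^\s*databases:\s*\[([^\]]*)\]", config_text, flags=re.MULTILINE)
    if not match:
        return []
    # Split on commas, then drop surrounding whitespace and optional quotes.
    return [
        name.strip().strip("'\"")
        for name in match.group(1).split(",")
        if name.strip()
    ]


print(read_databases("databases: [HQ, ANALYTICS, RAW]"))
```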

Step 2: Search Codebase for Context (Parallel)

Launch a subagent to find business context in code:

Task(
    subagent_type="Explore",
    prompt="""
    Search for data model documentation in the codebase:

    1. dbt models: **/models/**/*.yml, **/schema.yml
       - Extract table descriptions, column descriptions
       - Note primary keys and tests

    2. Gusty/declarative SQL: **/dags/**/*.sql with YAML frontmatter
       - Parse frontmatter for: description, primary_key, tests
       - Note schema mappings

    3. AGENTS.md or CLAUDE.md files with data layer documentation

    Return a mapping of:
      table_name -> {description, primary_key, important_columns, layer}
    """
)

Step 3: Parallel Warehouse Discovery

Launch one subagent per database using the Task tool:

For each database in configured_databases:
    Task(
        subagent_type="general-purpose",
        prompt="""
        Discover all metadata for database {DATABASE}.

        Use the CLI to run SQL queries:
        # Scripts are relative to ../analyzing-data/
        uv run scripts/cli.py exec "df = run_sql('...')"
        uv run scripts/cli.py exec "print(df)"

        1. Query schemas:
           SELECT SCHEMA_NAME FROM {DATABASE}.INFORMATION_SCHEMA.SCHEMATA

        2. Query tables with row counts:
           SELECT TABLE_SCHEMA, TABLE_NAME, ROW_COUNT, COMMENT
           FROM {DATABASE}.INFORMATION_SCHEMA.TABLES
           ORDER BY TABLE_SCHEMA, TABLE_NAME

        3. For important schemas (MODEL_*, METRICS_*, MART_*), query columns:
           SELECT TABLE_NAME, COLUMN_NAME, DATA_TYPE, COMMENT
           FROM {DATABASE}.INFORMATION_SCHEMA.COLUMNS
           WHERE TABLE_SCHEMA = 'X'

        Return a structured summary:
        - Database name
        - List of schemas with table counts
        - For each table: name, row_count, key columns
        - Flag any tables with >100M rows as "large"
        """
    )

Run all subagents in parallel (single message with multiple Task calls).
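The structured summary each subagent returns could be assembled along these lines. This is a sketch, not the skill's implementation: the function name and the tuple layout are assumptions, and only the 100M-row threshold comes from the prompt above.

```python
from collections import Counter

LARGE_TABLE_ROWS = 100_000_000  # ">100M rows" threshold from the discovery prompt


def summarize_tables(rows):
    """Summarize (schema, table, row_count) tuples for one database.

    Assumption: row_count may be None when the warehouse reports no count.
    Returns (tables per schema, fully qualified names of large tables).
    """
    rows = list(rows)  # allow generators, since the rows are scanned twice
    tables_per_schema = Counter(schema for schema, _, _ in rows)
    large = [
        f"{schema}.{table}"
        for schema, table, count in rows
        if count is not None and count > LARGE_TABLE_ROWS
    ]
    return dict(tables_per_schema), large
```

Tables with an unknown row count are left unflagged in this sketch rather than treated as large.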

Step 4: Discover Categorical Value Families

For key categorical columns (like OPERATOR, STATUS, TYPE, FEATURE), discover value families:

# Scripts are relative to ../analyzing-data/
# Replace column_name and table with the real identifiers
uv run scripts/cli.py exec "df = run_sql('''
SELECT column_name, COUNT(*) AS occurrences
FROM table
WHERE column_name IS NOT NULL
GROUP BY column_name
ORDER BY occurrences DESC
LIMIT 50
''')"
uv run scripts/cli.py exec "print(df)"

Group related values into families by common prefix/suffix (e.g., Export* for ExportCSV, ExportJSON, ExportParquet).
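One possible way to do the grouping is sketched below. It handles common prefixes only, and the rule for what counts as a prefix (the leading CamelCase or snake_case token) is an assumption, as is the function name.

```python
import re
from collections import defaultdict

# Leading token: "Export" in ExportCSV, "login" in login_failed, "API" in API_KEY.
_LEADING_TOKEN = re.compile(r"[A-Z]?[a-z]+|[A-Z]+(?![a-z])|\d+")


def group_value_families(values, min_size=2):
    """Group categorical values that share a leading token into `Prefix*` families.

    Values whose prefix is not shared by at least `min_size` values stay as
    single-value entries keyed by the value itself.
    """
    by_prefix = defaultdict(list)
    for value in values:
        match = _LEADING_TOKEN.match(value)
        by_prefix[match.group(0) if match else value].append(value)

    families = {}
    for prefix, members in by_prefix.items():
        if len(members) >= min_size:
            families[f"{prefix}*"] = sorted(members)
        else:
            for member in members:
                families[member] = [member]
    return families


families = group_value_families(["ExportCSV", "ExportJSON", "ExportParquet", "Login"])
print(families["Export*"])
```

Suffix-based families would need a separate pass over trailing tokens.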

Step 5: Merge Results

Combine warehouse metadata + codebase context:

  1. Quick Reference table - concept → table mappings (pre-populated from code if found)
  2. Categorical Columns - value families for key filter columns
  3. Database sections - one per database
  4. Schema subsections - tables grouped by schema
  5. Table details - columns, row counts, descriptions from code, warnings
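The merge can be pictured as a dictionary join between the two result sets. The sketch below assumes codebase context is keyed by bare table name and matched case-insensitively; both the data shapes and the `merge_metadata` name are illustrative, not taken from the skill.

```python
def merge_metadata(warehouse_tables, code_context):
    """Attach code-derived descriptions to discovered warehouse tables.

    warehouse_tables: {"DB.SCHEMA.TABLE": {"row_count": ..., "columns": [...]}}
    code_context:     {"table_name": {"description": ..., "primary_key": ...}}
    Assumption: code context is keyed by bare table name, in any letter case.
    """
    context_by_name = {name.upper(): ctx for name, ctx in code_context.items()}
    merged = {}
    for fq_name, meta in warehouse_tables.items():
        bare_name = fq_name.rsplit(".", 1)[-1].upper()
        ctx = context_by_name.get(bare_name, {})
        merged[fq_name] = {
            **meta,
            "description": ctx.get("description"),
            "primary_key": ctx.get("primary_key"),
            "enriched": bool(ctx),  # feeds the "tables enriched" summary count
        }
    return merged
```

Matching on the bare name can collide when two schemas contain a table with the same name; a stricter variant would key the code context by schema as well.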

Step 6: Generate warehouse.md

Write the file to:

  • .astro/warehouse.md (default - project-specific, version-controllable)
  • ~/.astro/agents/warehouse.md (if --global flag)

Output Format

# Warehouse Schema

> Generated by `/astronomer-data:warehouse-init` on {DATE}. Edit freely to add business context.

## Quick Reference

| Concept | Table | Key Column | Date Column |
|---------|-------|------------|-------------|
| customers | HQ.MODEL_ASTRO.ORGANIZATIONS | ORG_ID | CREATED_AT |
<!-- Add your concept mappings here -->

## Categorical Columns

When filtering on these columns, explore value families first (values often have variants):

| Table | Column | Value Families |
|-------|--------|----------------|
| {TABLE} | {COLUMN} | `{PREFIX}*` ({VALUE1}, {VALUE2}, ...) |
<!-- Populated by /astronomer-data:warehouse-init from actual warehouse data -->

## Data Layer Hierarchy

Query downstream first: `reporting` > `mart_*` > `metric_*` > `model_*` > `IN_*`

| Layer | Prefix | Purpose |
|-------|--------|---------|
| Reporting | `reporting.*` | Dashboard-optimized |
| Mart | `mart_*` | Combined analytics |
| Metric | `metric_*` | KPIs at various grains |
| Model | `model_*` | Cleansed sources of truth |
| Raw | `IN_*` | Source data - avoid |

## {DATABASE} Database

### {SCHEMA} Schema

#### {TABLE_NAME}
{DESCRIPTION from code if found}

| Column | Type | Description |
|--------|------|-------------|
| COL1 | VARCHAR | {from code or inferred} |

- **Rows:** {ROW_COUNT}
- **Key column:** {PRIMARY_KEY from code or inferred}
{IF ROW_COUNT > 100M: - **⚠️ WARNING:** Large table - always add date filters}

## Relationships

{Inferred relationships based on column names like *_ID}
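
The Relationships section of the template is filled from column names. A minimal sketch of that inference follows, assuming a relationship is suggested whenever the same `*_ID` column appears in more than one table; the function name and the table names in the usage line are hypothetical.

```python
from collections import defaultdict


def infer_relationships(columns_by_table):
    """Map each shared *_ID column to the sorted list of tables containing it.

    Assumption: a column ending in _ID that appears in two or more tables
    hints at a join key. This is a naming heuristic, not a verified constraint.
    """
    tables_by_key = defaultdict(list)
    for table, columns in columns_by_table.items():
        for column in columns:
            if column.upper().endswith("_ID"):
                tables_by_key[column.upper()].append(table)
    return {
        key: sorted(tables)
        for key, tables in tables_by_key.items()
        if len(tables) > 1
    }


print(infer_relationships({
    "ORGANIZATIONS": ["ORG_ID", "NAME"],
    "DEPLOYMENTS": ["DEPLOYMENT_ID", "ORG_ID"],
}))
```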

Command Options

| Option | Effect |
|--------|--------|
| `/astronomer-data:warehouse-init` | Generate .astro/warehouse.md |
| `/astronomer-data:warehouse-init --refresh` | Regenerate, preserving user edits |
| `/astronomer-data:warehouse-init --database HQ` | Only discover specific database |
| `/astronomer-data:warehouse-init --global` | Write to ~/.astro/agents/ instead |

Step 7: Pre-populate Cache

After generating warehouse.md, populate the concept cache:

# Scripts are relative to ../analyzing-data/
uv run scripts/cli.py concept import -p .astro/warehouse.md
uv run scripts/cli.py concept learn customers HQ.MART_CUST.CURRENT_ASTRO_CUSTS -k ACCT_ID

Step 8: Offer CLAUDE.md Integration (Ask User)

Ask the user:

Would you like to add the Quick Reference table to your CLAUDE.md file?

This ensures the schema mappings are always in context for data queries, improving accuracy from ~25% to ~100% for complex queries.

Options:

  1. Yes, add to CLAUDE.md (Recommended) - Append Quick Reference section
  2. No, skip - Use warehouse.md and cache only

If user chooses Yes:

  1. Check if .claude/CLAUDE.md or CLAUDE.md exists
  2. If exists, append the Quick Reference section (avoid duplicates)
  3. If not exists, create .claude/CLAUDE.md with just the Quick Reference

Quick Reference section to add:

## Data Warehouse Quick Reference

When querying the warehouse, use these table mappings:

| Concept | Table | Key Column | Date Column |
|---------|-------|------------|-------------|
{rows from warehouse.md Quick Reference}

**Large tables (always filter by date):** {list tables with >100M rows}

> Auto-generated by `/astronomer-data:warehouse-init`. Run `/astronomer-data:warehouse-init --refresh` to update.

If yes: Append the Quick Reference section to .claude/CLAUDE.md or CLAUDE.md.
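
The check, append, or create logic for CLAUDE.md could look roughly like this. It is a sketch under assumptions: the duplicate check simply looks for the section heading, and `add_quick_reference` is a hypothetical helper name.

```python
from pathlib import Path

HEADING = "## Data Warehouse Quick Reference"


def add_quick_reference(project_root: Path, section: str) -> Path:
    """Append the Quick Reference section to CLAUDE.md, creating it if needed.

    Prefers .claude/CLAUDE.md, then CLAUDE.md. If neither exists, creates
    .claude/CLAUDE.md containing only the section. Returns the file used.
    """
    candidates = [project_root / ".claude" / "CLAUDE.md", project_root / "CLAUDE.md"]
    target = next((path for path in candidates if path.exists()), None)

    if target is None:
        target = candidates[0]
        target.parent.mkdir(parents=True, exist_ok=True)
        target.write_text(section.rstrip() + "\n", encoding="utf-8")
        return target

    existing = target.read_text(encoding="utf-8")
    # Assumption: the heading's presence means the section was already added.
    if HEADING not in existing:
        target.write_text(
            existing.rstrip() + "\n\n" + section.rstrip() + "\n", encoding="utf-8"
        )
    return target
```

Because the heading check skips the write entirely, this sketch does not update an existing section; replacing stale rows would need an explicit rewrite step.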

After Generation

Tell the user:

Generated .astro/warehouse.md

Summary:
  - {N} databases, {N} schemas, {N} tables
  - {N} tables enriched with code descriptions
  - {N} concepts cached for instant lookup

Next steps:
  1. Edit .astro/warehouse.md to add business context
  2. Commit to version control
  3. Run /astronomer-data:warehouse-init --refresh when schema changes

Refresh Behavior

When --refresh is specified:

  1. Read existing warehouse.md
  2. Preserve all HTML comments (<!-- ... -->)
  3. Preserve Quick Reference table entries (user-added)
  4. Preserve user-added descriptions
  5. Update row counts and add new tables
  6. Mark removed tables with <!-- REMOVED --> comment
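
A refresh can be planned by comparing the tables already in warehouse.md with those just discovered. The sketch below covers only the comment-preservation and added/removed bookkeeping; the function name and return shape are assumptions.

```python
import re

HTML_COMMENT = re.compile(r"<!--.*?-->", flags=re.DOTALL)


def plan_refresh(existing_md, existing_tables, discovered_tables):
    """Work out what a --refresh run should keep, add, and mark as removed.

    existing_md:       current text of warehouse.md
    existing_tables:   set of table names already documented
    discovered_tables: set of table names found in the warehouse now
    """
    return {
        "preserved_comments": HTML_COMMENT.findall(existing_md),
        "added": sorted(discovered_tables - existing_tables),
        # These sections would get a <!-- REMOVED --> marker, not be deleted.
        "removed": sorted(existing_tables - discovered_tables),
        "kept": sorted(existing_tables & discovered_tables),
    }
```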

Cache Staleness & Schema Drift

The runtime cache has a 7-day TTL by default. After 7 days, cached entries expire and will be re-discovered on next use.
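
The expiry rule amounts to a timestamp comparison. A minimal sketch, assuming each cache entry stores a timezone-aware `cached_at` timestamp (the actual cache format is not described here):

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

CACHE_TTL = timedelta(days=7)  # default TTL stated above


def is_stale(cached_at: datetime, now: Optional[datetime] = None,
             ttl: timedelta = CACHE_TTL) -> bool:
    """Return True when an entry cached at `cached_at` is older than the TTL."""
    now = now or datetime.now(timezone.utc)
    return now - cached_at > ttl
```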

When to Refresh

Run /astronomer-data:warehouse-init --refresh when:

  • Schema changes: Tables added, renamed, or removed
  • Column changes: New columns added or types changed
  • After deployments: If your data pipeline deploys schema migrations
  • Weekly: As a good practice, even if no known changes

Signs of Stale Cache

Watch for these indicators:

  • Queries fail with "table not found" errors
  • Results seem wrong or outdated
  • New tables aren't being discovered

Manual Cache Reset

If you suspect cache issues:

# Scripts are relative to ../analyzing-data/
uv run scripts/cli.py cache status
uv run scripts/cli.py cache clear --stale-only
uv run scripts/cli.py cache clear

Codebase Patterns Recognized

| Pattern | Source | What We Extract |
|---------|--------|-----------------|
| `**/models/**/*.yml` | dbt | table/column descriptions, tests |
| `**/dags/**/*.sql` | gusty | YAML frontmatter (description, primary_key) |
| `AGENTS.md`, `CLAUDE.md` | docs | data layer hierarchy, conventions |
| `**/docs/**/*.md` | docs | business context |
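
For illustration, the patterns above can be applied with `pathlib` globbing. This sketch only locates candidate files and does not parse them; searching for AGENTS.md and CLAUDE.md at any depth is an assumption, as is the function name.

```python
from pathlib import Path

# Glob patterns from the table above, grouped by source.
PATTERNS = {
    "dbt": ["**/models/**/*.yml"],
    "gusty": ["**/dags/**/*.sql"],
    # Assumption: AGENTS.md / CLAUDE.md may live at any depth in the repo.
    "docs": ["**/AGENTS.md", "**/CLAUDE.md", "**/docs/**/*.md"],
}


def find_context_files(root: Path) -> dict:
    """Return {source: sorted list of matching files} under `root`."""
    found = {}
    for source, patterns in PATTERNS.items():
        matches = set()  # a file can match more than one pattern
        for pattern in patterns:
            matches.update(path for path in root.glob(pattern) if path.is_file())
        found[source] = sorted(matches)
    return found
```

A call such as `find_context_files(Path("."))` from the project root would list the files a codebase search should read.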

Example Session

User: /astronomer-data:warehouse-init

Agent:
→ Reading warehouse configuration...
→ Found 1 warehouse with databases: HQ, PRODUCT

→ Searching codebase for data documentation...
  Found: AGENTS.md with data layer hierarchy
  Found: 45 SQL files with YAML frontmatter in dags/declarative/

→ Launching parallel warehouse discovery...
  [Database: HQ] Discovering schemas...
  [Database: PRODUCT] Discovering schemas...

→ HQ: Found 29 schemas, 401 tables
→ PRODUCT: Found 1 schema, 0 tables

→ Merging warehouse metadata with code context...
  Enriched 45 tables with descriptions from code

→ Generated .astro/warehouse.md

Summary:
  - 2 databases
  - 30 schemas
  - 401 tables
  - 45 tables enriched with code descriptions
  - 8 large tables flagged (>100M rows)

Next steps:
  1. Review .astro/warehouse.md
  2. Add concept mappings to Quick Reference
  3. Commit to version control
  4. Run /astronomer-data:warehouse-init --refresh when schema changes
