analyzing-data

作成者: astronomer

データウェアハウスにクエリを実行し、キャッシュされたパターンと概念マッピングを使用してビジネス上の質問に回答します。繰り返し発生する質問タイプのパターン検索とキャッシュをサポートし、結果を記録して将来のクエリを改善します。概念からテーブルへのマッピングキャッシュと、INFORMATION_SCHEMAまたはコードベースのgrepによるテーブルスキーマ検出を含みます。分析用にPolarsまたはPandas DataFrameを返すrun_sql()およびrun_sql_pandas()カーネル関数を提供します。概念、パターン、テーブルキャッシュを管理するCLIコマンド、さらに...

npx skills add https://github.com/astronomer/agents --skill analyzing-data

Data Analysis

Answer business questions by querying the data warehouse. The kernel auto-starts on first exec call.

All CLI commands below are relative to this skill's directory. Before running any scripts/cli.py command, cd to the directory containing this file.

Workflow

  1. Pattern lookup — Check for a cached query strategy:

    uv run scripts/cli.py pattern lookup "<user's question>"
    

    If a pattern exists, follow its strategy. Record the outcome after executing:

    uv run scripts/cli.py pattern record <name> --success  # or --failure
    
  2. Concept lookup — Find known table mappings:

    uv run scripts/cli.py concept lookup <concept>
    
  3. Table discovery — If cache misses, search the codebase (Grep pattern="<concept>" glob="**/*.sql") or query INFORMATION_SCHEMA. See reference/discovery-warehouse.md.

  4. Execute query:

    uv run scripts/cli.py exec "df = run_sql('SELECT ...')"
    uv run scripts/cli.py exec "print(df)"
    
  5. Cache learnings — Always cache before presenting results:

    # Cache concept → table mapping
    uv run scripts/cli.py concept learn <concept> <TABLE> -k <KEY_COL>
    # Cache query strategy (if discovery was needed)
    uv run scripts/cli.py pattern learn <name> -q "question" -s "step" -t "TABLE" -g "gotcha"
    
  6. Present findings to user.

Kernel Functions

FunctionReturns
run_sql(query, limit=100)Polars DataFrame
run_sql_pandas(query, limit=100)Pandas DataFrame
run_sql_many(queries, limit=100)List of Polars DataFrames (one per query)

pl (Polars) and pd (Pandas) are pre-imported.

Run independent queries together with run_sql_many — they execute concurrently (Snowflake async / connection-pool fan-out) instead of one at a time:

uv run scripts/cli.py exec "dfs = run_sql_many(['SELECT ...', 'SELECT ...']); print(dfs[0])"

run_sql_many is fail-fast: if any query errors, the call raises and the results of the queries that succeeded are discarded. Use separate run_sql calls if you need partial results.

Timeouts: exec waits up to 120s by default, then interrupts the query and returns a "client stopped waiting" message (the query may still finish server-side). Raise it for known long-running queries: uv run scripts/cli.py exec "..." -t 600.

Idle kernel: the kernel self-terminates after 2h idle (preserving state until then). Override with ASTRO_KERNEL_IDLE_TIMEOUT (seconds; 0 disables).

CLI Reference

Kernel

uv run scripts/cli.py warehouse list      # List warehouses
uv run scripts/cli.py start [-w name]     # Start kernel (with optional warehouse)
uv run scripts/cli.py exec "..."          # Execute Python code
uv run scripts/cli.py status              # Kernel status
uv run scripts/cli.py restart             # Restart kernel
uv run scripts/cli.py stop                # Stop kernel
uv run scripts/cli.py install <pkg>       # Install package

Concept Cache

uv run scripts/cli.py concept lookup <name>                     # Look up
uv run scripts/cli.py concept learn <name> <TABLE> -k <KEY_COL> # Learn
uv run scripts/cli.py concept list                               # List all
uv run scripts/cli.py concept import -p /path/to/warehouse.md   # Bulk import

Pattern Cache

uv run scripts/cli.py pattern lookup "question"                                      # Look up
uv run scripts/cli.py pattern learn <name> -q "..." -s "..." -t "TABLE" -g "gotcha"  # Learn
uv run scripts/cli.py pattern record <name> --success                                # Record outcome
uv run scripts/cli.py pattern list                                                   # List all
uv run scripts/cli.py pattern delete <name>                                          # Delete

Table Schema Cache

uv run scripts/cli.py table lookup <TABLE>            # Look up schema
uv run scripts/cli.py table cache <TABLE> -c '[...]'  # Cache schema
uv run scripts/cli.py table list                       # List cached
uv run scripts/cli.py table delete <TABLE>             # Delete

Cache Management

uv run scripts/cli.py cache status                # Stats
uv run scripts/cli.py cache clear [--stale-only]  # Clear

References

astronomerのその他のスキル

airflow
astronomer
Apache AirflowのDAG、実行、タスク、システム設定をクエリ、管理、トラブルシューティングします。DAG検査、実行管理、タスクログ、設定クエリ、REST API直接アクセスを含む30以上のコマンドをサポート。複数のAirflowインスタンスを永続的な設定で管理し、ローカルおよびAstroデプロイメントを自動検出。DAG実行を同期的(完了待機)または非同期的にトリガーし、障害を診断、再試行のために実行をクリア、リトライ/マップインデックスフィルタリング付きでタスクログにアクセス。出力...
official
airflow-hitl
astronomer
人間による承認ゲート、フォーム入力、およびAirflow DAG内での分岐を、遅延可能オペレーターを使用して実現。4種類のオペレーター:承認/却下の判断を行うApprovalOperator、フォームによる複数選択肢の選択を行うHITLOperator、人間主導のタスクルーティングを行うHITLBranchOperator、フォームデータ収集を行うHITLEntryOperator。すべてのオペレーターは遅延可能であり、Airflow UIのRequired ActionsタブまたはREST APIを介して人間の応答を待つ間、ワーカースロットを解放します。カスタム...を含むオプション機能をサポート。
official
airflow-plugins
astronomer
Airflow 3.1+のプラグインを構築し、FastAPIアプリ、カスタムUIページ、Reactコンポーネント、ミドルウェア、マクロ、オペレーターリンクをAirflow UIに直接埋め込みます。使用…
official
annotating-task-lineage
astronomer
Airflowタスクにデータ系列を注釈付けし、インレットとアウトレットを使用します。OpenLineage Datasetオブジェクト、Airflow Assets、Airflow Datasetsをサポートし、データベース、データウェアハウス、クラウドストレージ間での入出力を定義します。オペレーターに組み込みのOpenLineage抽出機能がない場合のフォールバックとして使用し、カスタム抽出機能とOpenLineageメソッドが優先される4段階の優先順位システムに従います。Snowflake、BigQuery、S3、PostgreSQL向けのデータセット命名ヘルパーを含み、一貫性を確保します。
official
authoring-dags
astronomer
Apache Airflow DAGを作成するためのガイド付きワークフローで、検証とテストの統合を備えています。構造化された6フェーズのアプローチ:環境と既存のパターンを発見し、DAG構造を計画し、ベストプラクティスに従って実装し、af CLIコマンドで検証し、ユーザーの同意を得てテストし、修正を繰り返します。発見用のCLIコマンド(af config connections、af config providers、af dags list)と検証用のCLIコマンド(af dags errors、af dags get、af dags explore)は、DAGに関する即時フィードバックを提供します。
official
blueprint
astronomer
Pydanticバリデーションを使用して再利用可能なAirflowタスクグループテンプレートを定義し、YAMLからDAGを構成します。ブループリントテンプレートの作成時や、DAGの構成時に使用します。
official
checking-freshness
astronomer
テーブルのタイムスタンプと更新パターンを陳腐化スケールに照らして確認し、データの鮮度を検証します。一般的なETL命名パターン(_loaded_at、_updated_at、created_atなど)を使用してタイムスタンプカラムを特定し、その最大値をクエリして経過時間を判定します。データを4つの鮮度ステータスに分類します:Fresh(4時間未満)、Stale(4~24時間)、Very Stale(24時間超)、またはUnknown(タイムスタンプなし)。最近の日数における最終更新時刻と行数トレンドを確認するためのSQLテンプレートを提供します...
official
cosmos-dbt-core
astronomer
Astronomer Cosmosを使用して、dbt CoreプロジェクトをAirflow DAGまたはTaskGroupに変換します。3つのアセンブリパターン(スタンドアロンのDbtDag、既存のDAG内のDbtTaskGroup、細かい制御が可能な個別のCosmosオペレーター)をサポートします。分離とパフォーマンスのニーズに基づいて、8つの実行モード(WATCHER、LOCAL、VIRTUALENV、KUBERNETES、AIRFLOW_ASYNCなど)から選択できます。速度とセレクターの複雑さのバランスを取るために、3つの解析戦略(dbt_manifest、dbt_ls、dbt_ls_file、automatic)を提供します...
official