🚀 mcp-server-airflow-token
このプロジェクトは、Bearerトークン認証をサポートするApache Airflow用のModel Context Protocol (MCP)サーバーです。Astronomer CloudやスタンドアロンのAirflowインスタンスとのシームレスな統合を実現します。
mcp-server-apache-airflow をベースに作成
このフォークは、元のMCPサーバーにBearerトークン認証サポートを追加し、Astronomer Cloudやその他のトークンベースのAirflowデプロイメントと互換性を持たせています。
✨ 主な機能
- ✅ Bearerトークン認証 - 最新のAirflowデプロイメントの主要な認証方法
- ✅ Astronomer Cloud互換 - Astronomerの管理型Airflowとシームレスに動作
- ✅ 下位互換性 - 依然としてユーザー名/パスワード認証をサポート
- ✅ URLハンドリングの強化 -
/deployment-id
のようなデプロイメントパスを正しく処理
📚 ドキュメント
このプロジェクトは、Apache AirflowのREST APIをラップするModel Context Protocolサーバーを実装しています。これにより、MCPクライアントは標準化された方法でAirflowとやり取りできます。公式のApache Airflowクライアントライブラリを使用して、互換性と保守性を確保しています。
機能実装状況
機能 |
APIパス |
状態 |
DAG管理 |
|
|
DAGの一覧表示 |
/api/v1/dags |
✅ |
DAGの詳細取得 |
/api/v1/dags/{dag_id} |
✅ |
DAGの一時停止 |
/api/v1/dags/{dag_id} |
✅ |
DAGの再開 |
/api/v1/dags/{dag_id} |
✅ |
DAGの更新 |
/api/v1/dags/{dag_id} |
✅ |
DAGの削除 |
/api/v1/dags/{dag_id} |
✅ |
DAGソースの取得 |
/api/v1/dagSources/{file_token} |
✅ |
複数DAGのパッチ適用 |
/api/v1/dags |
✅ |
DAGファイルの再解析 |
/api/v1/dagSources/{file_token}/reparse |
✅ |
DAG実行 |
|
|
DAG実行の一覧表示 |
/api/v1/dags/{dag_id}/dagRuns |
✅ |
DAG実行の作成 |
/api/v1/dags/{dag_id}/dagRuns |
✅ |
DAG実行の詳細取得 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id} |
✅ |
DAG実行の更新 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id} |
✅ |
DAG実行の削除 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id} |
✅ |
DAG実行のバッチ取得 |
/api/v1/dags/~/dagRuns/list |
✅ |
DAG実行のクリア |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/clear |
✅ |
DAG実行のノート設定 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/setNote |
✅ |
上位データセットイベントの取得 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/upstreamDatasetEvents |
✅ |
タスク |
|
|
DAGタスクの一覧表示 |
/api/v1/dags/{dag_id}/tasks |
✅ |
タスクの詳細取得 |
/api/v1/dags/{dag_id}/tasks/{task_id} |
✅ |
タスクインスタンスの取得 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} |
✅ |
タスクインスタンスの一覧表示 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances |
✅ |
タスクインスタンスの更新 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id} |
✅ |
タスクインスタンスのクリア |
/api/v1/dags/{dag_id}/clearTaskInstances |
✅ |
タスクインスタンスの状態設定 |
/api/v1/dags/{dag_id}/updateTaskInstancesState |
✅ |
変数 |
|
|
変数の一覧表示 |
/api/v1/variables |
✅ |
変数の作成 |
/api/v1/variables |
✅ |
変数の取得 |
/api/v1/variables/{variable_key} |
✅ |
変数の更新 |
/api/v1/variables/{variable_key} |
✅ |
変数の削除 |
/api/v1/variables/{variable_key} |
✅ |
接続 |
|
|
接続の一覧表示 |
/api/v1/connections |
✅ |
接続の作成 |
/api/v1/connections |
✅ |
接続の取得 |
/api/v1/connections/{connection_id} |
✅ |
接続の更新 |
/api/v1/connections/{connection_id} |
✅ |
接続の削除 |
/api/v1/connections/{connection_id} |
✅ |
接続のテスト |
/api/v1/connections/test |
✅ |
プール |
|
|
プールの一覧表示 |
/api/v1/pools |
✅ |
プールの作成 |
/api/v1/pools |
✅ |
プールの取得 |
/api/v1/pools/{pool_name} |
✅ |
プールの更新 |
/api/v1/pools/{pool_name} |
✅ |
プールの削除 |
/api/v1/pools/{pool_name} |
✅ |
XCom |
|
|
XComの一覧表示 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries |
✅ |
XComエントリの取得 |
/api/v1/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/xcomEntries/{xcom_key} |
✅ |
データセット |
|
|
データセットの一覧表示 |
/api/v1/datasets |
✅ |
データセットの取得 |
/api/v1/datasets/{uri} |
✅ |
データセットイベントの取得 |
/api/v1/datasetEvents |
✅ |
データセットイベントの作成 |
/api/v1/datasetEvents |
✅ |
DAGデータセットキューイベントの取得 |
/api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri} |
✅ |
DAGデータセットキューイベントの一覧取得 |
/api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents |
✅ |
DAGデータセットキューイベントの削除 |
/api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents/{uri} |
✅ |
DAGデータセットキューイベントの一括削除 |
/api/v1/dags/{dag_id}/dagRuns/queued/datasetEvents |
✅ |
データセットキューイベントの取得 |
/api/v1/datasets/{uri}/dagRuns/queued/datasetEvents |
✅ |
データセットキューイベントの削除 |
/api/v1/datasets/{uri}/dagRuns/queued/datasetEvents |
✅ |
モニタリング |
|
|
ヘルスチェックの取得 |
/api/v1/health |
✅ |
DAG統計情報 |
|
|
DAG統計情報の取得 |
/api/v1/dags/statistics |
✅ |
設定 |
|
|
設定の取得 |
/api/v1/config |
✅ |
プラグイン |
|
|
プラグインの取得 |
/api/v1/plugins |
✅ |
プロバイダ |
|
|
プロバイダの一覧表示 |
/api/v1/providers |
✅ |
イベントログ |
|
|
イベントログの一覧表示 |
/api/v1/eventLogs |
✅ |
イベントログの取得 |
/api/v1/eventLogs/{event_log_id} |
✅ |
システム |
|
|
インポートエラーの取得 |
/api/v1/importErrors |
✅ |
インポートエラーの詳細取得 |
/api/v1/importErrors/{import_error_id} |
✅ |
ヘルスステータスの取得 |
/api/v1/health |
✅ |
バージョンの取得 |
/api/v1/version |
✅ |
📦 インストール
依存関係
このプロジェクトは、公式のApache Airflowクライアントライブラリ (apache-airflow-client
) に依存しています。このパッケージをインストールすると、自動的にインストールされます。
環境変数
以下の環境変数を設定してください。
トークン認証 (推奨)
AIRFLOW_HOST=<your-airflow-host> # オプション、デフォルトは http://localhost:8080
AIRFLOW_TOKEN=<your-airflow-api-token> # あなたのAirflow APIトークン
AIRFLOW_API_VERSION=v1 # オプション、デフォルトは v1
基本認証 (代替)
AIRFLOW_HOST=<your-airflow-host> # オプション、デフォルトは http://localhost:8080
AIRFLOW_USERNAME=<your-airflow-username>
AIRFLOW_PASSWORD=<your-airflow-password>
AIRFLOW_API_VERSION=v1 # オプション、デフォルトは v1
注: AIRFLOW_TOKEN
が提供された場合、それが認証に使用されます。そうでない場合、サーバーはユーザー名とパスワードを使用した基本認証にフォールバックします。
Claude Desktopでの使用方法
まず、リポジトリをクローンします。
git clone https://github.com/nikhil-ganage/mcp-server-airflow-token
claude_desktop_config.json
に追加します。
トークン認証 (推奨)
{
"mcpServers": {
"apache-airflow": {
"type": "stdio",
"command": "uv",
"args": [
"--directory",
"path-to-repo/mcp-server-airflow-token",
"run",
"mcp-server-airflow-token"
],
"env": {
"AIRFLOW_HOST": "https://astro_id.astronomer.run/id",
"AIRFLOW_TOKEN": "TOKEN"
}
}
}
}
基本認証
{
"mcpServers": {
"mcp-server-airflow-token": {
"command": "uvx",
"args": ["mcp-server-airflow-token"],
"env": {
"AIRFLOW_HOST": "https://your-airflow-host",
"AIRFLOW_USERNAME": "your-username",
"AIRFLOW_PASSWORD": "your-password"
}
}
}
}
読み取り専用モード (安全性のため推奨)
トークン認証の読み取り専用モード
{
"mcpServers": {
"mcp-server-airflow-token": {
"command": "uvx",
"args": ["mcp-server-airflow-token", "--read-only"],
"env": {
"AIRFLOW_HOST": "https://your-airflow-host",
"AIRFLOW_TOKEN": "your-api-token"
}
}
}
}
基本認証の読み取り専用モード
{
"mcpServers": {
"mcp-server-airflow-token": {
"command": "uvx",
"args": ["mcp-server-airflow-token", "--read-only"],
"env": {
"AIRFLOW_HOST": "https://your-airflow-host",
"AIRFLOW_USERNAME": "your-username",
"AIRFLOW_PASSWORD": "your-password"
}
}
}
}
path-to-repo
を、実際にリポジトリをクローンしたパスに置き換えてください。
Astronomer Cloudの設定例
Astronomer Cloudのデプロイメントの場合:
{
"mcpServers": {
"mcp-server-airflow-token": {
"command": "uvx",
"args": ["mcp-server-airflow-token"],
"env": {
"AIRFLOW_HOST": "https://your-astronomer-domain.astronomer.run/your-deployment-id",
"AIRFLOW_TOKEN": "your-astronomer-api-token"
}
}
}
}
注: デプロイメントIDは、Astronomer CloudのURLパスの一部です。
APIグループの選択
--apis
フラグを設定することで、使用するAPIグループを選択できます。
uv run mcp-server-airflow-token --apis "dag,dagrun"
デフォルトは、すべてのAPIを使用することです。
許可される値は以下の通りです。
- config
- connections
- dag
- dagrun
- dagstats
- dataset
- eventlog
- importerror
- monitoring
- plugin
- pool
- provider
- taskinstance
- variable
- xcom
読み取り専用モード
--read-only
フラグを使用することで、サーバーを読み取り専用モードで実行できます。これにより、読み取り操作 (GETリクエスト) を実行するツールのみが公開され、リソースを作成、更新、または削除するツールは除外されます。
uv run mcp-server-airflow-token --read-only
読み取り専用モードでは、サーバーは以下のようなツールのみを公開します。
- DAG、DAG実行、タスク、変数、接続などの一覧表示
- 特定のリソースの詳細取得
- 設定とモニタリング情報の読み取り
- 接続のテスト (破壊的でない)
読み取り専用モードでは、DAG、変数、接続の作成、更新、削除、DAG実行のトリガーなどの書き込み操作は利用できません。
読み取り専用モードとAPIグループの選択を組み合わせることができます。
uv run mcp-server-airflow-token --read-only --apis "dag,variable"
手動実行
サーバーを手動で実行することもできます。
make run
make run
は以下のオプションを受け付けます。
オプション:
--port
: SSEでリッスンするポート (デフォルト: 8000)
--transport
: トランスポートタイプ (stdio/sse、デフォルト: stdio)
または、同じパラメータを受け付けるsseサーバーを直接実行することもできます。
make run-sse
インストール方法
pipまたはuvxを使用してサーバーをインストールできます。
pip install mcp-server-airflow-token
uvx mcp-server-airflow-token
開発
開発環境のセットアップ
- リポジトリをクローンします。
git clone https://github.com/nikhil-ganage/mcp-server-airflow-token.git
cd mcp-server-airflow-token
- 開発用の依存関係をインストールします。
uv sync --dev
- 環境変数用の
.env
ファイルを作成します (開発用にオプション)。
touch .env
注: テストを実行するために環境変数は必要ありません。AIRFLOW_HOST
は開発とテストの目的でデフォルトで http://localhost:8080
に設定されます。
テストの実行
このプロジェクトはpytestを使用してテストを行っており、以下のコマンドが利用可能です。
make test
コード品質
make lint
make format
継続的インテグレーション
このプロジェクトにはGitHub Actionsのワークフロー (.github/workflows/test.yml
) が含まれており、自動的に以下のことを行います。
- Python 3.10、3.11、および3.12でテストを実行
- ruffを使用してリントチェックを実行
main
ブランチへのすべてのプッシュとプルリクエストで実行
CIパイプラインは、変更がマージされる前に、サポートされているPythonバージョン間でのコード品質と互換性を確保します。
コントリビューション
コントリビューションは大歓迎です!プルリクエストを送信してください。
pyproject.toml
の project.version
が更新されると、パッケージは自動的にPyPIにデプロイされます。
バージョニングにはセマンティックバージョニングを使用してください。
コアロジックに変更を適用するには、PRにバージョン更新を含めてください。
📄 ライセンス
MITライセンス