概要
インストール
コンテンツ詳細
代替品
SSB MCPサーバーとは?
SSB MCPサーバーは、自然言語対話を通じてSQL Stream Builder環境を管理できるインテリジェントアシスタントです。80種類以上の機能ツールを提供し、クエリの作成からパフォーマンスの監視、データサンプリング分析まで、ストリームデータ処理ジョブの全ライフサイクル管理をカバーしています。SSB MCPサーバーの使い方は?
Claude Desktopアプリケーションを通じてSSB MCPサーバーとやり取りできます。設定が完了したら、「すべての実行中ジョブを表示」や「NVDAテーブルのリアルタイムデータをクエリする」など、実行したい操作を自然言語で記述するだけで、システムが自動的に対応する操作を実行します。適用シナリオ
データエンジニアによるストリームジョブ管理、運用チームによるシステム健全性の監視、データアナリストによるリアルタイムデータの探索、およびプラットフォーム管理者によるユーザーとプロジェクトの設定管理に適しています。特に、ストリームデータ処理システムと頻繁にやり取りする必要のあるチームに最適です。主要機能
使い方
使用例
よくある質問
関連リソース
インストール
{
"mcpServers": {
"ssb-mcp-server": {
"command": "/FULL/PATH/TO/SSB-MCP-Server/run_mcp_server.sh",
"args": [],
"cwd": "/FULL/PATH/TO/SSB-MCP-Server"
}
}
}
{
"mcpServers": {
"ssb-mcp-server": {
"command": "/FULL/PATH/TO/SSB-MCP-Server/.venv/bin/python",
"args": [
"-m",
"ssb_mcp_server.server"
],
"env": {
"MCP_TRANSPORT": "stdio",
"SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1",
"KNOX_TOKEN": "<your_knox_bearer_token>",
"SSB_READONLY": "true"
}
}
}
}
{
"mcpServers": {
"ssb-mcp-server": {
"command": "uvx",
"args": [
"--from",
"git+https://github.com/your-org/ssb-mcp-server@main",
"run-server"
],
"env": {
"MCP_TRANSPORT": "stdio",
"SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1",
"KNOX_TOKEN": "<your_knox_bearer_token>",
"SSB_READONLY": "true"
}
}
}
}🚀 SSB MCP Server
SSB MCP Serverは、SQL Stream Builder (SSB) への包括的なアクセスを提供するModel Context Protocolサーバーです。直接的なSSBアクセスとApache Knox統合の両方をサポートしています。スタンドアロンのSSBデプロイメントとCloudera Data Platform (CDP) SSBデプロイメントの両方で動作し、Claude Desktopを通じてSSBの機能を完全に利用できます。
利用可能なストリームとジョブが表示されるSSBのホームダッシュボード。SSB MCP Serverの統合が目立つブランディングとともに表示されます。
🚀 クイックスタート
スタンドアロンSSBデプロイメント (Docker Compose)
-
SSBサービスを起動する:
git clone https://github.com/your-org/ssb-mcp-server.git cd ssb-mcp-server docker-compose up -d -
Claude Desktopを設定する -
~/Library/Application Support/Claude/claude_desktop_config.jsonを編集します:{ "mcpServers": { "ssb-mcp-server": { "command": "/FULL/PATH/TO/SSB-MCP-Server/run_mcp_server.sh", "args": [], "cwd": "/FULL/PATH/TO/SSB-MCP-Server" } } } -
Claude Desktopを再起動して、SSBストリームに関する質問を始めましょう!
CDP SSBデプロイメント
SSB APIのベースURLは通常以下のようになります:
https://<your-ssb-host>/ssb/api/v1
CDP UIからKnox JWTトークンを取得し、以下の設定で使用します。
✨ 主な機能
- 複数の認証方法:
- 直接的なSSB認証: スタンドアロンSSBデプロイメント用の基本認証
- Knox統合: CDPデプロイメント用のベアラートークン、クッキー、パスコードトークン
- デフォルトで読み取り専用 - SSBストリームと設定の安全な探索
- 包括的なSSB APIカバレッジ と 80以上のMCPツール を備えた完全なSSB管理:
- 高度なジョブ管理: イベント履歴、状態管理、ジョブコピー、データソースクローン
- 監視と診断: システムの健全性、パフォーマンスカウンター、SQL分析
- 拡張されたテーブル管理: 詳細なテーブル、階層構造、検証、作成
- コネクタと形式管理: データ形式、コネクタの詳細、JAR情報
- ユーザーとプロジェクト管理: 設定、プロジェクト、ユーザー情報、プロジェクト作成
- APIキー管理: キーのライフサイクル管理、作成、削除、詳細
- 環境管理: 環境の切り替え、設定、作成
- 同期と設定: プロジェクトのエクスポート/インポート、同期管理、検証
- UDF管理: UDFのライフサイクル、実行、アーティファクト、カスタム関数
- ストリーム管理: SQLストリームの一覧表示、作成、更新、削除、開始、停止
- クエリ実行: SQLクエリを実行し、サンプリングを伴うリアルタイムの結果を取得
- サンプルデータアクセス: 実行中のジョブからストリーミングデータのサンプルを取得
- ジョブ管理: ジョブの状態を監視、ジョブの詳細を取得、ジョブのライフサイクルを管理
- スキーマ探索: テーブルのスキーマと利用可能なテーブルを探索
- 関数管理: ユーザー定義関数の一覧表示と検査
- コネクタ管理: 利用可能なコネクタを探索
- Kafka統合: Kafkaトピックの一覧表示と検査
- クラスタ監視: クラスタの情報と健全性の状態を取得
- パフォーマンスメトリクス: ストリームのパフォーマンスとメトリクスを監視
📦 インストール
オプション1: Claude Desktop (ローカル)
-
クローンしてインストールする:
git clone https://github.com/your-org/ssb-mcp-server.git cd ssb-mcp-server python3 -m venv .venv source .venv/bin/activate pip install -e . -
Claude Desktopを設定する -
~/Library/Application Support/Claude/claude_desktop_config.jsonを編集します:{ "mcpServers": { "ssb-mcp-server": { "command": "/FULL/PATH/TO/SSB-MCP-Server/.venv/bin/python", "args": [ "-m", "ssb_mcp_server.server" ], "env": { "MCP_TRANSPORT": "stdio", "SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1", "KNOX_TOKEN": "<your_knox_bearer_token>", "SSB_READONLY": "true" } } } } -
Claude Desktopを再起動して、SSBストリームに関する質問を始めましょう!
オプション2: 直接インストール (Cloudera Agent Studio)
Cloudera Agent Studioで使用する場合は、uvx コマンドを使用します:
{
"mcpServers": {
"ssb-mcp-server": {
"command": "uvx",
"args": [
"--from",
"git+https://github.com/your-org/ssb-mcp-server@main",
"run-server"
],
"env": {
"MCP_TRANSPORT": "stdio",
"SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1",
"KNOX_TOKEN": "<your_knox_bearer_token>",
"SSB_READONLY": "true"
}
}
}
}
📚 ドキュメント
設定オプション
すべての設定は環境変数を通じて行われます:
直接的なSSB認証 (スタンドアロン)
| 変数 | 必須 | 説明 |
|---|---|---|
SSB_API_BASE |
はい | 完全なSSB API URL (例: http://localhost:18121) |
SSB_USER |
はい | SSBユーザー名 (例: admin) |
SSB_PASSWORD |
はい | SSBパスワード (例: admin) |
SSB_READONLY |
いいえ | 読み取り専用モード (デフォルト: false) |
TIMEOUT_SECONDS |
いいえ | HTTPタイムアウト (秒) (デフォルト: 30) |
Knox認証 (CDP)
| 変数 | 必須 | 説明 |
|---|---|---|
KNOX_GATEWAY_URL |
はい* | KnoxゲートウェイURL (例: https://host:8444/gateway/ssb) |
KNOX_TOKEN |
はい* | 認証用のKnox JWTトークン |
KNOX_COOKIE |
いいえ | 代替: トークンの代わりに完全なクッキー文字列を提供 |
KNOX_PASSCODE_TOKEN |
いいえ | 代替: Knoxパスコードトークン (JWTに自動交換) |
KNOX_USER |
いいえ | 基本認証用のKnoxユーザー名 |
KNOX_PASSWORD |
いいえ | 基本認証用のKnoxパスワード |
KNOX_VERIFY_SSL |
いいえ | SSL証明書を検証する (デフォルト: true) |
KNOX_CA_BUNDLE |
いいえ | CA証明書バンドルへのパス |
SSB_READONLY |
いいえ | 読み取り専用モード (デフォルト: true) |
TIMEOUT_SECONDS |
いいえ | HTTPタイムアウト (秒) (デフォルト: 30) |
* SSB_API_BASE (直接用) または KNOX_GATEWAY_URL (Knox用) のいずれかが必須です。
利用可能なツール
🔧 高度なジョブ管理
get_job_events(job_id)- 詳細なジョブイベント履歴とタイムラインを取得get_job_state(job_id)- 包括的なジョブ状態情報を取得get_job_mv_endpoints(job_id)- ジョブの具体化ビューエンドポイントを取得create_job_mv_endpoint(job_id, mv_config)- 具体化ビューエンドポイントを作成または更新copy_job(job_id)- 既存のジョブを複製copy_data_source(data_source_id)- データソースをクローン
📊 監視と診断
get_diagnostic_counters()- システムのパフォーマンスカウンターと診断情報を取得get_heartbeat()- システムの健全性と接続性を確認analyze_sql(sql_query)- SQLクエリを実行せずに分析 (構文、パフォーマンス)
🗂️ 拡張されたテーブル管理
list_tables_detailed()- 包括的なテーブル情報を取得get_table_tree()- カタログで整理された階層的なテーブル構造を取得validate_data_source(data_source_config)- データソースの設定を検証create_table_detailed(table_config)- 完全な設定でテーブルを作成get_table_details(table_id)- 特定のテーブルに関する詳細情報を取得
🔌 コネクタと形式管理
list_data_formats()- すべての利用可能なデータ形式を一覧表示get_data_format_details(format_id)- 特定のデータ形式に関する詳細情報を取得create_data_format(format_config)- 新しいデータ形式を作成get_connector_jar(connector_type)- コネクタのJAR情報を取得get_connector_type_details(connector_type)- 詳細なコネクタタイプ情報を取得get_connector_details(connector_id)- 詳細なコネクタ情報を取得
👤 ユーザーとプロジェクト管理
get_user_settings()- ユーザーの設定と嗜好を取得update_user_settings(settings)- ユーザーの設定を更新list_projects()- 利用可能なプロジェクトを一覧表示get_project_details(project_id)- プロジェクト情報を取得create_project(project_config)- 新しいプロジェクトを作成get_user_info()- 現在のユーザー情報を取得
🔑 APIキー管理
list_api_keys()- ユーザーのAPIキーを一覧表示create_api_key(key_config)- 新しいAPIキーを作成delete_api_key(key_id)- APIキーを削除get_api_key_details(key_id)- APIキー情報を取得
🌍 環境管理
list_environments()- 利用可能な環境を一覧表示activate_environment(env_id)- 環境をアクティブ化/切り替えget_environment_details(env_id)- 環境の設定を取得create_environment(env_config)- 新しい環境を作成deactivate_environment()- 現在の環境を非アクティブ化
🔄 同期と設定
get_sync_config()- 同期設定を取得update_sync_config(config)- 同期設定を更新delete_sync_config()- 同期設定を削除validate_sync_config(project)- プロジェクトの同期設定を検証export_project(project)- プロジェクトの設定をエクスポートimport_project(project, config)- プロジェクトの設定をインポート
📈 UDF管理
list_udfs_detailed()- 包括的なUDF情報を取得run_udf(udf_id, parameters)- UDF関数を実行get_udf_artifacts()- UDFのアーティファクトと依存関係を取得create_udf(udf_config)- カスタムUDFを作成update_udf(udf_id, udf_config)- UDFの設定を更新get_udf_details(udf_id)- 詳細なUDF情報を取得get_udf_artifact_details(artifact_id)- UDFアーティファクトの詳細を取得get_udf_artifact_by_type(artifact_type)- タイプ別にUDFアーティファクトを取得
ストリーム管理
list_streams()- すべてのSQLストリーム (ジョブ) を一覧表示get_stream(stream_name)- 特定のストリームの詳細を取得create_stream(stream_name, sql_query, description?)- 新しいストリームを作成 (書き込みモード)update_stream(stream_name, sql_query, description?)- 既存のストリームを更新delete_stream(stream_name)- ストリームを削除start_stream(stream_name)- ストリームを開始stop_stream(stream_name)- ストリームを停止
クエリ実行とサンプルデータ
execute_query(sql_query, limit?)- SQLクエリを実行してSSBジョブを作成execute_query_with_sampling(sql_query, sample_interval, sample_count, window_size, sample_all_messages)- カスタムサンプリングでクエリを実行get_job_status(job_id)- 特定のSSBジョブの状態を取得get_job_sample(sample_id)- ジョブ実行からサンプルデータを取得get_job_sample_by_id(job_id)- ジョブIDでジョブからサンプルデータを取得list_jobs_with_samples()- すべてのジョブとそのサンプル情報を一覧表示
ジョブ管理と制御
stop_job(job_id, savepoint)- 特定のSSBジョブを停止execute_job(job_id, sql_query)- 新しいSQLでジョブを実行/再起動restart_job_with_sampling(job_id, sql_query, sample_interval, sample_all_messages)- サンプリングオプションでジョブを再起動configure_sampling(sample_id, sample_interval, sample_count, window_size, sample_all_messages)- サンプリングパラメータを設定
データソースとスキーマ
list_tables()- すべての利用可能なテーブル (データソース) を一覧表示get_table_schema(table_name)- テーブルのスキーマ情報を取得create_kafka_table(table_name, topic, kafka_connector_type, bootstrap_servers, format_type, scan_startup_mode, additional_properties?)- ローカルKafkaを強制する新しいテーブルを作成register_kafka_table(table_name, topic, schema_fields?, use_ssb_prefix?, catalog?, database?)- FlinkカタログにKafkaテーブルを登録 (クエリ可能にする)validate_kafka_connector(kafka_connector_type)- コネクタタイプがローカルKafkaであることを検証
関数とコネクタ
list_udfs()- ユーザー定義関数を一覧表示get_udf(udf_name)- UDFの詳細を取得list_connectors()- 利用可能なコネクタを一覧表示get_connector(connector_name)- コネクタの詳細を取得
Kafka統合
list_topics()- Kafkaトピックを一覧表示get_topic(topic_name)- トピックの詳細を取得
クラスタ管理
get_cluster_info()- クラスタの情報を取得get_cluster_health()- クラスタの健全性の状態を取得get_ssb_info()- SSBのバージョンとシステム情報を取得
利用例
設定が完了したら、Claudeに以下のような質問をすることができます:
基本情報
- "私が実行しているSSBのバージョンは何ですか?"
- "すべてのSQLストリームを一覧表示してください"
- "クエリ可能なテーブルはどれですか?"
- "利用可能なコネクタは何ですか?"
- "すべてのKafkaトピックを一覧表示してください"
- "クラスタの健全性はどうですか?"
ClaudeはSSB環境で利用可能なすべてのテーブルを表示できます。組み込みテーブルとカスタムテーブルの両方が含まれます。
クエリ実行とデータアクセス
- "このクエリを実行してください: SELECT * FROM NVDA"
- "すべてのメッセージをサンプリングするジョブを作成してください: SELECT * FROM NVDA"
- "ジョブ1234の状態を表示してください"
Claudeは実行中のすべてのSSBジョブを一覧表示でき、その状態、作成時間、詳細を表示します。
- "ジョブ1234からサンプルデータを取得してください"
- "すべてのジョブとそのサンプル情報を一覧表示してください"
- "ReadNVDAジョブから最新のデータを表示してください"
ジョブ管理と制御
- "ジョブ1234を停止してください"
- "新しいSQLでジョブ1234を実行/再起動してください"
- "サンプリングオプションでジョブ1234を再起動してください"
- "ジョブ1234のサンプリングパラメータを設定してください"
ClaudeはSQLクエリを実行して新しいSSBジョブを作成でき、完全なジョブ管理機能を備えています。
ストリーム管理
- "このSQLで 'sales_analysis' という名前の新しいストリームを作成してください: SELECT * FROM sales WHERE amount > 1000"
- " 'user_events' ストリームの詳細を表示してください"
- " 'sales_stream' の状態はどうですか?"
Kafkaテーブル管理
- " 'user-events' トピックから 'user_events' という名前のローカルKafkaテーブルを作成してください"
- "FlinkカタログにKafkaテーブルを登録してクエリ可能にしてください"
- "JSON形式でローカルKafkaテーブルを作成してください"
ClaudeはKafkaトピックに接続された新しい仮想テーブルを作成でき、適切なスキーマとコネクタ設定を備えています。
- " 'local-kafka' が有効なコネクタタイプであることを検証してください"
- "リアルタイムデータストリーミング用の仮想テーブルを作成してください"
高度なジョブ管理
- "ジョブ1234のイベント履歴を表示してください"
- "ジョブ1234の詳細な状態を取得してください"
- "ジョブ1234を複製して新しいジョブを作成してください"
- " 'user_events' テーブルのデータソースをクローンしてください"
- "ジョブ1234の具体化ビューエンドポイントを取得してください"
- "ジョブ1234の具体化ビューエンドポイントを作成してください"
具体化ビュー
- "ジョブ1234の具体化ビューエンドポイントを取得してください"
- "ジョブ1234の具体化ビューエンドポイントを作成してください"
⚠️ 重要な制限事項: 具体化ビュー (MV) はSSB UIインターフェイスを通じて作成する必要があります。MCPサーバーは既存の具体化ビューからデータを取得できますが、プログラムで新しいものを作成することはできません。具体化ビューを作成するには:
- SSB UIでジョブに移動します
- 具体化ビューのセクションに移動します
- UIを通じてMVを設定して作成します
- MCPサーバーを使用して作成されたMVデータをクエリします
監視と診断
- "システムのハートビートと健全性を確認してください"
- "診断カウンターを表示してください"
- "このSQLクエリのパフォーマンスを分析してください: SELECT * FROM NVDA WHERE close > 100"
- "現在のシステムパフォーマンスはどうですか?"
拡張されたテーブル管理
- "すべてのテーブルに関する詳細情報を表示してください"
- "カタログ別の階層的なテーブル構造を取得してください"
- "このデータソースの設定を検証してください"
- "完全な設定で新しいテーブルを作成してください"
- " 'user_events' テーブルに関する詳細情報を取得してください"
Claudeは特定のテーブルに関する詳細情報を提供でき、そのスキーマと設定が含まれます。
ユーザーとプロジェクト管理
- "私のユーザー設定と嗜好を表示してください"
- "ダークモードを有効にするようにユーザー設定を更新してください"
- "すべての利用可能なプロジェクトを一覧表示してください"
- " 'analytics' という名前の新しいプロジェクトを作成してください"
- "プロジェクト 'ffffffff' の詳細を取得してください"
- "私のユーザー情報を表示してください"
APIキー管理
- "私のすべてのAPIキーを一覧表示してください"
- "外部アクセス用の新しいAPIキーを作成してください"
- "APIキー 'key123' を削除してください"
- "APIキー 'key123' の詳細を取得してください"
環境管理
- "すべての利用可能な環境を一覧表示してください"
- " 'production' 環境に切り替えてください"
- " 'staging' という名前の新しい環境を作成してください"
- " 'dev' 環境の詳細を取得してください"
- "現在の環境を非アクティブ化してください"
同期と設定
- "現在の同期設定を表示してください"
- "Git統合用に同期設定を更新してください"
- "プロジェクト 'analytics' の設定をエクスポートしてください"
- "Gitからプロジェクト設定をインポートしてください"
- "プロジェクト 'test' の同期設定を検証してください"
UDF管理
- "すべてのユーザー定義関数の詳細を一覧表示してください"
- "パラメータを指定してUDF 'custom_aggregate' を実行してください"
- "データ変換用の新しいUDFを作成してください"
- "UDF 'my_function' の設定を更新してください"
- "UDFのアーティファクトと依存関係を取得してください"
サンプルデータの例
MCPサーバーは、異なるサンプリングモードでリアルタイムのストリーミングデータサンプルを取得できます:
Claudeは実行中のジョブからリアルタイムのサンプルデータを取得でき、実際のストリーミングデータを表示します。
定期的なサンプリング (デフォルト):
{
"records": [
{
"___open": "185.0919",
"___high": "185.1200",
"___low": "184.9400",
"___close": "184.9700",
"___volume": "61884",
"eventTimestamp": "2025-10-08T18:34:10.915Z"
}
],
"job_status": "RUNNING",
"end_of_samples": false,
"message": "Retrieved 1 sample records"
}
すべてのメッセージをサンプリングするモード:
{
"sampling_mode": "sample_all_messages",
"sample_interval": 0,
"sample_count": 10000,
"window_size": 10000,
"message": "Job created with comprehensive sampling enabled"
}
SQLクエリの機能
- 自動セミコロン処理: すべてのSQLクエリは自動的にセミコロンで終了されます
- 柔軟なサンプリング: 定期的なサンプリングまたはすべてのメッセージをサンプリングするモードから選択できます
- ジョブ制御: 異なる設定でジョブを開始、停止、再起動できます
- リアルタイムデータ: 実行中のジョブからストリーミングデータのサンプルにアクセスできます
高度な機能
すべてのメッセージをサンプリングする
包括的なデータサンプリングには、sample_all_messages=True オプションを使用します:
# すべてのメッセージをサンプリングするジョブを作成
execute_query_with_sampling("SELECT * FROM NVDA", sample_all_messages=True)
# すべてのメッセージをサンプリングするようにジョブを再起動
restart_job_with_sampling(1234, "SELECT * FROM NVDA", sample_all_messages=True)
設定:
sample_interval: 0(すぐにサンプリング)sample_count: 10000(すべてのメッセージをキャプチャするための高いカウント)window_size: 10000(包括的なサンプリングのための大きなウィンドウ)
カスタムサンプリング設定
特定のニーズに合わせてサンプリング動作を微調整します:
# 500msの間隔でカスタムサンプリング
execute_query_with_sampling("SELECT * FROM NVDA",
sample_interval=500,
sample_count=500,
window_size=500)
# 既存のジョブのサンプリングを設定
configure_sampling("sample_id",
sample_interval=200,
sample_count=1000,
window_size=1000)
ジョブ管理
完全なジョブのライフサイクル管理:
# セーブポイントを使用してジョブを停止
stop_job(1234, savepoint=True)
# 新しいSQLでジョブを再起動
execute_job(1234, "SELECT * FROM NEW_TABLE")
# サンプリングオプションで再起動
restart_job_with_sampling(1234, "SELECT * FROM NVDA",
sample_interval=1000,
sample_all_messages=False)
Kafkaテーブルの作成
ローカルKafkaコネクタのみに制限されたテーブルを作成します:
# ステップ1: データソースを作成する (設定を作成する)
create_kafka_table("user_events", "user-events") # デフォルトでlocal-kafkaを使用
# ステップ2: Flinkカタログにテーブルを登録する (クエリ可能にする)
register_kafka_table("user_events", "user-events") # ssb.ssb_defaultにssb_user_eventsを作成 (デフォルトのカタログ.ssb_defaultにフォールバック)
# 高度な: カスタムスキーマの登録
custom_schema = [
{"name": "id", "type": "STRING"},
{"name": "name", "type": "STRING"},
{"name": "timestamp", "type": "TIMESTAMP"}
]
register_kafka_table("custom_table", "custom-topic", custom_schema) # ssb_custom_tableを作成
# ssb_ 接頭辞を使用しない
register_kafka_table("raw_data", "raw-topic", use_ssb_prefix=False) # 接頭辞なしでraw_dataを作成
# カスタムカタログとデータベース
register_kafka_table("custom_table", "custom-topic", catalog="default_catalog", database="default_database")
# カスタム設定でローカルKafka
create_kafka_table("local_data", "local-topic", "local-kafka", "localhost:9092", "json", "earliest-offset")
register_kafka_table("local_data", "local-topic") # ssb_local_dataを作成
# コネクタタイプを検証
validate_kafka_connector("local-kafka") # 検証詳細を返す
validate_kafka_connector("kafka") # エラーを返す - local-kafkaのみ許可
二段階のプロセス: Kafkaテーブルを作成するには、2つのステップが必要です:
- データソースを作成する:
create_kafka_table()を使用してデータソースの設定を作成します - カタログに登録する:
register_kafka_table()を使用してテーブルをクエリ可能にします
自動登録: register_kafka_table() 関数は:
- DDLを使用してFlinkカタログにテーブルを登録します (デフォルト:
ssb.ssb_default、ssbカタログが利用できない場合はdefault_catalog.ssb_defaultにフォールバック) - 柔軟な名前空間制御のためのカタログとデータベースのパラメータを設定可能です
- 要求されたカタログが利用できない場合に自動的にカタログをフォールバックします
- デフォルトではNVDAのような既存のテーブルと同じデータベースにテーブルを作成します
- テーブル名に自動的に
ssb_接頭辞を追加します (設定可能) - トピックデータに基づいて自動的にスキーマを作成します
- 正しいデータベースコンテキストをチェックして、テーブルがクエリ可能であることを検証します
- 完全なテーブル名、カタログ、データベース情報を含む成功した登録の確認を返します
命名規則:
- デフォルト: テーブルに自動的に
ssb_接頭辞が付けられます (例:user_events→ssb_user_events) - オーバーライド:
use_ssb_prefix=Falseを使用して接頭辞を無効にします - 既存: 既に
ssb_で始まるテーブルは変更されません
名前空間設定:
- デフォルト: テーブルは
ssb.ssb_default名前空間に作成されます (ssbカタログが利用できない場合はdefault_catalog.ssb_defaultにフォールバック) - カスタムカタログ:
catalogパラメータを使用して異なるカタログを指定します - カスタムデータベース:
databaseパラメータを使用して異なるデータベースを指定します - 自動フォールバック: 要求されたカタログが利用できない場合、システムは自動的に
default_catalogにフォールバックします - 完全な制御: 最大限の柔軟性のために、カタログとデータベースの両方を設定可能です
検証:
SHOW TABLES;を使用して、テーブルがクエリ可能であることを確認します- テーブルは
default_catalog.ssb_default名前空間に作成されます (NVDAと同じ) - 完全な名前空間 (
default_catalog.ssb_default.TABLE_NAME) を使用するか、USE default_catalog.ssb_default;でデータベースコンテキストを切り替えます - すべての仮想Kafkaテーブルは、既存のテーブルと同じ場所に配置され、簡単にクエリできます
サポートされるコネクタ:
local-kafka- ローカルKafkaコネクタ (仮想テーブルの唯一のオプション)
サポートされる形式:
json- JSON形式 (デフォルト)csv- CSV形式avro- Apache Avro形式- カスタム形式文字列
Docker Composeの設定
リポジトリには、ローカル開発とテスト用の完全なDocker Compose設定が含まれています:
含まれるサービス
- PostgreSQL: SSBメタデータ用のデータベース
- Kafka: メッセージストリーミングプラットフォーム
- Flink: ストリーム処理エンジン
- NiFi: データフロー管理
- Qdrant: ベクトルデータベース
- SSB SSE: SQL Stream Builder Streaming SQL Engine
- SSB MVE: SQL Stream Builder Materialized View Engine
- Apache Knox: 安全なアクセス用のゲートウェイ (オプション)
環境の起動
# すべてのサービスを起動する
docker-compose up -d
# サービスの状態を確認する
docker-compose ps
# ログを表示する
docker-compose logs -f ssb-sse
アクセスポイント
- SSB SSE: http://localhost:18121
- SSB MVE: http://localhost:18131
- Flink Job Manager: http://localhost:8081
- NiFi: http://localhost:8080
- Knox Gateway: https://localhost:8444 (有効にした場合)
書き込み操作
デフォルトでは、サーバーはCDPデプロイメントでは読み取り専用モードで実行され、スタンドアロンデプロイメントでは書き込みを有効にして実行されます。これを変更するには:
SSB_READONLY=false(書き込みを有効にする) またはSSB_READONLY=true(読み取り専用) を設定します- MCPサーバーを再起動します
書き込み操作には以下が含まれます:
- ストリームの作成、更新、削除
- ジョブを作成するSQLクエリの実行
- ジョブのライフサイクルの管理 (開始、停止、再起動)
- サンプリングパラメータの設定
- ジョブの制御と管理
- Kafkaのみのテーブルの作成 (強制的な検証)
包括的な機能
SSB MCP Serverは現在、80以上のMCPツール を備え、SSB APIの80%以上 をカバーしており、Claude Desktopを通じて利用可能な最も包括的なSSB管理プラットフォームになっています。
📊 カバレッジ統計
- 総MCPツール数: 80以上 (以前は33)
- APIカバレッジ: 80%以上 (以前は20%)
- 機能カテゴリ数: 15 (以前は6)
- 利用可能なエンドポイント数: 67以上 (以前は15)
🎯 主要機能
完全なSSB管理
- ジョブのライフサイクル: ジョブの作成、監視、制御、コピー、管理
- データ管理: テーブル、スキーマ、検証、階層的な組織
- システム監視: 健全性チェック、診断、パフォーマンス追跡
- ユーザー管理: 設定、プロジェクト、環境、APIキー
- DevOps統合: 同期、エクスポート/インポート、設定管理
高度な機能
- リアルタイムサンプリング: "すべてのメッセージをサンプリングする" オプションを備えた柔軟なデータサンプリング
- SQL分析: 実行せずにクエリを分析してパフォーマンスを最適化
- 具体化ビュー: 具体化ビューエンドポイントの作成と管理
- カスタムUDF: ユーザー定義関数の管理と実行
- 環境制御: 切り替え機能を備えたマルチ環境サポート
- プロジェクト管理: エクスポート/インポート機能を備えた完全なプロジェクトライフサイクル
エンタープライズ対応
- セキュリティ: APIキー管理とユーザー認証
- 監視: 包括的なシステム健全性とパフォーマンス追跡
- スケーラビリティ: 複数のプロジェクトと環境をサポート
- 統合: Git同期、設定管理、DevOpsワークフロー
- 柔軟性: 設定可能なカタログ、データベース、命名規則
🚀 使用例
データエンジニア
- ストリーム処理ジョブの管理と監視
- リアルタイムデータのサンプリングと分析
- テーブルスキーマの管理と検証
- パフォーマンスの最適化とトラブルシューティング
DevOpsエンジニア
- 環境の管理と設定
- プロジェクトのエクスポート/インポートとバージョン管理
- システムの監視と健全性チェック
- APIキーの管理とセキュリティ
データサイエンティスト
- カスタムUDFの開発と実行
- データ形式の管理と検証
- クエリの分析と最適化
- リアルタイムデータの探索
プラットフォーム管理者
- ユーザーとプロジェクトの管理
- システムの診断と監視
- コネクタと形式の管理
- 同期設定と検証
🔧 技術詳細
テスト
SSB MCP Serverには包括的なテストスイートが含まれています。詳細なテストドキュメントについては Testing/README.md を参照してください。以下が含まれます:
- クイック機能テスト
- すべての80以上のMCPツールをカバーする包括的なテストスイート
- クラウドSSBテストプロトコル
- テスト設定とベストプラクティス
- 詳細なテスト結果と分析
クイックスタート:
cd Testing && python run_tests.py --quick
トラブルシューティング
一般的な問題
-
"Unauthorized" エラー: 認証資格情報を確認してください
- 直接的なSSBの場合:
SSB_USERとSSB_PASSWORDを確認してください - Knoxの場合:
KNOX_TOKENまたはKNOX_USER/KNOX_PASSWORDを確認してください
- 直接的なSSBの場合:
-
"Connection refused" エラー: SSBサービスが実行中であることを確認してください
docker-compose psでサービスの状態を確認してください- docker-compose.yml のポートマッピングを確認してください
-
"No sample data available" エラー: ジョブがデータを生成するまで時間がかかる場合があります
get_job_status(job_id)でジョブの状態を確認してください- ジョブが実行中であり、サンプル設定が正しいことを確認してください
- 包括的なサンプリングには
sample_all_messages=Trueを試してみてください
-
ジョブの再起動失敗: ジョブの再起動が失敗した場合
execute_job()の代わりにrestart_job_with_sampling()を使用してください- ジョブが再起動可能な状態であることを確認してください
- 再起動が不可能な場合は、新しいジョブを作成してください
-
SSL証明書エラー: Knoxデプロイメントの場合
- 自己署名証明書の場合は
KNOX_VERIFY_SSL=falseを設定してください - または
KNOX_CA_BUNDLEで適切なCAバンドルを提供してください
- 自己署名証明書の場合は
-
Kafkaテーブルの作成エラー: テーブルの作成が失敗した場合
- ローカルKafkaコネクタのみが使用されていることを確認してください (強制)
- Kafkaトピックが存在し、アクセス可能であることを確認してください
- ブートストラップサーバーが正しく設定されていることを確認してください
validate_kafka_connector()を使用してコネクタの有効性を確認してください
-
仮想テーブルがクエリできない場合: Kafkaテーブルを作成した後
- 重要: データソースを作成しても自動的にクエリ可能になるわけではありません
- データソースはSSB UIを通じてFlinkカタログに手動で登録する必要があります
SHOW TABLES;を使用して、実際にクエリ可能なテーブルを確認してくださいSHOW TABLES;に表示されるテーブルのみがSQLでクエリ可能です
デバッグモード
環境変数を設定してデバッグログを有効にします:
export MCP_LOG_LEVEL=DEBUG
セキュリティ
- すべての機密データ (パスワード、トークン、シークレット) は応答で自動的に隠されます
- 大きなコレクションは、LLMを圧倒しないように切り捨てられます
- CDPデプロイメントではデフォルトで読み取り専用モードが有効になっており、誤った変更を防止します
- 直接的なSSB認証はHTTP上で基本認証を使用します (ローカル開発に適しています)
- SQLクエリは適切なセミコロン終了で自動的にサニタイズされます
- Kafkaテーブルの作成は、データセキュリティのためにローカルKafkaのみのコネクタを強制します
📄 ライセンス
Apache License 2.0
代替品















