# 🚀 SSB MCP Server

The SSB MCP Server is a Model Context Protocol server that offers comprehensive access to SQL Stream Builder (SSB). It supports both direct SSB access and Apache Knox integration, works with standalone and Cloudera Data Platform (CDP) SSB deployments, and exposes the full SSB feature set through Claude Desktop.
*Screenshot: the main SSB interface, showing the home dashboard with available streams and jobs alongside the SSB MCP Server integration.*
## ✨ Features
- **Multiple Authentication Methods**:
  - **Direct SSB Authentication**: basic auth for standalone SSB deployments
  - **Knox Integration**: bearer tokens, cookies, and passcode tokens for CDP deployments
- **Read-only by default**: safe exploration of SSB streams and configuration
- **Comprehensive SSB API coverage** with 80+ MCP tools for complete SSB management:
  - **Advanced Job Management**: event history, state management, job copying, data source cloning
  - **Monitoring & Diagnostics**: system health, performance counters, SQL analysis
  - **Enhanced Table Management**: detailed tables, hierarchical structure, validation, creation
  - **Connector & Format Management**: data formats, connector details, JAR information
  - **User & Project Management**: settings, projects, user information, project creation
  - **API Key Management**: key lifecycle management, creation, deletion, details
  - **Environment Management**: environment switching, configuration, creation
  - **Sync & Configuration**: project export/import, sync management, validation
  - **UDF Management**: UDF lifecycle, execution, artifacts, custom functions
- **Stream Management**: list, create, update, delete, start, and stop SQL streams
- **Query Execution**: execute SQL queries and get real-time results with sampling
- **Sample Data Access**: retrieve streaming data samples from running jobs
- **Job Management**: monitor job status, get job details, and manage the job lifecycle
- **Schema Discovery**: explore table schemas and available tables
- **Function Management**: list and inspect user-defined functions
- **Connector Management**: explore available connectors
- **Kafka Integration**: list and inspect Kafka topics
- **Cluster Monitoring**: get cluster info and health status
- **Performance Metrics**: monitor stream performance and metrics
## 🚀 Quick Start
### For Standalone SSB Deployments (Docker Compose)

1. **Start SSB services:**

   ```bash
   git clone https://github.com/your-org/ssb-mcp-server.git
   cd ssb-mcp-server
   docker-compose up -d
   ```

2. **Configure Claude Desktop** - edit `~/Library/Application Support/Claude/claude_desktop_config.json`:

   ```json
   {
     "mcpServers": {
       "ssb-mcp-server": {
         "command": "/FULL/PATH/TO/SSB-MCP-Server/run_mcp_server.sh",
         "args": [],
         "cwd": "/FULL/PATH/TO/SSB-MCP-Server"
       }
     }
   }
   ```

3. **Restart Claude Desktop** and start interacting with your SSB streams!
### For CDP SSB Deployments

Your SSB API base URL will typically be:

```
https://<your-ssb-host>/ssb/api/v1
```

Get your Knox JWT token from the CDP UI and use it with the configurations below.
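
Before wiring the token into Claude Desktop, it can help to smoke-test it against the gateway. A minimal sketch (illustrative only; exact status codes vary by deployment, but a 401 usually means the token is missing, malformed, or expired):

```bash
# Hypothetical smoke test: send the Knox JWT as a Bearer token and print
# the HTTP status code returned by the SSB API base URL.
curl -s -o /dev/null -w "%{http_code}\n" \
  -H "Authorization: Bearer $KNOX_TOKEN" \
  "https://<your-ssb-host>/ssb/api/v1"
```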
## 📦 Installation

### Option 1: Claude Desktop (Local)

1. **Clone and install:**

   ```bash
   git clone https://github.com/your-org/ssb-mcp-server.git
   cd ssb-mcp-server
   python3 -m venv .venv
   source .venv/bin/activate
   pip install -e .
   ```

2. **Configure Claude Desktop** - edit `~/Library/Application Support/Claude/claude_desktop_config.json`:

   ```json
   {
     "mcpServers": {
       "ssb-mcp-server": {
         "command": "/FULL/PATH/TO/SSB-MCP-Server/.venv/bin/python",
         "args": [
           "-m",
           "ssb_mcp_server.server"
         ],
         "env": {
           "MCP_TRANSPORT": "stdio",
           "SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1",
           "KNOX_TOKEN": "<your_knox_bearer_token>",
           "SSB_READONLY": "true"
         }
       }
     }
   }
   ```

3. **Restart Claude Desktop** and start asking questions about your SSB streams!
### Option 2: Direct Installation (Cloudera Agent Studio)

For use with Cloudera Agent Studio, use the `uvx` command:

```json
{
  "mcpServers": {
    "ssb-mcp-server": {
      "command": "uvx",
      "args": [
        "--from",
        "git+https://github.com/your-org/ssb-mcp-server@main",
        "run-server"
      ],
      "env": {
        "MCP_TRANSPORT": "stdio",
        "SSB_API_BASE": "https://ssb-gateway.yourshere.cloudera.site/ssb/api/v1",
        "KNOX_TOKEN": "<your_knox_bearer_token>",
        "SSB_READONLY": "true"
      }
    }
  }
}
```
## 📚 Documentation

### Configuration Options

All configuration is done via environment variables.

#### Direct SSB Authentication (Standalone)

| Property | Required | Details |
|----------|----------|---------|
| `SSB_API_BASE` | Yes | Full SSB API URL (e.g., `http://localhost:18121`) |
| `SSB_USER` | Yes | SSB username (e.g., `admin`) |
| `SSB_PASSWORD` | Yes | SSB password (e.g., `admin`) |
| `SSB_READONLY` | No | Read-only mode (default: `false`) |
| `TIMEOUT_SECONDS` | No | HTTP timeout in seconds (default: `30`) |
#### Knox Authentication (CDP)

| Property | Required | Details |
|----------|----------|---------|
| `KNOX_GATEWAY_URL` | Yes* | Knox gateway URL (e.g., `https://host:8444/gateway/ssb`) |
| `KNOX_TOKEN` | Yes* | Knox JWT token for authentication |
| `KNOX_COOKIE` | No | Alternative: provide a full cookie string instead of a token |
| `KNOX_PASSCODE_TOKEN` | No | Alternative: Knox passcode token (auto-exchanged for a JWT) |
| `KNOX_USER` | No | Knox username for basic auth |
| `KNOX_PASSWORD` | No | Knox password for basic auth |
| `KNOX_VERIFY_SSL` | No | Verify SSL certificates (default: `true`) |
| `KNOX_CA_BUNDLE` | No | Path to a CA certificate bundle |
| `SSB_READONLY` | No | Read-only mode (default: `true`) |
| `TIMEOUT_SECONDS` | No | HTTP timeout in seconds (default: `30`) |

\* Either `SSB_API_BASE` (for direct) or `KNOX_GATEWAY_URL` (for Knox) is required.
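
The footnote amounts to a small selection rule at startup: exactly one of the two base URLs picks the auth mode. A minimal sketch of how that resolution could look (illustrative only, not the server's actual code; note the differing read-only defaults from the tables above):

```python
import os

def resolve_auth():
    """Illustrative: pick direct or Knox auth from the environment."""
    if api_base := os.getenv("SSB_API_BASE"):
        # Direct mode: basic auth against a standalone SSB; writes on by default.
        return {
            "base_url": api_base,
            "auth": (os.environ["SSB_USER"], os.environ["SSB_PASSWORD"]),
            "readonly": os.getenv("SSB_READONLY", "false").lower() == "true",
        }
    if gateway := os.getenv("KNOX_GATEWAY_URL"):
        # Knox mode: bearer token (or cookie/passcode) against the gateway;
        # read-only by default.
        return {
            "base_url": gateway,
            "headers": {"Authorization": f"Bearer {os.environ['KNOX_TOKEN']}"},
            "readonly": os.getenv("SSB_READONLY", "true").lower() == "true",
        }
    raise RuntimeError("Set either SSB_API_BASE (direct) or KNOX_GATEWAY_URL (Knox)")
```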
### Example Functionality

The SSB MCP Server provides comprehensive access to SQL Stream Builder through Claude Desktop.
### Available Tools
#### 🔧 Advanced Job Management

- `get_job_events(job_id)` - Get detailed job event history and timeline
- `get_job_state(job_id)` - Get comprehensive job state information
- `get_job_mv_endpoints(job_id)` - Get materialized view endpoints for a job
- `create_job_mv_endpoint(job_id, mv_config)` - Create or update a materialized view endpoint
- `copy_job(job_id)` - Duplicate an existing job
- `copy_data_source(data_source_id)` - Clone a data source

#### 📊 Monitoring & Diagnostics

- `get_diagnostic_counters()` - Get system performance counters and diagnostics
- `get_heartbeat()` - Check system health and connectivity
- `analyze_sql(sql_query)` - Analyze a SQL query without executing it (syntax, performance)

#### 🗂️ Enhanced Table Management

- `list_tables_detailed()` - Get comprehensive table information
- `get_table_tree()` - Get the hierarchical table structure organized by catalog
- `validate_data_source(data_source_config)` - Validate a data source configuration
- `create_table_detailed(table_config)` - Create a table with full configuration
- `get_table_details(table_id)` - Get detailed information about a specific table
#### 🔌 Connector & Format Management

- `list_data_formats()` - List all available data formats
- `get_data_format_details(format_id)` - Get detailed information about a specific data format
- `create_data_format(format_config)` - Create a new data format
- `get_connector_jar(connector_type)` - Get connector JAR information
- `get_connector_type_details(connector_type)` - Get detailed connector type information
- `get_connector_details(connector_id)` - Get detailed connector information

#### 👤 User & Project Management

- `get_user_settings()` - Get user preferences and settings
- `update_user_settings(settings)` - Update user configuration
- `list_projects()` - List available projects
- `get_project_details(project_id)` - Get project information
- `create_project(project_config)` - Create a new project
- `get_user_info()` - Get current user information

#### 🔑 API Key Management

- `list_api_keys()` - List user API keys
- `create_api_key(key_config)` - Create a new API key
- `delete_api_key(key_id)` - Delete an API key
- `get_api_key_details(key_id)` - Get API key information
#### 🌍 Environment Management

- `list_environments()` - List available environments
- `activate_environment(env_id)` - Activate/switch to an environment
- `get_environment_details(env_id)` - Get environment configuration
- `create_environment(env_config)` - Create a new environment
- `deactivate_environment()` - Deactivate the current environment

#### 🔄 Sync & Configuration

- `get_sync_config()` - Get the sync configuration
- `update_sync_config(config)` - Update the sync configuration
- `delete_sync_config()` - Delete the sync configuration
- `validate_sync_config(project)` - Validate the sync configuration for a project
- `export_project(project)` - Export a project configuration
- `import_project(project, config)` - Import a project configuration

#### 📈 UDF Management

- `list_udfs_detailed()` - Get comprehensive UDF information
- `run_udf(udf_id, parameters)` - Execute a UDF
- `get_udf_artifacts()` - Get UDF artifacts and dependencies
- `create_udf(udf_config)` - Create a custom UDF
- `update_udf(udf_id, udf_config)` - Update a UDF configuration
- `get_udf_details(udf_id)` - Get detailed UDF information
- `get_udf_artifact_details(artifact_id)` - Get UDF artifact details
- `get_udf_artifact_by_type(artifact_type)` - Get UDF artifacts by type
#### Stream Management

- `list_streams()` - List all SQL streams (jobs)
- `get_stream(stream_name)` - Get details of a specific stream
- `create_stream(stream_name, sql_query, description?)` - Create a new stream (write mode)
- `update_stream(stream_name, sql_query, description?)` - Update an existing stream
- `delete_stream(stream_name)` - Delete a stream
- `start_stream(stream_name)` - Start a stream
- `stop_stream(stream_name)` - Stop a stream

#### Query Execution & Sample Data

- `execute_query(sql_query, limit?)` - Execute a SQL query and create an SSB job
- `execute_query_with_sampling(sql_query, sample_interval, sample_count, window_size, sample_all_messages)` - Execute a query with custom sampling
- `get_job_status(job_id)` - Get the status of a specific SSB job
- `get_job_sample(sample_id)` - Get sample data from a job execution
- `get_job_sample_by_id(job_id)` - Get sample data from a job by job ID
- `list_jobs_with_samples()` - List all jobs with their sample information
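
A typical flow chains these tools together: run a query, wait for the job to come up, then pull samples. A hedged sketch using the tool names above, where `client` is a stand-in for whatever invokes the MCP tools and the `job_id`/`state` keys are assumed response shapes:

```python
import time

def run_and_sample(client, sql):
    """Illustrative flow: execute a query, poll until RUNNING, fetch samples."""
    job = client.execute_query(sql)                      # creates an SSB job
    while client.get_job_status(job["job_id"])["state"] != "RUNNING":
        time.sleep(1)                                    # wait for the job to start
    return client.get_job_sample_by_id(job["job_id"])    # pull sample records
```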
#### Job Management & Control

- `stop_job(job_id, savepoint)` - Stop a specific SSB job
- `execute_job(job_id, sql_query)` - Execute/restart a job with new SQL
- `restart_job_with_sampling(job_id, sql_query, sample_interval, sample_all_messages)` - Restart a job with sampling options
- `configure_sampling(sample_id, sample_interval, sample_count, window_size, sample_all_messages)` - Configure sampling parameters

#### Data Sources & Schema

- `list_tables()` - List all available tables (data sources)
- `get_table_schema(table_name)` - Get table schema information
- `create_kafka_table(table_name, topic, kafka_connector_type, bootstrap_servers, format_type, scan_startup_mode, additional_properties?)` - Create a new table with local-kafka enforcement
- `register_kafka_table(table_name, topic, schema_fields?, use_ssb_prefix?, catalog?, database?)` - Register a Kafka table in the Flink catalog (makes it queryable)
- `validate_kafka_connector(kafka_connector_type)` - Validate that a connector type is local-kafka
#### Functions and Connectors

- `list_udfs()` - List user-defined functions
- `get_udf(udf_name)` - Get UDF details
- `list_connectors()` - List available connectors
- `get_connector(connector_name)` - Get connector details

#### Kafka Integration

- `list_topics()` - List Kafka topics
- `get_topic(topic_name)` - Get topic details

#### Cluster Management

- `get_cluster_info()` - Get cluster information
- `get_cluster_health()` - Get cluster health status
- `get_ssb_info()` - Get SSB version and system info
### Example Usage

Once configured, you can ask Claude questions like:

#### Basic Information
- "What version of SSB am I running?"
- "List all my SQL streams"
- "What tables are available for querying?"
- "What connectors are available?"
- "List all Kafka topics"
- "How is the cluster health?"
*Claude can show all available tables in the SSB environment, including both built-in and custom tables.*

#### Query Execution & Data Access
- "Execute this query: SELECT * FROM NVDA"
- "Create a job with sample all messages: SELECT * FROM NVDA"
- "Show me the status of job 1234"
*Claude can list all running SSB jobs, showing their status, creation time, and details.*
- "Get sample data from job 1234"
- "List all jobs with their sample information"
- "Show me the latest data from the ReadNVDA job"
#### Job Management & Control
- "Stop job 1234"
- "Restart job 1234 with sample all messages"
- "Restart job 1234 with custom sampling (500ms interval)"
- "Configure sampling for job 1234 to sample all messages"
*Claude can create new SSB jobs by executing SQL queries, with full job management capabilities.*

#### Stream Management
- "Create a new stream called 'sales_analysis' with this SQL: SELECT * FROM sales WHERE amount > 1000"
- "Show me the details of the 'user_events' stream"
- "What's the status of my 'sales_stream'?"
#### Kafka Table Management
- "Create a local Kafka table called 'user_events' from topic 'user-events'"
- "Register a Kafka table in the Flink catalog to make it queryable"
- "Create a local Kafka table with JSON format"
*Claude can create new virtual tables connected to Kafka topics, with proper schema and connector configuration.*
- "Validate if 'local-kafka' is a valid connector type"
- "Create a virtual table for real-time data streaming"
#### Advanced Job Management
- "Show me the event history for job 1234"
- "Get the detailed state of job 1234"
- "Copy job 1234 to create a new job"
- "Clone the data source for table 'user_events'"
- "Get materialized view endpoints for job 1234"
- "Create a materialized view endpoint for job 1234"
#### Materialized Views
- "Get materialized view endpoints for job 1234"
- "Create a materialized view endpoint for job 1234"
⚠️ **Important Note**: Materialized Views (MVs) must be created through the SSB UI. The MCP server can retrieve data from existing materialized views but cannot create new ones programmatically. To create a materialized view:

1. Navigate to your job in the SSB UI
2. Go to the Materialized Views section
3. Configure and create the MV through the UI
4. Use the MCP server to query the created MV data
#### Monitoring & Diagnostics
- "Check the system heartbeat and health"
- "Show me the diagnostic counters"
- "Analyze this SQL query for performance: SELECT * FROM NVDA WHERE close > 100"
- "What's the current system performance?"
#### Enhanced Table Management
- "Show me detailed information about all tables"
- "Get the hierarchical table structure by catalog"
- "Validate this data source configuration"
- "Create a new table with full configuration"
- "Get detailed information about table 'user_events'"
*Claude can provide detailed information about specific tables, including their schema and configuration.*

#### User & Project Management
- "Show me my user settings and preferences"
- "Update my user settings to enable dark mode"
- "List all available projects"
- "Create a new project called 'analytics'"
- "Get details about project 'ffffffff'"
- "Show me my user information"
#### API Key Management
- "List all my API keys"
- "Create a new API key for external access"
- "Delete API key 'key123'"
- "Get details about API key 'key123'"
#### Environment Management
- "List all available environments"
- "Switch to environment 'production'"
- "Create a new environment called 'staging'"
- "Get details about environment 'dev'"
- "Deactivate the current environment"
#### Sync & Configuration
- "Show me the current sync configuration"
- "Update the sync configuration for Git integration"
- "Export project 'analytics' configuration"
- "Import project configuration from Git"
- "Validate sync configuration for project 'test'"
#### UDF Management
- "List all user-defined functions with details"
- "Run UDF 'custom_aggregate' with parameters"
- "Create a new UDF for data transformation"
- "Update UDF 'my_function' configuration"
- "Get UDF artifacts and dependencies"
#### Sample Data Examples

The MCP server can retrieve real-time streaming data samples with different sampling modes.

*Claude can retrieve real-time sample data from running jobs, showing actual streaming data.*
**Periodic Sampling (default):**

```json
{
  "records": [
    {
      "___open": "185.0919",
      "___high": "185.1200",
      "___low": "184.9400",
      "___close": "184.9700",
      "___volume": "61884",
      "eventTimestamp": "2025-10-08T18:34:10.915Z"
    }
  ],
  "job_status": "RUNNING",
  "end_of_samples": false,
  "message": "Retrieved 1 sample records"
}
```
**Sample All Messages Mode:**

```json
{
  "sampling_mode": "sample_all_messages",
  "sample_interval": 0,
  "sample_count": 10000,
  "window_size": 10000,
  "message": "Job created with comprehensive sampling enabled"
}
```
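
Because each response reports `job_status` and `end_of_samples`, a client can drain samples with a simple polling loop. A sketch assuming `get_job_sample` returns the shape shown above, with `client` again standing in for the MCP tool invoker:

```python
import time

def drain_samples(client, sample_id, poll_seconds=1.0):
    """Collect records until SSB signals end_of_samples or the job stops."""
    records = []
    while True:
        resp = client.get_job_sample(sample_id)
        records.extend(resp.get("records", []))      # accumulate new rows
        if resp.get("end_of_samples") or resp.get("job_status") != "RUNNING":
            return records
        time.sleep(poll_seconds)                     # poll at a gentle interval
```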
### SQL Query Features

- **Automatic Semicolon Handling**: all SQL queries are automatically terminated with semicolons
- **Flexible Sampling**: choose between periodic sampling and sampling all messages
- **Job Control**: start, stop, and restart jobs with different configurations
- **Real-time Data**: access streaming data samples from running jobs
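
The semicolon handling amounts to a small normalization step before submission. A minimal illustration of the idea (not the server's actual implementation):

```python
def normalize_sql(sql: str) -> str:
    """Strip trailing whitespace and semicolons, then terminate with exactly one."""
    return sql.strip().rstrip(";").rstrip() + ";"
```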
### Advanced Features

#### Sample All Messages

For comprehensive data sampling, use the `sample_all_messages=True` option:

```python
execute_query_with_sampling("SELECT * FROM NVDA", sample_all_messages=True)
restart_job_with_sampling(1234, "SELECT * FROM NVDA", sample_all_messages=True)
```

This mode sets the following configuration:

- `sample_interval`: `0` (sample immediately)
- `sample_count`: `10000` (high count to capture all messages)
- `window_size`: `10000` (large window for comprehensive sampling)
#### Custom Sampling Configuration

Fine-tune sampling behavior for your specific needs:

```python
execute_query_with_sampling("SELECT * FROM NVDA",
                            sample_interval=500,
                            sample_count=500,
                            window_size=500)

configure_sampling("sample_id",
                   sample_interval=200,
                   sample_count=1000,
                   window_size=1000)
```
#### Job Management

Complete job lifecycle management:

```python
stop_job(1234, savepoint=True)
execute_job(1234, "SELECT * FROM NEW_TABLE")
restart_job_with_sampling(1234, "SELECT * FROM NVDA",
                          sample_interval=1000,
                          sample_all_messages=False)
```
#### Kafka Table Creation

Create tables that are restricted to the local-kafka connector only:

```python
# Basic creation and registration
create_kafka_table("user_events", "user-events")
register_kafka_table("user_events", "user-events")

# Registration with a custom schema
custom_schema = [
    {"name": "id", "type": "STRING"},
    {"name": "name", "type": "STRING"},
    {"name": "timestamp", "type": "TIMESTAMP"}
]
register_kafka_table("custom_table", "custom-topic", custom_schema)

# Naming and namespace options
register_kafka_table("raw_data", "raw-topic", use_ssb_prefix=False)
register_kafka_table("custom_table", "custom-topic", catalog="default_catalog", database="default_database")

# Full configuration
create_kafka_table("local_data", "local-topic", "local-kafka", "localhost:9092", "json", "earliest-offset")
register_kafka_table("local_data", "local-topic")

# Connector validation
validate_kafka_connector("local-kafka")  # valid
validate_kafka_connector("kafka")        # rejected: only local-kafka is allowed
```
**Two-Step Process**: creating a queryable Kafka table requires two steps:

1. **Create the data source**: use `create_kafka_table()` to create the data source configuration
2. **Register in the catalog**: use `register_kafka_table()` to make the table queryable
**Automatic Registration**: the `register_kafka_table()` function:

- Uses DDL to register the table in the Flink catalog (default: `ssb.ssb_default`, falling back to `default_catalog.ssb_default`)
- Accepts configurable catalog and database parameters for flexible namespace control
- Falls back automatically when the requested catalog is not available
- Creates tables in the same database as existing tables like NVDA by default
- Automatically adds the `ssb_` prefix to table names (configurable)
- Automatically creates a schema based on the topic data
- Verifies the table is available for querying by checking the correct database context
- Returns confirmation of successful registration with the full table name, catalog, and database info
**Naming Conventions**:

- **Default**: tables get the `ssb_` prefix automatically (e.g., `user_events` → `ssb_user_events`)
- **Override**: use `use_ssb_prefix=False` to disable the prefix
- **Existing**: tables already starting with `ssb_` are not modified
**Namespace Configuration**:

- **Default**: tables are created in the `ssb.ssb_default` namespace (falls back to `default_catalog.ssb_default` if the `ssb` catalog is unavailable)
- **Custom Catalog**: use the `catalog` parameter to specify a different catalog
- **Custom Database**: use the `database` parameter to specify a different database
- **Automatic Fallback**: the system automatically falls back to `default_catalog` if the requested catalog is not available
- **Full Control**: both catalog and database are configurable for maximum flexibility
**Verification** (see the SQL sketch below):

- Use `SHOW TABLES;` to confirm tables are available for querying
- Tables are created in the `default_catalog.ssb_default` namespace (same as NVDA)
- Use the full namespace (`default_catalog.ssb_default.TABLE_NAME`) or switch database context with `USE default_catalog.ssb_default;`
- All virtual Kafka tables are co-located with existing tables for easy querying
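
Put together, verification from a SQL session looks like this (`ssb_user_events` follows the default `ssb_` prefix naming described above):

```sql
-- List registered tables, then query by full namespace
-- or after switching the database context.
SHOW TABLES;
SELECT * FROM default_catalog.ssb_default.ssb_user_events;
USE default_catalog.ssb_default;
SELECT * FROM ssb_user_events;
```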
**Supported Connector**:

- `local-kafka` - local Kafka connector (the only option for virtual tables)

**Supported Formats**:

- `json` - JSON format (default)
- `csv` - CSV format
- `avro` - Apache Avro format
- Custom format strings
### Docker Compose Setup

The repository includes a complete Docker Compose setup for local development and testing.

#### Services Included

- **PostgreSQL**: database for SSB metadata
- **Kafka**: message streaming platform
- **Flink**: stream processing engine
- **NiFi**: data flow management
- **Qdrant**: vector database
- **SSB SSE**: SQL Stream Builder Streaming SQL Engine
- **SSB MVE**: SQL Stream Builder Materialized View Engine
- **Apache Knox**: gateway for secure access (optional)
#### Starting the Environment

```bash
# Start all services in the background
docker-compose up -d

# Check service status
docker-compose ps

# Follow the SSB engine logs
docker-compose logs -f ssb-sse
```
#### Access Points

- **SSB SSE**: http://localhost:18121
- **SSB MVE**: http://localhost:18131
- **Flink Job Manager**: http://localhost:8081
- **NiFi**: http://localhost:8080
- **Knox Gateway**: https://localhost:8444 (if enabled)
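
To confirm the services came up, you can probe each endpoint and check for an HTTP response (ports as listed above):

```bash
# Print an HTTP status code for each local service endpoint.
for url in http://localhost:18121 http://localhost:18131 \
           http://localhost:8081 http://localhost:8080; do
  printf '%s -> ' "$url"
  curl -s -o /dev/null -w '%{http_code}\n' "$url"
done
```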
### Write Operations

By default, the server runs in read-only mode for CDP deployments and write-enabled mode for standalone deployments. To change this:

1. Set `SSB_READONLY=false` (enable writes) or `SSB_READONLY=true` (read-only), as in the snippet below
2. Restart the MCP server

Write operations include:

- Creating, updating, and deleting streams
- Executing SQL queries that create jobs
- Managing the job lifecycle (start, stop, restart)
- Configuring sampling parameters
- Job control and management
- Creating Kafka-only tables (enforced validation)
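
For example, to enable writes on a standalone deployment, flip the flag in the `env` block of `claude_desktop_config.json` (the URL and credentials shown are the example values from the configuration tables above):

```json
"env": {
  "MCP_TRANSPORT": "stdio",
  "SSB_API_BASE": "http://localhost:18121",
  "SSB_USER": "admin",
  "SSB_PASSWORD": "admin",
  "SSB_READONLY": "false"
}
```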
## 🔧 Technical Details

### Testing

The SSB MCP Server includes a comprehensive test suite. See `Testing/README.md` for detailed testing documentation, including:

- Quick functionality tests
- A comprehensive test suite covering all 80+ MCP tools
- Cloud SSB test protocols
- Test configuration and best practices
- Detailed test results and analysis

**Quick Start:**

```bash
cd Testing && python run_tests.py --quick
```
### Troubleshooting

#### Common Issues

1. **"Unauthorized" errors**: check your authentication credentials
   - For direct SSB: verify `SSB_USER` and `SSB_PASSWORD`
   - For Knox: verify `KNOX_TOKEN` or `KNOX_USER`/`KNOX_PASSWORD`

2. **"Connection refused" errors**: ensure SSB services are running
   - Check `docker-compose ps` for service status
   - Verify port mappings in `docker-compose.yml`

3. **"No sample data available"**: jobs may need time to produce data
   - Check job status with `get_job_status(job_id)`
   - Verify the job is running and has a sample configuration
   - Try using `sample_all_messages=True` for comprehensive sampling

4. **Job restart failures**: if a job restart fails
   - Use `restart_job_with_sampling()` instead of `execute_job()`
   - Check whether the job is in a state that allows restarting
   - Create a new job if a restart is not possible

5. **SSL certificate errors**: for Knox deployments
   - Set `KNOX_VERIFY_SSL=false` for self-signed certificates
   - Or provide a proper CA bundle with `KNOX_CA_BUNDLE`

6. **Kafka table creation errors**: if table creation fails
   - Verify that only the local-kafka connector is used (enforced)
   - Check that the Kafka topic exists and is accessible
   - Ensure bootstrap servers are correctly configured
   - Use `validate_kafka_connector()` to check connector validity

7. **Virtual tables not available for querying**: after creating Kafka tables
   - **Important**: creating a data source does not automatically make it available for querying
   - Data sources need to be manually registered in the Flink catalog through the SSB UI
   - Use `SHOW TABLES;` to see which tables are actually available for querying
   - Only tables that appear in `SHOW TABLES;` can be queried via SQL
#### Debug Mode

Enable debug logging by setting an environment variable:

```bash
export MCP_LOG_LEVEL=DEBUG
```
### Security

- All sensitive data (passwords, tokens, secrets) is automatically redacted in responses
- Large collections are truncated to avoid overwhelming the LLM
- Read-only mode is enabled by default for CDP deployments to prevent accidental modifications
- Direct SSB authentication uses basic auth over HTTP (suitable for local development)
- SQL queries are automatically sanitized with proper semicolon termination
- Kafka table creation enforces the local-kafka-only connector for data security
## 📄 License

Apache License 2.0