🚀 WebSockets 模型上下文協議(MCP)實現
本項目聚焦於在 Cloudflare Workers 環境中,藉助 WebSocket 協議對 Model Context Protocol (MCP) 進行擴展。通過此實現,能夠很好地支持高頻交易、即時協作環境以及需要快速響應的交互式代理等即時通信場景。
🚀 快速開始
本參考實現介紹瞭如何在 Cloudflare Workers 環境中使用 WebSocket 協議來擴展 Model Context Protocol (MCP),以支持即時通信場景。
✨ 主要特性
- 請求/響應協議:每個 WebSocket 消息包含
type
(消息類型)、id
(唯一請求標識符)、method
(MCP 方法名稱)、params
(請求參數)和timestamp
(時間戳)等字段。
- 流式數據處理:支持大文件的分片傳輸,實現流式結果反饋機制,確保數據完整性和順序性。
- 心跳檢測:定期發送心跳包維持連接,監測連接狀態變化,處理斷線重連邏輯。
📦 安裝指南
文檔未提及安裝步驟,此處跳過。
💻 使用示例
基礎用法
import { MCPClient } from '@modelcontextprotocol/typescript-sdk';
class WebSocketTransport implements MCPTransport {
private ws: WebSocket;
private pendingRequests: Map<string, { resolve: (result: any) => void, reject: (error: any) => void}>;
constructor(serverUrl: string, agentId: string) {
this.ws = new WebSocket(`${serverUrl}/agent/${agentId}/websocket`);
this.pendingRequests = new Map();
this.ws.addEventListener('message', this.handleMessage.bind(this));
}
async send(method: string, params: any): Promise<any> {
return new Promise((resolve, reject) => {
const requestId = crypto.randomUUID();
this.pendingRequests.set(requestId, { resolve, reject });
this.ws.send(JSON.stringify({
type: 'mcp_request',
request: { method, params },
requestId
}));
});
}
private handleMessage(event: MessageEvent) {
const message = JSON.parse(event.data);
if (message.type === 'mcp_response' && message.requestId) {
const pending = this.pendingRequests.get(message.requestId);
if (pending) {
pending.resolve(message.result);
this.pendingRequests.delete(message.requestId);
}
}
}
}
const transport = new WebSocketTransport('wss://example.com', 'agent-123');
const client = new MCPClient({ transport });
const result = await client.invoke('add', { a: 5, b: 3 });
高級用法
class WebSocketHandler {
private ws: WebSocket;
constructor(private readonly agentId: string) {}
async handleRequest(request: Request): Promise<Response> {
const response = new Response(null, {
status: 101,
webSocket: await this.upgradeToWebSocket(request)
});
return response;
}
private upgradeToWebSocket(request: Request): Promise<WebSocket> {
}
}
📚 詳細文檔
項目結構
.
├── src/
│ ├── mcp-websocket.ts # WebSocket 運輸層實現
│ └── mcp-client.ts # MCP 客戶端擴展
└── README.md # 項目文檔
實現細節
核心組件
- WebSocket 連接管理:使用 Cloudflare Workers 的原生 WebSocket 支持,並集成到現有的 MCP 客戶端架構中。
- 雙向通信協議:具備請求/響應消息格式、流式數據處理機制以及心跳檢測與連接狀態監控功能。
- 狀態管理:使用 Durable Objects 維護 WebSocket 連接狀態,關聯會話歷史記錄與上下文。
優勢分析
對比傳統 HTTP 請求的優勢
- 即時性:保持長期連接,減少請求延遲。
- 帶寬效率:減少來回次數,提高數據傳輸效率。
- 可擴展性:支持大規模併發連接,適合高吞吐量場景。
挑戰與解決方案
主要挑戰
- 狀態管理複雜度:解決方案是使用 Durable Objects 維護會話狀態。
- 消息可靠性:實施確認機制和重傳策略。
- 錯誤處理:定義統一的錯誤編碼和反饋機制。
🔧 技術細節
核心組件
-
WebSocket 連接管理
- 使用 Cloudflare Workers 的原生 WebSocket 支持
- 集成到現有的 MCP 客戶端架構中
-
雙向通信協議
- 請求/響應消息格式
- 流式數據處理機制
- 心跳檢測與連接狀態監控
-
狀態管理
- 使用 Durable Objects 維護 WebSocket 連接狀態
- 會話歷史記錄與上下文關聯
關鍵代碼示例
客戶端側 WebSocket 使用
import { MCPClient } from '@modelcontextprotocol/typescript-sdk';
class WebSocketTransport implements MCPTransport {
private ws: WebSocket;
private pendingRequests: Map<string, { resolve: (result: any) => void, reject: (error: any) => void}>;
constructor(serverUrl: string, agentId: string) {
this.ws = new WebSocket(`${serverUrl}/agent/${agentId}/websocket`);
this.pendingRequests = new Map();
this.ws.addEventListener('message', this.handleMessage.bind(this));
}
async send(method: string, params: any): Promise<any> {
return new Promise((resolve, reject) => {
const requestId = crypto.randomUUID();
this.pendingRequests.set(requestId, { resolve, reject });
this.ws.send(JSON.stringify({
type: 'mcp_request',
request: { method, params },
requestId
}));
});
}
private handleMessage(event: MessageEvent) {
const message = JSON.parse(event.data);
if (message.type === 'mcp_response' && message.requestId) {
const pending = this.pendingRequests.get(message.requestId);
if (pending) {
pending.resolve(message.result);
this.pendingRequests.delete(message.requestId);
}
}
}
}
const transport = new WebSocketTransport('wss://example.com', 'agent-123');
const client = new MCPClient({ transport });
const result = await client.invoke('add', { a: 5, b: 3 });
服務器端 WebSocket 處理
class WebSocketHandler {
private ws: WebSocket;
constructor(private readonly agentId: string) {}
async handleRequest(request: Request): Promise<Response> {
const response = new Response(null, {
status: 101,
webSocket: await this.upgradeToWebSocket(request)
});
return response;
}
private upgradeToWebSocket(request: Request): Promise<WebSocket> {
}
}
📄 許可證
本項目遵循 MIT 協議。
Copyright (c) 2023 Your Name.
📞 聯繫方式