Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/three-kings-lay.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"agents": patch
---

fix saving initialize params for stateless MCP server (effects eliciations and other optional features)
33 changes: 31 additions & 2 deletions packages/agents/src/mcp/worker-transport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import type {
JSONRPCMessage,
RequestId,
RequestInfo,
MessageExtraInfo
MessageExtraInfo,
InitializeRequestParams
} from "@modelcontextprotocol/sdk/types.js";
import {
isInitializeRequest,
Expand All @@ -27,6 +28,7 @@ import type {
} from "@modelcontextprotocol/sdk/server/streamableHttp.js";

const MCP_PROTOCOL_VERSION_HEADER = "MCP-Protocol-Version";
const RESTORE_REQUEST_ID = "__restore__";

interface StreamMapping {
writer?: WritableStreamDefaultWriter<Uint8Array>;
Expand All @@ -43,6 +45,7 @@ export interface MCPStorageApi {
export interface TransportState {
sessionId?: string;
initialized: boolean;
initializeParams?: InitializeRequestParams;
}

export interface WorkerTransportOptions {
Expand Down Expand Up @@ -99,6 +102,7 @@ export class WorkerTransport implements Transport {
private stateRestored = false;
private eventStore?: EventStore;
private retryInterval?: number;
private initializeParams?: TransportState["initializeParams"];

sessionId?: string;
onclose?: () => void;
Expand Down Expand Up @@ -130,6 +134,16 @@ export class WorkerTransport implements Transport {
if (state) {
this.sessionId = state.sessionId;
this.initialized = state.initialized;

// Restore _clientCapabilities on the Server instance by replaying the original initialize request
if (state.initializeParams && this.onmessage) {
this.onmessage({
jsonrpc: "2.0",
id: RESTORE_REQUEST_ID,
method: "initialize",
params: state.initializeParams
});
}
}

this.stateRestored = true;
Expand All @@ -145,7 +159,8 @@ export class WorkerTransport implements Transport {

const state: TransportState = {
sessionId: this.sessionId,
initialized: this.initialized
initialized: this.initialized,
initializeParams: this.initializeParams
};

await Promise.resolve(this.storage.set(state));
Expand Down Expand Up @@ -538,6 +553,16 @@ export class WorkerTransport implements Transport {

this.sessionId = this.sessionIdGenerator?.();
this.initialized = true;

const initMessage = messages.find(isInitializeRequest);
if (initMessage && isInitializeRequest(initMessage)) {
this.initializeParams = {
capabilities: initMessage.params.capabilities,
clientInfo: initMessage.params.clientInfo,
protocolVersion: initMessage.params.protocolVersion
};
}

await this.saveState();

if (this.sessionId && this.onsessioninitialized) {
Expand Down Expand Up @@ -802,6 +827,10 @@ export class WorkerTransport implements Transport {
requestId = message.id;
}

if (requestId === RESTORE_REQUEST_ID) {
return;
}

if (requestId === undefined) {
if (isJSONRPCResultResponse(message) || isJSONRPCErrorResponse(message)) {
throw new Error(
Expand Down
287 changes: 287 additions & 0 deletions packages/agents/src/tests/mcp/transports/worker-transport.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,6 +858,293 @@ describe("WorkerTransport", () => {
});
});

describe("Client Capabilities Persistence (Serverless Restart)", () => {
it("should persist initializeParams when client sends capabilities", async () => {
const server = createTestServer();
let storedState: TransportState | undefined;

const mockStorage = {
get: async () => storedState,
set: async (state: TransportState) => {
storedState = state;
}
};

const transport = await setupTransport(server, {
sessionIdGenerator: () => "test-session",
storage: mockStorage,
enableJsonResponse: true
});

const request = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "1",
method: "initialize",
params: {
capabilities: {
elicitation: { form: {} }
},
clientInfo: { name: "test-client", version: "1.0" },
protocolVersion: "2025-06-18"
}
})
});

const response = await transport.handleRequest(request);
await response.json();

expect(response.status).toBe(200);
expect(storedState).toBeDefined();
expect(storedState?.initializeParams).toBeDefined();
expect(
storedState?.initializeParams?.capabilities?.elicitation?.form
).toBeDefined();
expect(storedState?.initializeParams?.clientInfo).toEqual({
name: "test-client",
version: "1.0"
});
expect(storedState?.initializeParams?.protocolVersion).toBe("2025-06-18");
});

it("should restore client capabilities on Server instance after restart", async () => {
// Phase 1: Initialize with capabilities
let storedState: TransportState | undefined;
const mockStorage = {
get: async () => storedState,
set: async (state: TransportState) => {
storedState = state;
}
};

const server1 = createTestServer();
const transport1 = await setupTransport(server1, {
sessionIdGenerator: () => "test-session",
storage: mockStorage,
enableJsonResponse: true
});

const initRequest = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "1",
method: "initialize",
params: {
capabilities: {
elicitation: { form: {} }
},
clientInfo: { name: "test-client", version: "1.0" },
protocolVersion: "2025-06-18"
}
})
});

await transport1.handleRequest(initRequest);

// Verify server1 has capabilities
expect(
server1.server.getClientCapabilities()?.elicitation?.form
).toBeDefined();

// Phase 2: Simulate serverless restart with NEW instances
const server2 = createTestServer();
const transport2 = await setupTransport(server2, {
sessionIdGenerator: () => "test-session",
storage: mockStorage,
enableJsonResponse: true
});

// Trigger state restoration by making a request
const listRequest = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
"mcp-session-id": "test-session"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "2",
method: "tools/list",
params: {}
})
});

await transport2.handleRequest(listRequest);

// Verify capabilities were restored on server2
expect(transport2.sessionId).toBe("test-session");
expect(server2.server.getClientCapabilities()).toBeDefined();
expect(
server2.server.getClientCapabilities()?.elicitation?.form
).toBeDefined();
});

it("should restore clientInfo on Server instance after restart", async () => {
let storedState: TransportState | undefined;
const mockStorage = {
get: async () => storedState,
set: async (state: TransportState) => {
storedState = state;
}
};

const server1 = createTestServer();
const transport1 = await setupTransport(server1, {
sessionIdGenerator: () => "test-session",
storage: mockStorage,
enableJsonResponse: true
});

const initRequest = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "1",
method: "initialize",
params: {
capabilities: {},
clientInfo: { name: "my-client", version: "2.0" },
protocolVersion: "2025-06-18"
}
})
});

await transport1.handleRequest(initRequest);

// Simulate restart
const server2 = createTestServer();
const transport2 = await setupTransport(server2, {
sessionIdGenerator: () => "test-session",
storage: mockStorage,
enableJsonResponse: true
});

const listRequest = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
"mcp-session-id": "test-session"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "2",
method: "tools/list",
params: {}
})
});

await transport2.handleRequest(listRequest);

// Verify clientInfo was restored
expect(server2.server.getClientVersion()).toEqual({
name: "my-client",
version: "2.0"
});
});

it("should handle old storage format without initializeParams (backward compatibility)", async () => {
// Simulate old stored state without initializeParams field
const oldState: TransportState = {
sessionId: "old-session",
initialized: true
// No initializeParams - simulating old storage format
};

const mockStorage = {
get: async () => oldState,
set: async () => {}
};

const server = createTestServer();
const transport = await setupTransport(server, {
storage: mockStorage,
enableJsonResponse: true
});

const request = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream",
"mcp-session-id": "old-session"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "1",
method: "tools/list",
params: {}
})
});

// Should not throw
const response = await transport.handleRequest(request);
expect(response.status).toBe(200);

// Session restored but capabilities not available (no initializeParams)
expect(transport.sessionId).toBe("old-session");
expect(server.server.getClientCapabilities()).toBeUndefined();
});

it("should persist initializeParams with empty capabilities", async () => {
const server = createTestServer();
let storedState: TransportState | undefined;

const mockStorage = {
get: async () => storedState,
set: async (state: TransportState) => {
storedState = state;
}
};

const transport = await setupTransport(server, {
sessionIdGenerator: () => "test-session",
storage: mockStorage,
enableJsonResponse: true
});

const request = new Request("http://example.com/", {
method: "POST",
headers: {
"Content-Type": "application/json",
Accept: "application/json, text/event-stream"
},
body: JSON.stringify({
jsonrpc: "2.0",
id: "1",
method: "initialize",
params: {
capabilities: {}, // Empty but present
clientInfo: { name: "test-client", version: "1.0" },
protocolVersion: "2025-06-18"
}
})
});

const response = await transport.handleRequest(request);
await response.json();

expect(response.status).toBe(200);
expect(storedState?.initializeParams).toBeDefined();
expect(storedState?.initializeParams?.capabilities).toEqual({});
});
});

describe("Session Management", () => {
it("should use custom sessionIdGenerator", async () => {
const server = createTestServer();
Expand Down
Loading