diff --git a/Package.resolved b/Package.resolved index 5e9023c5..fb776dd5 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,10 +1,10 @@ { - "originHash" : "08de61941b7919a65e36c0e34f8c1c41995469b86a39122158b75b4a68c4527d", + "originHash" : "371f3dfcfa1201fc8d50e924ad31f9ebc4f90242924df1275958ac79df15dc12", "pins" : [ { "identity" : "eventsource", "kind" : "remoteSourceControl", - "location" : "https://github.com/loopwork-ai/eventsource.git", + "location" : "https://github.com/mattt/eventsource.git", "state" : { "revision" : "e83f076811f32757305b8bf69ac92d05626ffdd7", "version" : "1.1.0" diff --git a/Sources/MCP/Client/Client.swift b/Sources/MCP/Client/Client.swift index 696ffd14..ae3b12d7 100644 --- a/Sources/MCP/Client/Client.swift +++ b/Sources/MCP/Client/Client.swift @@ -179,43 +179,40 @@ public actor Client { // Start message handling loop task = Task { guard let connection = self.connection else { return } - repeat { - // Check for cancellation before starting the iteration - if Task.isCancelled { break } - do { - let stream = await connection.receive() - for try await data in stream { - if Task.isCancelled { break } // Check inside loop too - - // Attempt to decode data - // Try decoding as a batch response first - if let batchResponse = try? decoder.decode([AnyResponse].self, from: data) { - await handleBatchResponse(batchResponse) - } else if let response = try? decoder.decode(AnyResponse.self, from: data) { - await handleResponse(response) - } else if let message = try? decoder.decode(AnyMessage.self, from: data) { - await handleMessage(message) - } else { - var metadata: Logger.Metadata = [:] - if let string = String(data: data, encoding: .utf8) { - metadata["message"] = .string(string) - } - await logger?.warning( - "Unexpected message received by client (not single/batch response or notification)", - metadata: metadata - ) + // Get the stream once - async streams can only be iterated once. + // Previously, receive() was called inside a repeat loop, which meant + // each iteration tried to create a new stream from an already-consumed one. + let stream = await connection.receive() + + do { + for try await data in stream { + if Task.isCancelled { break } + + // Attempt to decode data + // Try decoding as a batch response first + if let batchResponse = try? decoder.decode([AnyResponse].self, from: data) { + await handleBatchResponse(batchResponse) + } else if let response = try? decoder.decode(AnyResponse.self, from: data) { + await handleResponse(response) + } else if let message = try? decoder.decode(AnyMessage.self, from: data) { + await handleMessage(message) + } else { + var metadata: Logger.Metadata = [:] + if let string = String(data: data, encoding: .utf8) { + metadata["message"] = .string(string) } + await logger?.warning( + "Unexpected message received by client (not single/batch response or notification)", + metadata: metadata + ) } - } catch let error where MCPError.isResourceTemporarilyUnavailable(error) { - try? await Task.sleep(for: .milliseconds(10)) - continue - } catch { - await logger?.error( - "Error in message handling loop", metadata: ["error": "\(error)"]) - break } - } while true + } catch { + await logger?.error( + "Error in message handling loop", metadata: ["error": "\(error)"]) + } + await self.logger?.debug("Client message handling loop task is terminating.") }