diff --git a/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java b/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java index 2bf09f1dc..8db87bc59 100644 --- a/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java +++ b/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java @@ -16,6 +16,7 @@ import com.tencent.trpc.core.exception.ErrorCode; import com.tencent.trpc.core.logger.Logger; import com.tencent.trpc.core.logger.LoggerFactory; +import com.tencent.trpc.core.rpc.CallInfo; import com.tencent.trpc.core.rpc.ProviderInvoker; import com.tencent.trpc.core.rpc.RpcContext; import com.tencent.trpc.core.rpc.RpcServerContext; @@ -104,13 +105,15 @@ protected void handleStreamInit(int streamId, ByteBuf frame) { // release reference after decoding ReferenceCountUtil.safeRelease(data); } - }); + }) + .doFinally(signal -> receivers.remove(streamId)); // use the thread pool to call the corresponding interface to prevent blocking the IO thread workerPool.execute(() -> { // Currently TRPC streaming protocol does not carry timeout field, timeout control is currently managed // by the client side RpcContext ctx = new RpcServerContext(); + fillCallInfo(ctx.getCallInfo(), requestMeta); requestMeta.getTransInfoMap().forEach((key, val) -> ctx.getReqAttachMap().put(key, val.toByteArray())); Publisher resp; @@ -146,4 +149,10 @@ protected void handleStreamInit(int streamId, ByteBuf frame) { }); } + private void fillCallInfo(CallInfo callInfo, TrpcStreamInitRequestMeta requestMeta) { + callInfo.setCaller(requestMeta.getCaller().toStringUtf8()); + callInfo.setCallee(requestMeta.getCallee().toStringUtf8()); + callInfo.setCalleeMethod(requestMeta.getFunc().toStringUtf8()); + } + }