From 5fd3178b0b4b249f7eb529e904a1792e440e7035 Mon Sep 17 00:00:00 2001 From: Wang Xiao Date: Thu, 11 Dec 2025 19:57:35 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B5=81=E5=BC=8F=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E7=AB=AF=E6=B6=88=E8=B4=B9=E5=AE=8C=E5=90=8E=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E6=B5=81&=E6=B7=BB=E5=8A=A0=E8=B0=83=E7=94=A8=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../proto/standard/stream/TRpcStreamResponder.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java b/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java index 2bf09f1dc..8db87bc59 100644 --- a/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java +++ b/trpc-proto/trpc-proto-standard/src/main/java/com/tencent/trpc/proto/standard/stream/TRpcStreamResponder.java @@ -16,6 +16,7 @@ import com.tencent.trpc.core.exception.ErrorCode; import com.tencent.trpc.core.logger.Logger; import com.tencent.trpc.core.logger.LoggerFactory; +import com.tencent.trpc.core.rpc.CallInfo; import com.tencent.trpc.core.rpc.ProviderInvoker; import com.tencent.trpc.core.rpc.RpcContext; import com.tencent.trpc.core.rpc.RpcServerContext; @@ -104,13 +105,15 @@ protected void handleStreamInit(int streamId, ByteBuf frame) { // release reference after decoding ReferenceCountUtil.safeRelease(data); } - }); + }) + .doFinally(signal -> receivers.remove(streamId)); // use the thread pool to call the corresponding interface to prevent blocking the IO thread workerPool.execute(() -> { // Currently TRPC streaming protocol does not carry timeout field, timeout control is currently managed // by the client side RpcContext ctx = new RpcServerContext(); + fillCallInfo(ctx.getCallInfo(), requestMeta); requestMeta.getTransInfoMap().forEach((key, val) -> ctx.getReqAttachMap().put(key, val.toByteArray())); Publisher resp; @@ -146,4 +149,10 @@ protected void handleStreamInit(int streamId, ByteBuf frame) { }); } + private void fillCallInfo(CallInfo callInfo, TrpcStreamInitRequestMeta requestMeta) { + callInfo.setCaller(requestMeta.getCaller().toStringUtf8()); + callInfo.setCallee(requestMeta.getCallee().toStringUtf8()); + callInfo.setCalleeMethod(requestMeta.getFunc().toStringUtf8()); + } + }