Open
Description
Search before asking
- I searched in the issues and found nothing similar.
Version
3.0.0
Minimal reproduce step
- Publish 600k messages.
- Start 2 consumers with different subscription names, both subscribing from Earliest:
  - one with async ack: Mono.fromCompletionStage(() -> consumer.acknowledgeAsync(message))
  - another one with sync ack: Mono.fromRunnable(() -> consumer.acknowledge(message))
- Wait until both consumers have finished.
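To make the "wait until finished" step checkable from the client side, here is a minimal sketch that counts async ack callbacks against the number of acks issued. It uses only JDK classes. `AckTracker` and its `ackFn` parameter are hypothetical names for illustration; `ackFn` stands in for a call such as `consumer.acknowledgeAsync(message)`. It does not explain the broker-side mismatch, it only confirms whether every ack future completed successfully before the stats are read.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

/** Hypothetical helper: counts issued, succeeded, and failed async acks. */
class AckTracker<M> {
    // Stand-in for the real ack call, e.g. m -> consumer.acknowledgeAsync(m) (assumption).
    private final Function<M, CompletionStage<Void>> ackFn;
    private final AtomicLong issued = new AtomicLong();
    private final AtomicLong succeeded = new AtomicLong();
    private final AtomicLong failed = new AtomicLong();

    AckTracker(Function<M, CompletionStage<Void>> ackFn) {
        this.ackFn = ackFn;
    }

    /** Issues one ack and records how its future completes. */
    CompletionStage<Void> ack(M message) {
        issued.incrementAndGet();
        return ackFn.apply(message).whenComplete((ignored, error) -> {
            if (error == null) {
                succeeded.incrementAndGet();
            } else {
                failed.incrementAndGet();
            }
        });
    }

    long issued() { return issued.get(); }
    long succeeded() { return succeeded.get(); }
    long failed() { return failed.get(); }

    /** Acks whose future has not completed yet. */
    long pending() { return issued.get() - succeeded.get() - failed.get(); }
}
```

If `pending()` and `failed()` are both 0 and `succeeded()` matches the number of received messages, the client saw a successful callback for every ack, so any remaining backlog would have to be explained elsewhere.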
What did you expect to see?
The message backlog and the unacked message count should be 0 for both the acknowledge and the acknowledgeAsync subscriptions.
What did you see instead?
A few messages remain in the backlog and in the unacked count of the acknowledgeAsync subscription, even though we received the ack callbacks. The acknowledge subscription works fine.
Topic stats for the acknowledgeAsync subscription, for reference:
{
  "msgRateOut" : 382.4166635563445,
  "msgThroughputOut" : 1143831.4573635042,
  "bytesOutCounter" : 1590183426,
  "msgOutCounter" : 531437,
  "msgRateRedeliver" : 0.0,
  "messageAckRate" : 397.2166640450367,
  "chunkedMessageRate" : 0,
  "msgBacklog" : 4,
  "backlogSize" : 572603343,
  "earliestMsgPublishTimeInBacklog" : 0,
  "msgBacklogNoDelayed" : 4,
  "blockedSubscriptionOnUnackedMsgs" : false,
  "msgDelayed" : 0,
  "unackedMessages" : 7,
  "type" : "Key_Shared",
  "msgRateExpired" : 0.0,
  "totalMsgExpired" : 0,
  "lastExpireTimestamp" : 1706002395690,
  "lastConsumedFlowTimestamp" : 1706002431797,
  "lastConsumedTimestamp" : 1706002428241,
  "lastAckedTimestamp" : 1706002436543,
  "lastMarkDeleteAdvancedTimestamp" : 1706002000095,
  "consumers" : [ {
    "msgRateOut" : 382.4166635563445,
    "msgThroughputOut" : 1143831.4573635042,
    "bytesOutCounter" : 598198012,
    "msgOutCounter" : 200000,
    "msgRateRedeliver" : 0.0,
    "messageAckRate" : 397.2166640450367,
    "chunkedMessageRate" : 0.0,
    "consumerName" : "consumer",
    "availablePermits" : 1000,
    "unackedMessages" : 7,
    "avgMessagesPerEntry" : 15,
    "blockedConsumerOnUnackedMsgs" : false,
    "readPositionWhenJoining" : "1018:10411",
    "address" : "/100.96.65.127:20857",
    "connectedSince" : "2024-01-23T09:26:34.035732392Z",
    "clientVersion" : "Pulsar-Java-v3.0.0",
    "lastAckedTimestamp" : 1706002436543,
    "lastConsumedTimestamp" : 1706002428241,
    "lastConsumedFlowTimestamp" : 1706002431797,
    "keyHashRanges" : [ "[0, 6276111]", "[6276112, 13422723]", "[13422724, 52719097]", "[52719098, 60122502]", "[60122503, 74675312]", "[74675313, 161996019]", "[161996020, 207509307]", "[207509308, 220775154]", "[220775155, 229655103]", "[229655104, 234228609]", "[234228610, 276636200]", "[276636201, 324880668]", "[324880669, 369408646]", "[369408647, 374232013]", "[374232014, 379665156]", "[379665157, 380576699]", "[380576700, 405888400]", "[405888401, 428673014]", "[428673015, 453720349]", "[453720350, 488351370]", "[488351371, 496052795]", "[496052796, 504603928]", "[504603929, 508760821]", "[508760822, 526107528]", "[526107529, 576532446]", "[576532447, 580447007]", "[580447008, 587033352]", "[587033353, 604050605]", "[604050606, 607110270]", "[607110271, 611987246]", "[611987247, 627803480]", "[627803481, 628603516]", "[628603517, 643340895]", "[643340896, 649016535]", "[649016536, 682844752]", "[682844753, 723271437]", "[723271438, 725352428]", "[725352429, 753192194]", "[753192195, 798356347]", "[798356348, 824987130]", "[824987131, 838415369]", "[838415370, 853347508]", "[853347509, 869121139]", "[869121140, 937189723]", "[937189724, 1004046645]", "[1004046646, 1013552657]", "[1013552658, 1063116829]", "[1063116830, 1072226625]", "[1072226626, 1102842607]", "[1102842608, 1113396043]", "[1113396044, 1133270607]", "[1133270608, 1149712306]", "[1149712307, 1196163934]", "[1196163935, 1218114318]", "[1218114319, 1239267311]", "[1239267312, 1283886353]", "[1283886354, 1298017483]", "[1298017484, 1300597583]", "[1300597584, 1311995628]", "[1311995629, 1407745525]", "[1407745526, 1487107354]", "[1487107355, 1500070137]", "[1500070138, 1527269282]", "[1527269283, 1579052216]", "[1579052217, 1584997034]", "[1584997035, 1595017626]", "[1595017627, 1601176083]", "[1601176084, 1618519791]", "[1618519792, 1641494763]", "[1641494764, 1656777545]", "[1656777546, 1681398228]", "[1681398229, 1697816514]", "[1697816515, 1706859249]", "[1706859250, 1720068125]", "[1720068126, 1779743735]", "[1779743736, 1784442894]", "[1784442895, 1823221256]", "[1823221257, 1824702978]", "[1824702979, 1838089487]", "[1838089488, 1857634960]", "[1857634961, 1861247796]", "[1861247797, 1863792279]", "[1863792280, 1937071475]", "[1937071476, 1941970878]", "[1941970879, 1965632398]", "[1965632399, 1970489707]", "[1970489708, 1979412755]", "[1979412756, 1983921632]", "[1983921633, 2008961115]", "[2008961116, 2016328150]", "[2016328151, 2020236760]", "[2020236761, 2023857462]", "[2023857463, 2032948319]", "[2032948320, 2045854070]", "[2045854071, 2060460824]", "[2060460825, 2067248154]", "[2067248155, 2103376046]", "[2103376047, 2127999799]", "[2127999800, 2131945474]", "[2131945475, 2143021740]" ],
    "metadata" : { },
    "lastAckedTime" : "2024-01-23T09:33:56.543Z",
    "lastConsumedTime" : "2024-01-23T09:33:48.241Z"
  } ],
  "isDurable" : true,
  "isReplicated" : false,
  "allowOutOfOrderDelivery" : false,
  "keySharedMode" : "AUTO_SPLIT",
  "consumersAfterMarkDeletePosition" : { },
  "nonContiguousDeletedMessagesRanges" : 4,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 71,
  "delayedMessageIndexSizeInBytes" : 0,
  "subscriptionProperties" : { },
  "filterProcessedMsgCount" : 0,
  "filterAcceptedMsgCount" : 0,
  "filterRejectedMsgCount" : 0,
  "filterRescheduledMsgCount" : 0,
  "durable" : true,
  "replicated" : false
}
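For repeated runs, the expected condition (backlog and unacked count both 0) can be checked mechanically against a stats dump like the one above. This is a rough JDK-only sketch: `SubscriptionStatsCheck` and its methods are hypothetical names, and it relies on a regex that picks the first occurrence of each field, which assumes the subscription-level value appears before the `consumers` array as it does in this dump. A real JSON parser would be more robust.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: checks a subscription stats dump for leftover messages. */
final class SubscriptionStatsCheck {

    /** Returns the first integer value of the exact field name, or -1 if it is absent. */
    static long firstLongField(String json, String field) {
        // The closing quote keeps "msgBacklog" from matching "msgBacklogNoDelayed".
        Pattern p = Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:\\s*(-?\\d+)");
        Matcher m = p.matcher(json);
        return m.find() ? Long.parseLong(m.group(1)) : -1L;
    }

    /** True only when both the backlog and the unacked count are exactly 0. */
    static boolean isFullyAcked(String json) {
        return firstLongField(json, "msgBacklog") == 0
                && firstLongField(json, "unackedMessages") == 0;
    }

    public static void main(String[] args) {
        // Values taken from the stats in this report.
        String stats = "{ \"msgBacklog\" : 4, \"unackedMessages\" : 7 }";
        System.out.println(isFullyAcked(stats)); // prints "false"
    }
}
```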
Anything else?
We ran this multiple times, and every time a few backlog and unacked messages were left for the acknowledgeAsync subscription.
Are you willing to submit a PR?
- I'm willing to submit a PR!