From dcc82fe8d942e85f37da2eca50d650f567ad2217 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 24 Dec 2025 16:44:44 +0800 Subject: [PATCH 1/2] Improve asynchronous producer with more options for creation and send --- pulsar/asyncio.py | 197 +++++++++++++++++++++++++++++++++++++++++- src/producer.cc | 3 +- tests/asyncio_test.py | 44 +++++++++- 3 files changed, 240 insertions(+), 4 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 01246c6..0e24424 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -25,11 +25,15 @@ import asyncio import functools -from typing import Any, List, Union +from typing import Any, Callable, List, Union import _pulsar from _pulsar import ( InitialPosition, + CompressionType, + PartitionsRoutingMode, + BatchingType, + ProducerAccessMode, RegexSubscriptionMode, ConsumerCryptoFailureAction, ) @@ -84,7 +88,17 @@ def __init__(self, producer: _pulsar.Producer, schema: pulsar.schema.Schema) -> self._producer = producer self._schema = schema - async def send(self, content: Any) -> pulsar.MessageId: + # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments + async def send(self, content: Any, + properties: dict | None = None, + partition_key: str | None = None, + ordering_key: str | None = None, + sequence_id: int | None = None, + replication_clusters: List[str] | None = None, + disable_replication: bool | None = None, + event_timestamp: int | None = None, + deliver_at: int | None = None, + deliver_after: int | None = None) -> pulsar.MessageId: """ Send a message asynchronously. @@ -93,6 +107,28 @@ async def send(self, content: Any) -> pulsar.MessageId: content: Any The message payload, whose type should respect the schema defined in `Client.create_producer`. + properties: dict | None + A dict of application0-defined string properties. + partition_key: str | None + Sets the partition key for the message routing. A hash of this key is + used to determine the message's topic partition. + ordering_key: str | None + Sets the ordering key for the message routing. + sequence_id: int | None + Specify a custom sequence id for the message being published. + replication_clusters: List[str] | None + Override namespace replication clusters. Note that it is the caller's responsibility + to provide valid cluster names and that all clusters have been previously configured + as topics. Given an empty list, the message will replicate per the namespace + configuration. + disable_replication: bool | None + Do not replicate this message. + event_timestamp: int | None + Timestamp in millis of the timestamp of event creation + deliver_at: int | None + Specify the message should not be delivered earlier than the specified timestamp. + deliver_after: int | None + Specify a delay in timedelta for the delivery of the messages. Returns ------- @@ -105,6 +141,27 @@ async def send(self, content: Any) -> pulsar.MessageId: """ builder = _pulsar.MessageBuilder() builder.content(self._schema.encode(content)) + + if properties is not None: + for k, v in properties.items(): + builder.property(k, v) + if partition_key is not None: + builder.partition_key(partition_key) + if ordering_key is not None: + builder.ordering_key(ordering_key) + if sequence_id is not None: + builder.sequence_id(sequence_id) + if replication_clusters is not None: + builder.replication_clusters(replication_clusters) + if disable_replication is not None: + builder.disable_replication(disable_replication) + if event_timestamp is not None: + builder.event_timestamp(event_timestamp) + if deliver_at is not None: + builder.deliver_at(deliver_at) + if deliver_after is not None: + builder.deliver_after(deliver_after) + future = asyncio.get_running_loop().create_future() self._producer.send_async(builder.build(), functools.partial(_set_future, future)) msg_id = await future @@ -115,6 +172,18 @@ async def send(self, content: Any) -> pulsar.MessageId: msg_id.batch_index(), ) + async def flush(self) -> None: + """ + Flush all the messages buffered in the producer asynchronously. + + Raises + ------ + PulsarException + """ + future = asyncio.get_running_loop().create_future() + self._producer.flush_async(functools.partial(_set_future, future, value=None)) + await future + async def close(self) -> None: """ Close the producer. @@ -127,6 +196,25 @@ async def close(self) -> None: self._producer.close_async(functools.partial(_set_future, future, value=None)) await future + def topic(self): + """ + Return the topic which producer is publishing to + """ + return self._producer.topic() + + def producer_name(self): + """ + Return the producer name which could have been assigned by the + system or specified by the client + """ + return self._producer.producer_name() + + def last_sequence_id(self): + """ + Return the last sequence id that was published by this producer. + """ + return self._producer.last_sequence_id() + class Consumer: """ The Pulsar message consumer, used to subscribe to messages from a topic. @@ -311,7 +399,28 @@ def __init__(self, service_url, **kwargs) -> None: # pylint: disable=too-many-arguments,too-many-locals,too-many-positional-arguments async def create_producer(self, topic: str, + producer_name: str | None = None, schema: pulsar.schema.Schema | None = None, + initial_sequence_id: int | None = None, + send_timeout_millis: int = 30000, + compression_type: CompressionType = CompressionType.NONE, + max_pending_messages: int = 1000, + max_pending_messages_across_partitions: int = 50000, + block_if_queue_full: bool = False, + batching_enabled: bool = True, + batching_max_messages: int = 1000, + batching_max_allowed_size_in_bytes: int = 128*1024, + batching_max_publish_delay_ms: int = 10, + chunking_enabled: bool = False, + message_routing_mode: PartitionsRoutingMode = + PartitionsRoutingMode.RoundRobinDistribution, + lazy_start_partitioned_producers: bool = False, + properties: dict | None = None, + batching_type: BatchingType = BatchingType.Default, + encryption_key: str | None = None, + crypto_key_reader: pulsar.CryptoKeyReader | None = None, + access_mode: ProducerAccessMode = ProducerAccessMode.Shared, + message_router: Callable[[pulsar.Message, int], int] | None = None, ) -> Producer: """ Create a new producer on a given topic @@ -320,8 +429,60 @@ async def create_producer(self, topic: str, ---------- topic: str The topic name + producer_name: str | None + Specify a name for the producer. If not assigned, the system will generate a globally + unique name which can be accessed with `Producer.producer_name()`. When specifying a + name, it is up to the user to ensure that, for a given topic, the producer name is + unique across all Pulsar's clusters. schema: pulsar.schema.Schema | None, default=None Define the schema of the data that will be published by this producer. + initial_sequence_id: int | None, default=None + Set the baseline for the sequence ids for messages published by + the producer. + send_timeout_millis: int, default=30000 + If a message is not acknowledged by the server before the + send_timeout expires, an error will be reported. + compression_type: CompressionType, default=CompressionType.NONE + Set the compression type for the producer. + max_pending_messages: int, default=1000 + Set the max size of the queue holding the messages pending to + receive an acknowledgment from the broker. + max_pending_messages_across_partitions: int, default=50000 + Set the max size of the queue holding the messages pending to + receive an acknowledgment across partitions. + block_if_queue_full: bool, default=False + Set whether send operations should block when the outgoing + message queue is full. + batching_enabled: bool, default=True + Enable automatic message batching. Note that, unlike the synchronous producer API in + ``pulsar.Client.create_producer``, batching is enabled by default for the asyncio + producer. + batching_max_messages: int, default=1000 + Maximum number of messages in a batch. + batching_max_allowed_size_in_bytes: int, default=128*1024 + Maximum size in bytes of a batch. + batching_max_publish_delay_ms: int, default=10 + The batch interval in milliseconds. + chunking_enabled: bool, default=False + Enable chunking of large messages. + message_routing_mode: PartitionsRoutingMode, + default=PartitionsRoutingMode.RoundRobinDistribution + Set the message routing mode for the partitioned producer. + lazy_start_partitioned_producers: bool, default=False + Start partitioned producers lazily on demand. + properties: dict | None, default=None + Sets the properties for the producer. + batching_type: BatchingType, default=BatchingType.Default + Sets the batching type for the producer. + encryption_key: str | None, default=None + The key used for symmetric encryption. + crypto_key_reader: pulsar.CryptoKeyReader | None, default=None + Symmetric encryption class implementation. + access_mode: ProducerAccessMode, default=ProducerAccessMode.Shared + Set the type of access mode that the producer requires on the topic. + message_router: Callable[[pulsar.Message, int], int] | None, default=None + A custom message router function that takes a Message and the + number of partitions and returns the partition index. Returns ------- @@ -332,13 +493,45 @@ async def create_producer(self, topic: str, ------ PulsarException """ + if batching_enabled and chunking_enabled: + raise ValueError("Batching and chunking of messages can't be enabled together.") + if schema is None: schema = pulsar.schema.BytesSchema() schema.attach_client(self._client) future = asyncio.get_running_loop().create_future() conf = _pulsar.ProducerConfiguration() + if producer_name is not None: + conf.producer_name(producer_name) conf.schema(schema.schema_info()) + if initial_sequence_id is not None: + conf.initial_sequence_id(initial_sequence_id) + conf.send_timeout_millis(send_timeout_millis) + conf.compression_type(compression_type) + conf.max_pending_messages(max_pending_messages) + conf.max_pending_messages_across_partitions(max_pending_messages_across_partitions) + conf.block_if_queue_full(block_if_queue_full) + conf.batching_enabled(batching_enabled) + conf.batching_max_messages(batching_max_messages) + conf.batching_max_allowed_size_in_bytes(batching_max_allowed_size_in_bytes) + conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms) + conf.chunking_enabled(chunking_enabled) + conf.partitions_routing_mode(message_routing_mode) + conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) + if properties is not None: + for k, v in properties.items(): + conf.property(k, v) + conf.batching_type(batching_type) + if encryption_key is not None: + conf.encryption_key(encryption_key) + if crypto_key_reader is not None: + conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) + conf.access_mode(access_mode) + if message_router is not None: + def default_router(msg: _pulsar.Message, num_partitions: int) -> int: + return int(msg.partition_key()) % num_partitions + conf.message_router(default_router) self._client.create_producer_async( topic, conf, functools.partial(_set_future, future) diff --git a/src/producer.cc b/src/producer.cc index 9b38016..6a9b1a5 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -82,5 +82,6 @@ void export_producer(py::module_& m) { "successfully persisted\n") .def("close", &Producer_close) .def("close_async", &Producer_closeAsync) - .def("is_connected", &Producer::isConnected); + .def("is_connected", &Producer::isConnected) + .def("flush_async", &Producer::flushAsync); } diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 656ffba..048dc43 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -60,12 +60,18 @@ async def asyncTearDown(self) -> None: async def test_batch_end_to_end(self): topic = f'asyncio-test-batch-e2e-{time.time()}' - producer = await self._client.create_producer(topic) + producer = await self._client.create_producer(topic, + producer_name="my-producer") + self.assertEqual(producer.topic(), f'persistent://public/default/{topic}') + self.assertEqual(producer.producer_name(), "my-producer") tasks = [] for i in range(5): tasks.append(asyncio.create_task(producer.send(f'msg-{i}'.encode()))) msg_ids = await asyncio.gather(*tasks) self.assertEqual(len(msg_ids), 5) + # pylint: disable=fixme + # TODO: the result is wrong due to https://github.com/apache/pulsar-client-cpp/issues/531 + self.assertEqual(producer.last_sequence_id(), 8) ledger_id = msg_ids[0].ledger_id() entry_id = msg_ids[0].entry_id() # These messages should be in the same entry @@ -90,6 +96,42 @@ async def test_batch_end_to_end(self): msg = await consumer.receive() self.assertEqual(msg.data(), b'final-message') + async def test_send_keyed_message(self): + topic = f'asyncio-test-send-keyed-message-{time.time()}' + producer = await self._client.create_producer(topic) + consumer = await self._client.subscribe(topic, 'sub') + await producer.send(b'msg', partition_key='key0', + ordering_key="key1", properties={'my-prop': 'my-value'}) + + msg = await consumer.receive() + self.assertEqual(msg.data(), b'msg') + self.assertEqual(msg.partition_key(), 'key0') + self.assertEqual(msg.ordering_key(), 'key1') + self.assertEqual(msg.properties(), {'my-prop': 'my-value'}) + + async def test_flush(self): + topic = f'asyncio-test-flush-{time.time()}' + producer = await self._client.create_producer(topic, batching_max_messages=3, + batching_max_publish_delay_ms=60000) + tasks = [] + tasks.append(asyncio.create_task(producer.send(b'msg-0'))) + tasks.append(asyncio.create_task(producer.send(b'msg-1'))) + + done, pending = await asyncio.wait(tasks, timeout=1, return_when=asyncio.FIRST_COMPLETED) + self.assertEqual(len(done), 0) + self.assertEqual(len(pending), 2) + + # flush will trigger sending the batched messages + await producer.flush() + for task in pending: + self.assertTrue(task.done()) + msg_id0 = tasks[0].result() + msg_id1 = tasks[1].result() + self.assertEqual(msg_id0.ledger_id(), msg_id1.ledger_id()) + self.assertEqual(msg_id0.entry_id(), msg_id1.entry_id()) + self.assertEqual(msg_id0.batch_index(), 0) + self.assertEqual(msg_id1.batch_index(), 1) + async def test_create_producer_failure(self): try: await self._client.create_producer('tenant/ns/asyncio-test-send-failure') From ba6492088b84ffc8d87a6502fdd14b036ec8ecf2 Mon Sep 17 00:00:00 2001 From: Yunze Xu Date: Wed, 24 Dec 2025 19:41:42 +0800 Subject: [PATCH 2/2] address comments from copilot --- pulsar/asyncio.py | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index 0e24424..5c3178a 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -25,6 +25,7 @@ import asyncio import functools +from datetime import timedelta from typing import Any, Callable, List, Union import _pulsar @@ -98,7 +99,7 @@ async def send(self, content: Any, disable_replication: bool | None = None, event_timestamp: int | None = None, deliver_at: int | None = None, - deliver_after: int | None = None) -> pulsar.MessageId: + deliver_after: timedelta | None = None) -> pulsar.MessageId: """ Send a message asynchronously. @@ -108,7 +109,7 @@ async def send(self, content: Any, The message payload, whose type should respect the schema defined in `Client.create_producer`. properties: dict | None - A dict of application0-defined string properties. + A dict of application-defined string properties. partition_key: str | None Sets the partition key for the message routing. A hash of this key is used to determine the message's topic partition. @@ -127,7 +128,7 @@ async def send(self, content: Any, Timestamp in millis of the timestamp of event creation deliver_at: int | None Specify the message should not be delivered earlier than the specified timestamp. - deliver_after: int | None + deliver_after: timedelta | None Specify a delay in timedelta for the delivery of the messages. Returns @@ -211,7 +212,12 @@ def producer_name(self): def last_sequence_id(self): """ - Return the last sequence id that was published by this producer. + Return the last sequence id that was published and acknowledged by this producer. + + The sequence id can be either automatically assigned or custom set on the message. + After recreating a producer with the same name, this will return the sequence id + of the last message that was published in the previous session, or -1 if no + message was ever published. """ return self._producer.last_sequence_id() @@ -529,9 +535,9 @@ async def create_producer(self, topic: str, conf.crypto_key_reader(crypto_key_reader.cryptoKeyReader) conf.access_mode(access_mode) if message_router is not None: - def default_router(msg: _pulsar.Message, num_partitions: int) -> int: - return int(msg.partition_key()) % num_partitions - conf.message_router(default_router) + def underlying_router(msg: _pulsar.Message, num_partitions: int) -> int: + return message_router(pulsar.Message._wrap(msg), num_partitions) + conf.message_router(underlying_router) self._client.create_producer_async( topic, conf, functools.partial(_set_future, future)