From eadc43eaaa073d8e24ca00cd6f69814a0a250228 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 14 Apr 2022 19:08:41 -0400 Subject: [PATCH 01/20] feature: grpc compression --- .../pubsub_v1/publisher/_batch/thread.py | 8 +++++ .../publisher/_sequencer/ordered_sequencer.py | 6 +++- .../_sequencer/unordered_sequencer.py | 8 +++-- google/cloud/pubsub_v1/publisher/client.py | 6 +++- google/cloud/pubsub_v1/types.py | 3 ++ google/pubsub_v1/services/publisher/client.py | 2 ++ samples/snippets/publisher.py | 33 +++++++++++++++++++ 7 files changed, 62 insertions(+), 4 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index 8b868eaee..f2843018b 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -13,12 +13,14 @@ # limitations under the License. from __future__ import absolute_import +import functools import logging import threading import time import typing from typing import Any, Callable, List, Optional, Sequence +from xmlrpc.client import Transport import google.api_core.exceptions from google.api_core import gapic_v1 @@ -118,6 +120,9 @@ def __init__( self._commit_retry = commit_retry self._commit_timeout = commit_timeout + self._enable_grpc_compression = self.client.publisher_options.enable_grpc_compression + self._compression_bytes_threshold = self.client.publisher_options.compression_bytes_threshold + @staticmethod def make_lock() -> threading.Lock: @@ -269,6 +274,9 @@ def _commit(self) -> None: start = time.time() batch_transport_succeeded = True + + # need to add compression here + try: # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 30c76a44f..03c14d92e 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -240,6 +240,8 @@ def _create_batch( self, commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT, commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, + enable_grpc_compression: bool = False, + compression_bytes_threshold: int = 240 ) -> "_batch.thread.Batch": """Create a new batch using the client's batch class and other stored settings. @@ -265,6 +267,8 @@ def publish( message: gapic_types.PubsubMessage, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, + enable_grpc_compression: bool = False, + compression_bytes_threshold: int = 240 ) -> futures.Future: """Publish message for this ordering key. @@ -319,7 +323,7 @@ def publish( batch = self._ordered_batches[-1] future = batch.publish(message) while future is None: - batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout,enable_grpc_compression=enable_grpc_compression, compression_bytes_threshold=compression_bytes_threshold) self._ordered_batches.append(batch) future = batch.publish(message) diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index 7d57aa821..fceb94742 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -93,6 +93,8 @@ def _create_batch( self, commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT, commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, + enable_grpc_compression: bool = False, + compression_bytes_threshold: int = 240 ) -> "_batch.thread.Batch": """Create a new batch using the client's batch class and other stored settings. @@ -118,6 +120,8 @@ def publish( message: gapic_types.PubsubMessage, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, + enable_grpc_compression: bool = False, + compression_bytes_threshold: int = 240 ) -> "futures.Future": """Batch message into existing or new batch. @@ -144,7 +148,7 @@ def publish( raise RuntimeError("Unordered sequencer already stopped.") if not self._current_batch: - newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout) + newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout, enable_grpc_compression=enable_grpc_compression, compression_bytes_threshold=compression_bytes_threshold) self._current_batch = newbatch batch = self._current_batch @@ -154,7 +158,7 @@ def publish( future = batch.publish(message) # batch is full, triggering commit_when_full if future is None: - batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout, enable_grpc_compression=enable_grpc_compression, compression_bytes_threshold=compression_bytes_threshold) # At this point, we lose track of the old batch, but we don't # care since it's already committed (because it was full.) self._current_batch = batch diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index e3266e57f..8fb2e496f 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -158,6 +158,9 @@ def __init__( # The object controlling the message publishing flow self._flow_controller = FlowController(self.publisher_options.flow_control) + self._enable_grpc_compression = self.publisher_options.enable_grpc_compression + self._compression_bytes_threshold = self.publisher_options.compression_bytes_threshold + @classmethod def from_service_account_file( # type: ignore[override] cls, @@ -269,6 +272,7 @@ def resume_publish(self, topic: str, ordering_key: str) -> None: def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse": """Call the GAPIC public API directly.""" + # can we add compression here? return super().publish(*args, **kwargs) def publish( # type: ignore[override] @@ -409,7 +413,7 @@ def on_publish_done(future): # Delegate the publishing to the sequencer. sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry, timeout=timeout) + future = sequencer.publish(message, retry=retry, timeout=timeout, enable_grpc_compression=self._enable_grpc_compression, compression_bytes_threshold=self._compression_bytes_threshold) future.add_done_callback(on_publish_done) # Create a timer thread if necessary to enforce the batching diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py index 109d4aadc..77f1d8cea 100644 --- a/google/cloud/pubsub_v1/types.py +++ b/google/cloud/pubsub_v1/types.py @@ -132,6 +132,9 @@ class PublisherOptions(NamedTuple): "Timeout settings for message publishing by the client. It should be " "compatible with :class:`~.pubsub_v1.types.TimeoutType`." ) + enable_grpc_compression: bool = False + + compression_bytes_threshold: int = 240 # Define the type class and default values for flow control settings. diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 5dc77d2a2..319ac4797 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -764,6 +764,8 @@ def sample_publish(): gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) + self._transport. + # Send the request. response = rpc( request, diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index d6e520772..db31ec866 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -215,6 +215,39 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None: print(f"Published messages with batch settings to {topic_path}.") # [END pubsub_publisher_batch_settings] +def publish_messages_with_default_compression_threshold(project_id: str, topic_id: str) -> None: + """Publishes messages to a Pub/Sub topic with grpc compression enabled.""" + # [START pubsub_publisher_compression_settings] + from concurrent import futures + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure publisher with compression + publisher = pubsub_v1.PublisherClient(enable_grpc_compression=True, compression_bytes_threshold=240) + topic_path = publisher.topic_path(project_id, topic_id) + publish_futures = [] + + # Resolve the publish future in a separate thread. + def callback(future: pubsub_v1.publisher.futures.Future) -> None: + message_id = future.result() + print(message_id) + + for n in range(1, 10): + data_str = f"Message number {n}" + # Data must be a bytestring + data = data_str.encode("utf-8") + publish_future = publisher.publish(topic_path, data) + # Non-blocking. Allow the publisher client to batch multiple messages. + publish_future.add_done_callback(callback) + publish_futures.append(publish_future) + + futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) + + print(f"Published messages with compression settings to {topic_path}.") + # [END pubsub_publisher_compression_settings] def publish_messages_with_flow_control_settings(project_id: str, topic_id: str) -> None: """Publishes messages to a Pub/Sub topic with flow control settings.""" From 7a7d62f34b44cd9977c57cacde14cc4e487374d2 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 25 Apr 2022 01:19:45 -0400 Subject: [PATCH 02/20] add compression to gapic publish, and pass compression on _commit --- .../pubsub_v1/publisher/_batch/thread.py | 30 +++++++++++++++++-- google/cloud/pubsub_v1/publisher/client.py | 1 - google/pubsub_v1/services/publisher/client.py | 2 +- .../pubsub_v1/publisher/batch/test_thread.py | 5 +++- .../publisher/test_publisher_client.py | 4 +-- 5 files changed, 35 insertions(+), 7 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index f2843018b..f71f4122e 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -28,6 +28,7 @@ from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import base from google.pubsub_v1 import types as gapic_types +import grpc if typing.TYPE_CHECKING: # pragma: NO COVER from google.cloud import pubsub_v1 @@ -275,8 +276,32 @@ def _commit(self) -> None: batch_transport_succeeded = True - # need to add compression here - + compression = None + + if self._enable_grpc_compression and gapic_types.PublishRequest( + messages=self._messages + )._pb.ByteSize() >= self._compression_bytes_threshold: + compression = grpc.Compression.Gzip + + """ + probably shouldn't set it with the channel + Client-side interceptor + class grpc.ClientCallDetails(compression=) + class grpc.UnaryUnaryClientInterceptor (client_call_details) + grpc.UnaryStreamClientInterceptor(clientcalldetails) + Multi-Callable Interfaces: + class grpc.UnaryUnaryMultiCallable[source] + Affords invoking a unary-unary RPC from client-side. + abstract __call__(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + Synchronously invokes the underlying RPC. + abstract future(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source]¶ + abstract with_call(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + abstract __call__(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + abstract __call__(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + abstract future(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + abstract with_call(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + abstract __call__(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] + """ try: # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( @@ -284,6 +309,7 @@ def _commit(self) -> None: messages=self._messages, retry=self._commit_retry, timeout=self._commit_timeout, + compression=compression ) except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 8fb2e496f..dc7e7290d 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -272,7 +272,6 @@ def resume_publish(self, topic: str, ordering_key: str) -> None: def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse": """Call the GAPIC public API directly.""" - # can we add compression here? return super().publish(*args, **kwargs) def publish( # type: ignore[override] diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 319ac4797..7f23115c1 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -679,6 +679,7 @@ def publish( messages: Sequence[pubsub.PubsubMessage] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, + compression: grpc.Compression = None, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if @@ -764,7 +765,6 @@ def sample_publish(): gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - self._transport. # Send the request. response = rpc( diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index b15128489..6fc695dfb 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -143,6 +143,7 @@ def test_blocking__commit(): ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + compression=None ) # Establish that all of the futures are done, and that they have the @@ -172,6 +173,7 @@ def test_blocking__commit_custom_retry(): messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=mock.sentinel.custom_retry, timeout=gapic_v1.method.DEFAULT, + compression=None ) @@ -194,6 +196,7 @@ def test_blocking__commit_custom_timeout(): messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=gapic_v1.method.DEFAULT, timeout=mock.sentinel.custom_timeout, + compression=None ) @@ -201,7 +204,7 @@ def test_client_api_publish_not_blocking_additional_publish_calls(): batch = create_batch(max_messages=1) api_publish_called = threading.Event() - def api_publish_delay(topic="", messages=(), retry=None, timeout=None): + def api_publish_delay(topic="", messages=(), retry=None, timeout=None, compression=None): api_publish_called.set() time.sleep(1.0) message_ids = [str(i) for i in range(len(messages))] diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 372f53015..90358f3e9 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -393,7 +393,7 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY + mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY, enable_grpc_compression=False, compression_bytes_threshold=240 ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" @@ -412,7 +412,7 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout + mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout, enable_grpc_compression=False, compression_bytes_threshold=240 ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" From 9d9b054b08fa145dbd22df69c48ab08bed1a88ac Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 25 Apr 2022 01:20:23 -0400 Subject: [PATCH 03/20] add compression parameter to publish() in generated client --- owlbot.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/owlbot.py b/owlbot.py index af5b972d2..6731bfb0a 100644 --- a/owlbot.py +++ b/owlbot.py @@ -170,6 +170,16 @@ if count != 2: raise Exception("Too many or too few replacements in pull() methods.") + # Add compression parameter to publish call in publisher client + + count = s.replace( + library / f"google/pubsub_{library.name}/services/publisher/*client.py", + "messages\: Sequence\[pubsub\.PubsubMessage\] \= None\,\nretry: OptionalRetry \= gapic_v1\.method\.DEFAULT,\ntimeout\: TimeoutType = gapic\_v1\.method\.DEFAULT", + f"g<0>\ncompression\: grpc\.Compression \= None") + + if count != 1: + raise Exception("too many or too few compression parameter replacements" ) + # Silence deprecation warnings in pull() method flattened parameter tests. s.replace( @@ -332,7 +342,7 @@ if count < 1: raise Exception(".coveragerc replacement failed.") - + # fix the package name in samples/generated_samples to reflect # the package on pypi. https://pypi.org/project/google-cloud-pubsub/ s.replace( From c0dc43663b00023f26777a80e4065a99bf109a3f Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 25 Apr 2022 02:32:07 -0400 Subject: [PATCH 04/20] Revert "add compression parameter to publish() in generated client" This reverts commit 9d9b054b08fa145dbd22df69c48ab08bed1a88ac. --- owlbot.py | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/owlbot.py b/owlbot.py index 6731bfb0a..af5b972d2 100644 --- a/owlbot.py +++ b/owlbot.py @@ -170,16 +170,6 @@ if count != 2: raise Exception("Too many or too few replacements in pull() methods.") - # Add compression parameter to publish call in publisher client - - count = s.replace( - library / f"google/pubsub_{library.name}/services/publisher/*client.py", - "messages\: Sequence\[pubsub\.PubsubMessage\] \= None\,\nretry: OptionalRetry \= gapic_v1\.method\.DEFAULT,\ntimeout\: TimeoutType = gapic\_v1\.method\.DEFAULT", - f"g<0>\ncompression\: grpc\.Compression \= None") - - if count != 1: - raise Exception("too many or too few compression parameter replacements" ) - # Silence deprecation warnings in pull() method flattened parameter tests. s.replace( @@ -342,7 +332,7 @@ if count < 1: raise Exception(".coveragerc replacement failed.") - + # fix the package name in samples/generated_samples to reflect # the package on pypi. https://pypi.org/project/google-cloud-pubsub/ s.replace( From 9f70c0cadeb71630766f498f4396b786c52c00d9 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 25 Apr 2022 09:14:37 -0400 Subject: [PATCH 05/20] removing handwritten change to client.py --- google/pubsub_v1/services/publisher/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 7f23115c1..4d6dbf122 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -679,7 +679,6 @@ def publish( messages: Sequence[pubsub.PubsubMessage] = None, retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, - compression: grpc.Compression = None, metadata: Sequence[Tuple[str, str]] = (), ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if From 9489fb8979a5c869ff902bc720caba3001e7f963 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 25 Apr 2022 09:51:48 -0400 Subject: [PATCH 06/20] remove custom compression set outside of _client defaults --- .../pubsub_v1/publisher/_batch/thread.py | 24 ++----------------- .../publisher/_sequencer/ordered_sequencer.py | 6 +---- .../_sequencer/unordered_sequencer.py | 8 ++----- 3 files changed, 5 insertions(+), 33 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index f71f4122e..a0f0e9d67 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -13,14 +13,12 @@ # limitations under the License. from __future__ import absolute_import -import functools import logging import threading import time import typing from typing import Any, Callable, List, Optional, Sequence -from xmlrpc.client import Transport import google.api_core.exceptions from google.api_core import gapic_v1 @@ -276,32 +274,14 @@ def _commit(self) -> None: batch_transport_succeeded = True + # Set compression if enabled. compression = None if self._enable_grpc_compression and gapic_types.PublishRequest( messages=self._messages )._pb.ByteSize() >= self._compression_bytes_threshold: compression = grpc.Compression.Gzip - - """ - probably shouldn't set it with the channel - Client-side interceptor - class grpc.ClientCallDetails(compression=) - class grpc.UnaryUnaryClientInterceptor (client_call_details) - grpc.UnaryStreamClientInterceptor(clientcalldetails) - Multi-Callable Interfaces: - class grpc.UnaryUnaryMultiCallable[source] - Affords invoking a unary-unary RPC from client-side. - abstract __call__(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - Synchronously invokes the underlying RPC. - abstract future(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source]¶ - abstract with_call(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - abstract __call__(request, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - abstract __call__(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - abstract future(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - abstract with_call(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - abstract __call__(request_iterator, timeout=None, metadata=None, credentials=None, wait_for_ready=None, compression=None)[source] - """ + try: # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py index 03c14d92e..30c76a44f 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/ordered_sequencer.py @@ -240,8 +240,6 @@ def _create_batch( self, commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT, commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, - enable_grpc_compression: bool = False, - compression_bytes_threshold: int = 240 ) -> "_batch.thread.Batch": """Create a new batch using the client's batch class and other stored settings. @@ -267,8 +265,6 @@ def publish( message: gapic_types.PubsubMessage, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, - enable_grpc_compression: bool = False, - compression_bytes_threshold: int = 240 ) -> futures.Future: """Publish message for this ordering key. @@ -323,7 +319,7 @@ def publish( batch = self._ordered_batches[-1] future = batch.publish(message) while future is None: - batch = self._create_batch(commit_retry=retry, commit_timeout=timeout,enable_grpc_compression=enable_grpc_compression, compression_bytes_threshold=compression_bytes_threshold) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._ordered_batches.append(batch) future = batch.publish(message) diff --git a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py index fceb94742..7d57aa821 100644 --- a/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py +++ b/google/cloud/pubsub_v1/publisher/_sequencer/unordered_sequencer.py @@ -93,8 +93,6 @@ def _create_batch( self, commit_retry: "OptionalRetry" = gapic_v1.method.DEFAULT, commit_timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, - enable_grpc_compression: bool = False, - compression_bytes_threshold: int = 240 ) -> "_batch.thread.Batch": """Create a new batch using the client's batch class and other stored settings. @@ -120,8 +118,6 @@ def publish( message: gapic_types.PubsubMessage, retry: "OptionalRetry" = gapic_v1.method.DEFAULT, timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT, - enable_grpc_compression: bool = False, - compression_bytes_threshold: int = 240 ) -> "futures.Future": """Batch message into existing or new batch. @@ -148,7 +144,7 @@ def publish( raise RuntimeError("Unordered sequencer already stopped.") if not self._current_batch: - newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout, enable_grpc_compression=enable_grpc_compression, compression_bytes_threshold=compression_bytes_threshold) + newbatch = self._create_batch(commit_retry=retry, commit_timeout=timeout) self._current_batch = newbatch batch = self._current_batch @@ -158,7 +154,7 @@ def publish( future = batch.publish(message) # batch is full, triggering commit_when_full if future is None: - batch = self._create_batch(commit_retry=retry, commit_timeout=timeout, enable_grpc_compression=enable_grpc_compression, compression_bytes_threshold=compression_bytes_threshold) + batch = self._create_batch(commit_retry=retry, commit_timeout=timeout) # At this point, we lose track of the old batch, but we don't # care since it's already committed (because it was full.) self._current_batch = batch From 650a742d68bb34ec4fdc0cfc88c2c075392750c6 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Mon, 25 Apr 2022 09:53:39 -0400 Subject: [PATCH 07/20] remove extra space --- google/pubsub_v1/services/publisher/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 4d6dbf122..5dc77d2a2 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -764,7 +764,6 @@ def sample_publish(): gapic_v1.routing_header.to_grpc_metadata((("topic", request.topic),)), ) - # Send the request. response = rpc( request, From 8fc311d5768edb5370f07e84ff9689a2b360c924 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Tue, 3 May 2022 15:18:40 -0400 Subject: [PATCH 08/20] add handwritten changed to client.py --- google/pubsub_v1/services/publisher/client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 5dc77d2a2..56bac74a2 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -680,6 +680,7 @@ def publish( retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), + compression: grpc.Compression = None ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if the topic does not exist. @@ -770,6 +771,7 @@ def sample_publish(): retry=retry, timeout=timeout, metadata=metadata, + compression=compression ) # Done; return the response. From 7c498cfa42ccb4d00afc8435e1dba44c9ed2c156 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 10:47:34 -0400 Subject: [PATCH 09/20] remove args on sequencer --- google/cloud/pubsub_v1/publisher/client.py | 2 +- tests/unit/pubsub_v1/publisher/test_publisher_client.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index dc7e7290d..12d2f3640 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -412,7 +412,7 @@ def on_publish_done(future): # Delegate the publishing to the sequencer. sequencer = self._get_or_create_sequencer(topic, ordering_key) - future = sequencer.publish(message, retry=retry, timeout=timeout, enable_grpc_compression=self._enable_grpc_compression, compression_bytes_threshold=self._compression_bytes_threshold) + future = sequencer.publish(message, retry=retry, timeout=timeout) future.add_done_callback(on_publish_done) # Create a timer thread if necessary to enforce the batching diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 90358f3e9..ef82304a0 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -393,7 +393,7 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY, enable_grpc_compression=False, compression_bytes_threshold=240 + mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY, ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" @@ -412,7 +412,7 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout, enable_grpc_compression=False, compression_bytes_threshold=240 + mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout, ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" From e7c78cc23f0314b352dfd9dc01b103fa2e35a8ac Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 10:52:52 -0400 Subject: [PATCH 10/20] fix lint publisher sample --- samples/snippets/publisher.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index db31ec866..4bd36283e 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -215,6 +215,7 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None: print(f"Published messages with batch settings to {topic_path}.") # [END pubsub_publisher_batch_settings] + def publish_messages_with_default_compression_threshold(project_id: str, topic_id: str) -> None: """Publishes messages to a Pub/Sub topic with grpc compression enabled.""" # [START pubsub_publisher_compression_settings] @@ -249,6 +250,7 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None: print(f"Published messages with compression settings to {topic_path}.") # [END pubsub_publisher_compression_settings] + def publish_messages_with_flow_control_settings(project_id: str, topic_id: str) -> None: """Publishes messages to a Pub/Sub topic with flow control settings.""" # [START pubsub_publisher_flow_control] From f057617c4eccf1b5a23549cfeae21712ecafdf12 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 11:32:56 -0400 Subject: [PATCH 11/20] adding compression coverage to test_thread --- .../pubsub_v1/publisher/batch/test_thread.py | 95 ++++++++++++++++++- 1 file changed, 90 insertions(+), 5 deletions(-) diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index 6fc695dfb..ee1543728 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -12,9 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. + import datetime import threading import time +from typing import Sequence, Union import mock import pytest @@ -30,11 +32,12 @@ from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._batch.thread import Batch from google.pubsub_v1 import types as gapic_types +import grpc -def create_client(): +def create_client(client_options: Union[types.PublisherOptions, Sequence] = ()): creds = mock.Mock(spec=credentials.Credentials) - return publisher.Client(credentials=creds) + return publisher.Client(credentials=creds, publisher_options=client_options) def create_batch( @@ -43,8 +46,8 @@ def create_batch( commit_when_full=True, commit_retry=gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, - **batch_settings -): + client_options: Union[types.PublisherOptions, Sequence] = (), + **batch_settings): """Return a batch object suitable for testing. Args: @@ -57,13 +60,15 @@ def create_batch( for the batch commit call. commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`): The timeout to apply to the batch commit call. + client_options (Union[types.PublisherOptions, Sequence]): Arguments passed on + to the :class ``~.pubsub_v1.types.publisher.Client`` constructor. batch_settings (Mapping[str, str]): Arguments passed on to the :class:``~.pubsub_v1.types.BatchSettings`` constructor. Returns: ~.pubsub_v1.publisher.batch.thread.Batch: A batch object. """ - client = create_client() + client = create_client(client_options) settings = types.BatchSettings(**batch_settings) return Batch( client, @@ -90,6 +95,14 @@ def test_client(): assert batch.client is client +def test_client_with_compression(): + client = create_client(types.PublisherOptions(enable_grpc_compression=True)) + settings = types.BatchSettings() + batch = Batch(client, "topic_name", settings) + assert batch.client is client + assert batch.client._enable_grpc_compression + + def test_commit(): batch = create_batch() @@ -154,6 +167,78 @@ def test_blocking__commit(): assert futures[1].result() == "b" +def test_blocking__commit_with_compression_at_zero_bytes(): + batch = create_batch(client_options = types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) + futures = ( + batch.publish({"data": b"This is my message."}), + batch.publish({"data": b"This is another message."}), + ) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + patch = mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[ + gapic_types.PubsubMessage(data=b"This is my message."), + gapic_types.PubsubMessage(data=b"This is another message."), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + compression=grpc.Compression.Gzip + ) + + # Establish that all of the futures are done, and that they have the + # expected values. + assert futures[0].done() + assert futures[0].result() == "a" + assert futures[1].done() + assert futures[1].result() == "b" + + +def test_blocking__commit_with_compression_at_default(): + batch = create_batch(client_options = types.PublisherOptions(enable_grpc_compression=True)) + futures = ( + batch.publish({"data": b"This is my message."}), + batch.publish({"data": b"This is another message."}), + ) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + patch = mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[ + gapic_types.PubsubMessage(data=b"This is my message."), + gapic_types.PubsubMessage(data=b"This is another message."), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + compression=None + ) + + # Establish that all of the futures are done, and that they have the + # expected values. + assert futures[0].done() + assert futures[0].result() == "a" + assert futures[1].done() + assert futures[1].result() == "b" + + def test_blocking__commit_custom_retry(): batch = create_batch(commit_retry=mock.sentinel.custom_retry) batch.publish({"data": b"This is my message."}) From e3f483971ba8c0330a29b72493d65290611d5038 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 11:38:12 -0400 Subject: [PATCH 12/20] linter fixes --- .../pubsub_v1/publisher/_batch/thread.py | 21 +++--- google/cloud/pubsub_v1/publisher/client.py | 4 +- google/pubsub_v1/services/publisher/client.py | 4 +- .../pubsub_v1/publisher/batch/test_thread.py | 67 ++++++++++++++++--- .../publisher/test_publisher_client.py | 8 ++- 5 files changed, 82 insertions(+), 22 deletions(-) diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py index a0f0e9d67..46bde06b5 100644 --- a/google/cloud/pubsub_v1/publisher/_batch/thread.py +++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py @@ -119,9 +119,12 @@ def __init__( self._commit_retry = commit_retry self._commit_timeout = commit_timeout - self._enable_grpc_compression = self.client.publisher_options.enable_grpc_compression - self._compression_bytes_threshold = self.client.publisher_options.compression_bytes_threshold - + self._enable_grpc_compression = ( + self.client.publisher_options.enable_grpc_compression + ) + self._compression_bytes_threshold = ( + self.client.publisher_options.compression_bytes_threshold + ) @staticmethod def make_lock() -> threading.Lock: @@ -277,11 +280,13 @@ def _commit(self) -> None: # Set compression if enabled. compression = None - if self._enable_grpc_compression and gapic_types.PublishRequest( - messages=self._messages - )._pb.ByteSize() >= self._compression_bytes_threshold: + if ( + self._enable_grpc_compression + and gapic_types.PublishRequest(messages=self._messages)._pb.ByteSize() + >= self._compression_bytes_threshold + ): compression = grpc.Compression.Gzip - + try: # Performs retries for errors defined by the retry configuration. response = self._client._gapic_publish( @@ -289,7 +294,7 @@ def _commit(self) -> None: messages=self._messages, retry=self._commit_retry, timeout=self._commit_timeout, - compression=compression + compression=compression, ) except google.api_core.exceptions.GoogleAPIError as exc: # We failed to publish, even after retries, so set the exception on diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 12d2f3640..03ee774ce 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -159,7 +159,9 @@ def __init__( self._flow_controller = FlowController(self.publisher_options.flow_control) self._enable_grpc_compression = self.publisher_options.enable_grpc_compression - self._compression_bytes_threshold = self.publisher_options.compression_bytes_threshold + self._compression_bytes_threshold = ( + self.publisher_options.compression_bytes_threshold + ) @classmethod def from_service_account_file( # type: ignore[override] diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 56bac74a2..4c3fd95c5 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -680,7 +680,7 @@ def publish( retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: TimeoutType = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, str]] = (), - compression: grpc.Compression = None + compression: grpc.Compression = None, ) -> pubsub.PublishResponse: r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if the topic does not exist. @@ -771,7 +771,7 @@ def sample_publish(): retry=retry, timeout=timeout, metadata=metadata, - compression=compression + compression=compression, ) # Done; return the response. diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py index ee1543728..f5e1325b4 100644 --- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py +++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py @@ -47,7 +47,8 @@ def create_batch( commit_retry=gapic_v1.method.DEFAULT, commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT, client_options: Union[types.PublisherOptions, Sequence] = (), - **batch_settings): + **batch_settings +): """Return a batch object suitable for testing. Args: @@ -156,7 +157,7 @@ def test_blocking__commit(): ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - compression=None + compression=None, ) # Establish that all of the futures are done, and that they have the @@ -168,7 +169,51 @@ def test_blocking__commit(): def test_blocking__commit_with_compression_at_zero_bytes(): - batch = create_batch(client_options = types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) + batch = create_batch( + client_options=types.PublisherOptions( + enable_grpc_compression=True, compression_bytes_threshold=0 + ) + ) + futures = ( + batch.publish({"data": b"This is my message."}), + batch.publish({"data": b"This is another message."}), + ) + + # Set up the underlying API publish method to return a PublishResponse. + publish_response = gapic_types.PublishResponse(message_ids=["a", "b"]) + patch = mock.patch.object( + type(batch.client), "_gapic_publish", return_value=publish_response + ) + with patch as publish: + batch._commit() + + # Establish that the underlying API call was made with expected + # arguments. + publish.assert_called_once_with( + topic="topic_name", + messages=[ + gapic_types.PubsubMessage(data=b"This is my message."), + gapic_types.PubsubMessage(data=b"This is another message."), + ], + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + compression=grpc.Compression.Gzip, + ) + + # Establish that all of the futures are done, and that they have the + # expected values. + assert futures[0].done() + assert futures[0].result() == "a" + assert futures[1].done() + assert futures[1].result() == "b" + + +def test_blocking__commit_with_disabled_compression_at_zero_bytes(): + batch = create_batch( + client_options=types.PublisherOptions( + enable_grpc_compression=False, compression_bytes_threshold=0 + ) + ) futures = ( batch.publish({"data": b"This is my message."}), batch.publish({"data": b"This is another message."}), @@ -192,7 +237,7 @@ def test_blocking__commit_with_compression_at_zero_bytes(): ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - compression=grpc.Compression.Gzip + compression=None, ) # Establish that all of the futures are done, and that they have the @@ -204,7 +249,9 @@ def test_blocking__commit_with_compression_at_zero_bytes(): def test_blocking__commit_with_compression_at_default(): - batch = create_batch(client_options = types.PublisherOptions(enable_grpc_compression=True)) + batch = create_batch( + client_options=types.PublisherOptions(enable_grpc_compression=True) + ) futures = ( batch.publish({"data": b"This is my message."}), batch.publish({"data": b"This is another message."}), @@ -228,7 +275,7 @@ def test_blocking__commit_with_compression_at_default(): ], retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, - compression=None + compression=None, ) # Establish that all of the futures are done, and that they have the @@ -258,7 +305,7 @@ def test_blocking__commit_custom_retry(): messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=mock.sentinel.custom_retry, timeout=gapic_v1.method.DEFAULT, - compression=None + compression=None, ) @@ -281,7 +328,7 @@ def test_blocking__commit_custom_timeout(): messages=[gapic_types.PubsubMessage(data=b"This is my message.")], retry=gapic_v1.method.DEFAULT, timeout=mock.sentinel.custom_timeout, - compression=None + compression=None, ) @@ -289,7 +336,9 @@ def test_client_api_publish_not_blocking_additional_publish_calls(): batch = create_batch(max_messages=1) api_publish_called = threading.Event() - def api_publish_delay(topic="", messages=(), retry=None, timeout=None, compression=None): + def api_publish_delay( + topic="", messages=(), retry=None, timeout=None, compression=None + ): api_publish_called.set() time.sleep(1.0) message_ids = [str(i) for i in range(len(messages))] diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index ef82304a0..0c990b7e6 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -393,7 +393,9 @@ def test_publish_custom_retry_overrides_configured_retry(creds): client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY, + mock.ANY, + retry=mock.sentinel.custom_retry, + timeout=mock.ANY, ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" @@ -412,7 +414,9 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds): client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout) fake_sequencer.publish.assert_called_once_with( - mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout, + mock.ANY, + retry=mock.ANY, + timeout=mock.sentinel.custom_timeout, ) message = fake_sequencer.publish.call_args.args[0] assert message.data == b"hello!" From 03f35e8c669f2f4a7fac7f41f7144ab0a299668b Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 13:09:13 -0400 Subject: [PATCH 13/20] add samples --- samples/snippets/publisher.py | 37 +++++++++++++++++++++++++++++- samples/snippets/publisher_test.py | 18 +++++++++++++++ 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 4bd36283e..52a673070 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -227,7 +227,7 @@ def publish_messages_with_default_compression_threshold(project_id: str, topic_i # topic_id = "your-topic-id" # Configure publisher with compression - publisher = pubsub_v1.PublisherClient(enable_grpc_compression=True, compression_bytes_threshold=240) + publisher = pubsub_v1.PublisherClient(enable_grpc_compression=True) topic_path = publisher.topic_path(project_id, topic_id) publish_futures = [] @@ -250,6 +250,41 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None: print(f"Published messages with compression settings to {topic_path}.") # [END pubsub_publisher_compression_settings] +def publish_messages_with_low_compression_threshold(project_id: str, topic_id: str) -> None: + """Publishes messages to a Pub/Sub topic with grpc compression enabled.""" + # [START pubsub_publisher_compression_settings] + from concurrent import futures + from google.cloud import pubsub_v1 + + # TODO(developer) + # project_id = "your-project-id" + # topic_id = "your-topic-id" + + # Configure publisher with compression + publisher = pubsub_v1.PublisherClient(enable_grpc_compression=True, compression_bytes_threshold=0) + topic_path = publisher.topic_path(project_id, topic_id) + publish_futures = [] + + # Resolve the publish future in a separate thread. + def callback(future: pubsub_v1.publisher.futures.Future) -> None: + message_id = future.result() + print(message_id) + + for n in range(1, 10): + data_str = f"Message number {n}" + # Data must be a bytestring + data = data_str.encode("utf-8") + publish_future = publisher.publish(topic_path, data) + # Non-blocking. Allow the publisher client to batch multiple messages. + publish_future.add_done_callback(callback) + publish_futures.append(publish_future) + + futures.wait(publish_futures, return_when=futures.ALL_COMPLETED) + + print(f"Published messages with compression settings to {topic_path}.") + # [END pubsub_publisher_compression_settings] + + def publish_messages_with_flow_control_settings(project_id: str, topic_id: str) -> None: """Publishes messages to a Pub/Sub topic with flow control settings.""" diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index cf00da98e..5618ed74b 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -187,6 +187,24 @@ def test_publish_with_ordering_keys( assert f"Published messages with ordering keys to {topic_path}." in out +def test_publish_with_default_compression( + topic_path: str, capsys: CaptureFixture[str] +) -> None: + publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID) + + out, _ = capsys.readouterr() + assert f"Published messages with retry settings to {topic_path}." in out + + +def test_publish_with_low_compression( + topic_path: str, capsys: CaptureFixture[str] +) -> None: + publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID) + + out, _ = capsys.readouterr() + assert f"Published messages with retry settings to {topic_path}." in out + + def test_resume_publish_with_error_handler( topic_path: str, capsys: CaptureFixture[str] ) -> None: From 49eb3550a563c08859a5959d21e04e87929b0789 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 13:15:13 -0400 Subject: [PATCH 14/20] fix samples --- samples/snippets/publisher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py index 52a673070..87715ecfd 100644 --- a/samples/snippets/publisher.py +++ b/samples/snippets/publisher.py @@ -227,7 +227,7 @@ def publish_messages_with_default_compression_threshold(project_id: str, topic_i # topic_id = "your-topic-id" # Configure publisher with compression - publisher = pubsub_v1.PublisherClient(enable_grpc_compression=True) + publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)) topic_path = publisher.topic_path(project_id, topic_id) publish_futures = [] @@ -261,7 +261,7 @@ def publish_messages_with_low_compression_threshold(project_id: str, topic_id: s # topic_id = "your-topic-id" # Configure publisher with compression - publisher = pubsub_v1.PublisherClient(enable_grpc_compression=True, compression_bytes_threshold=0) + publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) topic_path = publisher.topic_path(project_id, topic_id) publish_futures = [] From 90efd389a6805e56fb15812a63874cb972c84d9d Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Wed, 11 May 2022 13:19:19 -0400 Subject: [PATCH 15/20] fix samples tests --- samples/snippets/publisher_test.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py index 5618ed74b..fa7d0a29e 100644 --- a/samples/snippets/publisher_test.py +++ b/samples/snippets/publisher_test.py @@ -193,7 +193,7 @@ def test_publish_with_default_compression( publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID) out, _ = capsys.readouterr() - assert f"Published messages with retry settings to {topic_path}." in out + assert f"Published messages with compression settings to {topic_path}." in out def test_publish_with_low_compression( @@ -202,7 +202,7 @@ def test_publish_with_low_compression( publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID) out, _ = capsys.readouterr() - assert f"Published messages with retry settings to {topic_path}." in out + assert f"Published messages with compression settings to {topic_path}." in out def test_resume_publish_with_error_handler( From e62ce788cd5713869f8461325a35fa2c0df3c68d Mon Sep 17 00:00:00 2001 From: Anna Cocuzzo Date: Wed, 11 May 2022 21:26:46 +0000 Subject: [PATCH 16/20] add compression to receive test --- samples/snippets/main.py | 5 ++++ samples/snippets/sponge_log.xml -v | 1 + samples/snippets/sponge_log.xml -v -s | 1 + samples/snippets/subscriber_test.py | 37 +++++++++++++++++++++++++++ 4 files changed, 44 insertions(+) create mode 100644 samples/snippets/main.py create mode 100644 samples/snippets/sponge_log.xml -v create mode 100644 samples/snippets/sponge_log.xml -v -s diff --git a/samples/snippets/main.py b/samples/snippets/main.py new file mode 100644 index 000000000..c26053c51 --- /dev/null +++ b/samples/snippets/main.py @@ -0,0 +1,5 @@ +from google.cloud import pubsub_v1 +pub_sub_client = pubsub_v1.PublisherClient() + +if __name__ == '__main__': + result = pub_sub_client.publish("projects/annaco-python-lib-test/topics/annaco-python-lib-test-topic", bytes("Some message here!", "utf8")).result() \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v b/samples/snippets/sponge_log.xml -v new file mode 100644 index 000000000..ce5f4a560 --- /dev/null +++ b/samples/snippets/sponge_log.xml -v @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v -s b/samples/snippets/sponge_log.xml -v -s new file mode 100644 index 000000000..f79f07b64 --- /dev/null +++ b/samples/snippets/sponge_log.xml -v -s @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index d656c6ce4..f281d56ec 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -62,6 +62,13 @@ def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: yield pubsub_v1.PublisherClient() +@pytest.fixture(scope="module") +def publisher_client_with_default_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: + yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)) + +@pytest.fixture(scope="module") +def publisher_client_with_low_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: + yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) @pytest.fixture(scope="module") def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: @@ -791,6 +798,36 @@ def eventually_consistent_test() -> None: eventually_consistent_test() +def test_listen_for_errors_default_compression( + publisher_client_with_default_compression: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + _ = _publish_messages(publisher_client_with_default_compression, topic) + + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + +def test_listen_for_errors_low_compression( + publisher_client_with_low_compression: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + _ = _publish_messages(publisher_client_with_low_compression, topic) + + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + def test_receive_synchronously( publisher_client: pubsub_v1.PublisherClient, topic: str, From 1806f536b2060cf717caa0f310a5984073fd75db Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Thu, 12 May 2022 15:39:17 -0400 Subject: [PATCH 17/20] temp log --- google/pubsub_v1/services/publisher/client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 4c3fd95c5..78b06edc1 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -757,7 +757,16 @@ def sample_publish(): # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - rpc = self._transport._wrapped_methods[self._transport.publish] + + # PublisherTransport + publisher_transport = self._transport + # Dict of methods to _GapicCallables + wrapped_methods = publisher_transport._wrapped_methods + # publish method + transport_publish_method = publisher_transport.publish + # _GapicCallable( + rpc = wrapped_methods[transport_publish_method] + print("target:", rpc._target) # Certain fields should be provided within the metadata header; # add these here. From ba321b9e1d9264194dc4b01b795699fb36f35d06 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 20 May 2022 11:44:02 -0400 Subject: [PATCH 18/20] Revert "add compression to receive test" This reverts commit e62ce788cd5713869f8461325a35fa2c0df3c68d. --- samples/snippets/main.py | 5 ---- samples/snippets/sponge_log.xml -v | 1 - samples/snippets/sponge_log.xml -v -s | 1 - samples/snippets/subscriber_test.py | 37 --------------------------- 4 files changed, 44 deletions(-) delete mode 100644 samples/snippets/main.py delete mode 100644 samples/snippets/sponge_log.xml -v delete mode 100644 samples/snippets/sponge_log.xml -v -s diff --git a/samples/snippets/main.py b/samples/snippets/main.py deleted file mode 100644 index c26053c51..000000000 --- a/samples/snippets/main.py +++ /dev/null @@ -1,5 +0,0 @@ -from google.cloud import pubsub_v1 -pub_sub_client = pubsub_v1.PublisherClient() - -if __name__ == '__main__': - result = pub_sub_client.publish("projects/annaco-python-lib-test/topics/annaco-python-lib-test-topic", bytes("Some message here!", "utf8")).result() \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v b/samples/snippets/sponge_log.xml -v deleted file mode 100644 index ce5f4a560..000000000 --- a/samples/snippets/sponge_log.xml -v +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v -s b/samples/snippets/sponge_log.xml -v -s deleted file mode 100644 index f79f07b64..000000000 --- a/samples/snippets/sponge_log.xml -v -s +++ /dev/null @@ -1 +0,0 @@ - \ No newline at end of file diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index f281d56ec..d656c6ce4 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -62,13 +62,6 @@ def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: yield pubsub_v1.PublisherClient() -@pytest.fixture(scope="module") -def publisher_client_with_default_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: - yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)) - -@pytest.fixture(scope="module") -def publisher_client_with_low_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: - yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) @pytest.fixture(scope="module") def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: @@ -798,36 +791,6 @@ def eventually_consistent_test() -> None: eventually_consistent_test() -def test_listen_for_errors_default_compression( - publisher_client_with_default_compression: pubsub_v1.PublisherClient, - topic: str, - subscription_async: str, - capsys: CaptureFixture[str], -) -> None: - _ = _publish_messages(publisher_client_with_default_compression, topic) - - subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert subscription_async in out - assert "threw an exception" in out - - -def test_listen_for_errors_low_compression( - publisher_client_with_low_compression: pubsub_v1.PublisherClient, - topic: str, - subscription_async: str, - capsys: CaptureFixture[str], -) -> None: - _ = _publish_messages(publisher_client_with_low_compression, topic) - - subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) - - out, _ = capsys.readouterr() - assert subscription_async in out - assert "threw an exception" in out - - def test_receive_synchronously( publisher_client: pubsub_v1.PublisherClient, topic: str, From 350e0abedbd248fc801bf8e6388579e392792763 Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 20 May 2022 11:44:19 -0400 Subject: [PATCH 19/20] Revert "Revert "add compression to receive test"" This reverts commit ba321b9e1d9264194dc4b01b795699fb36f35d06. --- samples/snippets/main.py | 5 ++++ samples/snippets/sponge_log.xml -v | 1 + samples/snippets/sponge_log.xml -v -s | 1 + samples/snippets/subscriber_test.py | 37 +++++++++++++++++++++++++++ 4 files changed, 44 insertions(+) create mode 100644 samples/snippets/main.py create mode 100644 samples/snippets/sponge_log.xml -v create mode 100644 samples/snippets/sponge_log.xml -v -s diff --git a/samples/snippets/main.py b/samples/snippets/main.py new file mode 100644 index 000000000..c26053c51 --- /dev/null +++ b/samples/snippets/main.py @@ -0,0 +1,5 @@ +from google.cloud import pubsub_v1 +pub_sub_client = pubsub_v1.PublisherClient() + +if __name__ == '__main__': + result = pub_sub_client.publish("projects/annaco-python-lib-test/topics/annaco-python-lib-test-topic", bytes("Some message here!", "utf8")).result() \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v b/samples/snippets/sponge_log.xml -v new file mode 100644 index 000000000..ce5f4a560 --- /dev/null +++ b/samples/snippets/sponge_log.xml -v @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/samples/snippets/sponge_log.xml -v -s b/samples/snippets/sponge_log.xml -v -s new file mode 100644 index 000000000..f79f07b64 --- /dev/null +++ b/samples/snippets/sponge_log.xml -v -s @@ -0,0 +1 @@ + \ No newline at end of file diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py index d656c6ce4..f281d56ec 100644 --- a/samples/snippets/subscriber_test.py +++ b/samples/snippets/subscriber_test.py @@ -62,6 +62,13 @@ def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: yield pubsub_v1.PublisherClient() +@pytest.fixture(scope="module") +def publisher_client_with_default_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: + yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)) + +@pytest.fixture(scope="module") +def publisher_client_with_low_compression() -> Generator[pubsub_v1.PublisherClient, None, None]: + yield pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0)) @pytest.fixture(scope="module") def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]: @@ -791,6 +798,36 @@ def eventually_consistent_test() -> None: eventually_consistent_test() +def test_listen_for_errors_default_compression( + publisher_client_with_default_compression: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + _ = _publish_messages(publisher_client_with_default_compression, topic) + + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + +def test_listen_for_errors_low_compression( + publisher_client_with_low_compression: pubsub_v1.PublisherClient, + topic: str, + subscription_async: str, + capsys: CaptureFixture[str], +) -> None: + _ = _publish_messages(publisher_client_with_low_compression, topic) + + subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5) + + out, _ = capsys.readouterr() + assert subscription_async in out + assert "threw an exception" in out + + def test_receive_synchronously( publisher_client: pubsub_v1.PublisherClient, topic: str, From 89837bf2a7ad2eb8350347629ab10e40bcba287d Mon Sep 17 00:00:00 2001 From: acocuzzo Date: Fri, 20 May 2022 11:44:43 -0400 Subject: [PATCH 20/20] Revert "temp log" This reverts commit 1806f536b2060cf717caa0f310a5984073fd75db. --- google/pubsub_v1/services/publisher/client.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py index 78b06edc1..4c3fd95c5 100644 --- a/google/pubsub_v1/services/publisher/client.py +++ b/google/pubsub_v1/services/publisher/client.py @@ -757,16 +757,7 @@ def sample_publish(): # Wrap the RPC method; this adds retry and timeout information, # and friendly error handling. - - # PublisherTransport - publisher_transport = self._transport - # Dict of methods to _GapicCallables - wrapped_methods = publisher_transport._wrapped_methods - # publish method - transport_publish_method = publisher_transport.publish - # _GapicCallable( - rpc = wrapped_methods[transport_publish_method] - print("target:", rpc._target) + rpc = self._transport._wrapped_methods[self._transport.publish] # Certain fields should be provided within the metadata header; # add these here.