diff --git a/google/cloud/pubsub_v1/publisher/_batch/thread.py b/google/cloud/pubsub_v1/publisher/_batch/thread.py
index e872fcf2b..dbbb92197 100644
--- a/google/cloud/pubsub_v1/publisher/_batch/thread.py
+++ b/google/cloud/pubsub_v1/publisher/_batch/thread.py
@@ -26,6 +26,7 @@
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import base
from google.pubsub_v1 import types as gapic_types
+import grpc
if typing.TYPE_CHECKING: # pragma: NO COVER
from google.cloud import pubsub_v1
@@ -118,6 +119,12 @@ def __init__(
self._commit_retry = commit_retry
self._commit_timeout = commit_timeout
+ self._enable_grpc_compression = (
+ self.client.publisher_options.enable_grpc_compression
+ )
+ self._compression_bytes_threshold = (
+ self.client.publisher_options.compression_bytes_threshold
+ )
@staticmethod
def make_lock() -> threading.Lock:
@@ -269,6 +276,17 @@ def _commit(self) -> None:
start = time.time()
batch_transport_succeeded = True
+
+ # Set compression if enabled.
+ compression = None
+
+ if (
+ self._enable_grpc_compression
+ and gapic_types.PublishRequest(messages=self._messages)._pb.ByteSize()
+ >= self._compression_bytes_threshold
+ ):
+ compression = grpc.Compression.Gzip
+
try:
# Performs retries for errors defined by the retry configuration.
response = self._client._gapic_publish(
@@ -276,6 +294,7 @@ def _commit(self) -> None:
messages=self._messages,
retry=self._commit_retry,
timeout=self._commit_timeout,
+ compression=compression,
)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, even after retries, so set the exception on
diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py
index 3e668533d..6487b5a83 100644
--- a/google/cloud/pubsub_v1/publisher/client.py
+++ b/google/cloud/pubsub_v1/publisher/client.py
@@ -153,6 +153,11 @@ def __init__(
# The object controlling the message publishing flow
self._flow_controller = FlowController(self.publisher_options.flow_control)
+ self._enable_grpc_compression = self.publisher_options.enable_grpc_compression
+ self._compression_bytes_threshold = (
+ self.publisher_options.compression_bytes_threshold
+ )
+
@classmethod
def from_service_account_file( # type: ignore[override]
cls,
diff --git a/google/cloud/pubsub_v1/types.py b/google/cloud/pubsub_v1/types.py
index 3d071a189..a90615f02 100644
--- a/google/cloud/pubsub_v1/types.py
+++ b/google/cloud/pubsub_v1/types.py
@@ -173,6 +173,9 @@ class PublisherOptions(NamedTuple):
"Timeout settings for message publishing by the client. It should be "
"compatible with :class:`~.pubsub_v1.types.TimeoutType`."
)
+ enable_grpc_compression: bool = False
+
+ compression_bytes_threshold: int = 240
# Define the type class and default values for flow control settings.
diff --git a/google/pubsub_v1/services/publisher/client.py b/google/pubsub_v1/services/publisher/client.py
index 1a92362c5..489aa3475 100644
--- a/google/pubsub_v1/services/publisher/client.py
+++ b/google/pubsub_v1/services/publisher/client.py
@@ -737,6 +737,7 @@ def publish(
retry: OptionalRetry = gapic_v1.method.DEFAULT,
timeout: TimeoutType = gapic_v1.method.DEFAULT,
metadata: Sequence[Tuple[str, str]] = (),
+ compression: grpc.Compression = None,
) -> pubsub.PublishResponse:
r"""Adds one or more messages to the topic. Returns ``NOT_FOUND`` if
the topic does not exist.
@@ -833,6 +834,7 @@ def sample_publish():
retry=retry,
timeout=timeout,
metadata=metadata,
+ compression=compression,
)
# Done; return the response.
diff --git a/samples/snippets/main.py b/samples/snippets/main.py
new file mode 100644
index 000000000..c26053c51
--- /dev/null
+++ b/samples/snippets/main.py
@@ -0,0 +1,5 @@
+# NOTE(review): this looks like a local debugging script -- the topic path
+# below is hard-coded to one specific project. Confirm it is meant to be
+# part of this change before merging.
+from google.cloud import pubsub_v1
+# The client is created at module level, so merely importing this file
+# constructs a PublisherClient.
+pub_sub_client = pubsub_v1.PublisherClient()
+
+if __name__ == '__main__':
+ # Publish a single message and call .result() on the returned future.
+ result = pub_sub_client.publish("projects/annaco-python-lib-test/topics/annaco-python-lib-test-topic", bytes("Some message here!", "utf8")).result()
\ No newline at end of file
diff --git a/samples/snippets/publisher.py b/samples/snippets/publisher.py
index e2c63556c..45f71d71b 100644
--- a/samples/snippets/publisher.py
+++ b/samples/snippets/publisher.py
@@ -216,6 +216,76 @@ def callback(future: pubsub_v1.publisher.futures.Future) -> None:
# [END pubsub_publisher_batch_settings]
+def publish_messages_with_default_compression_threshold(project_id: str, topic_id: str) -> None:
+    """Publishes messages through a publisher that has gRPC compression enabled.
+
+    Only ``enable_grpc_compression`` is set here; the compression byte
+    threshold is left at the ``PublisherOptions`` default.
+    """
+    # [START pubsub_publisher_compression_settings]
+    from concurrent import futures
+    from google.cloud import pubsub_v1
+
+    # TODO(developer)
+    # project_id = "your-project-id"
+    # topic_id = "your-topic-id"
+
+    # Build the publisher options first, then hand them to the client.
+    options = pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)
+    publisher = pubsub_v1.PublisherClient(publisher_options=options)
+    topic_path = publisher.topic_path(project_id, topic_id)
+
+    # Invoked when a publish future completes; prints the resulting message ID.
+    def callback(future: pubsub_v1.publisher.futures.Future) -> None:
+        print(future.result())
+
+    publish_futures = []
+    for n in range(1, 10):
+        # Data must be a bytestring
+        payload = f"Message number {n}".encode("utf-8")
+        # Non-blocking. Allow the publisher client to batch multiple messages.
+        pending = publisher.publish(topic_path, payload)
+        pending.add_done_callback(callback)
+        publish_futures.append(pending)
+
+    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
+
+    print(f"Published messages with compression settings to {topic_path}.")
+    # [END pubsub_publisher_compression_settings]
+
+def publish_messages_with_low_compression_threshold(project_id: str, topic_id: str) -> None:
+ """Publishes messages to a Pub/Sub topic with grpc compression enabled.
+
+ Unlike the default-threshold sample, the publisher here is created with
+ ``compression_bytes_threshold=0``.
+ """
+ # NOTE(review): the region tag below is also used by
+ # publish_messages_with_default_compression_threshold -- confirm whether
+ # each sample needs its own tag.
+ # [START pubsub_publisher_compression_settings]
+ from concurrent import futures
+ from google.cloud import pubsub_v1
+
+ # TODO(developer)
+ # project_id = "your-project-id"
+ # topic_id = "your-topic-id"
+
+ # Configure publisher with compression and a compression byte threshold of 0
+ publisher = pubsub_v1.PublisherClient(publisher_options=pubsub_v1.types.PublisherOptions(enable_grpc_compression=True, compression_bytes_threshold=0))
+ topic_path = publisher.topic_path(project_id, topic_id)
+ publish_futures = []
+
+ # Resolve the publish future in a separate thread.
+ def callback(future: pubsub_v1.publisher.futures.Future) -> None:
+ message_id = future.result()
+ print(message_id)
+
+ for n in range(1, 10):
+ data_str = f"Message number {n}"
+ # Data must be a bytestring
+ data = data_str.encode("utf-8")
+ publish_future = publisher.publish(topic_path, data)
+ # Non-blocking. Allow the publisher client to batch multiple messages.
+ publish_future.add_done_callback(callback)
+ publish_futures.append(publish_future)
+
+ futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
+
+ print(f"Published messages with compression settings to {topic_path}.")
+ # [END pubsub_publisher_compression_settings]
+
+
+
def publish_messages_with_flow_control_settings(project_id: str, topic_id: str) -> None:
"""Publishes messages to a Pub/Sub topic with flow control settings."""
# [START pubsub_publisher_flow_control]
diff --git a/samples/snippets/publisher_test.py b/samples/snippets/publisher_test.py
index 0a6311308..f97605677 100644
--- a/samples/snippets/publisher_test.py
+++ b/samples/snippets/publisher_test.py
@@ -192,6 +192,24 @@ def test_publish_with_ordering_keys(
assert f"Published messages with ordering keys to {topic_path}." in out
+def test_publish_with_default_compression(
+    topic_path: str, capsys: CaptureFixture[str]
+) -> None:
+    """The default-threshold compression sample prints the topic it published to."""
+    publisher.publish_messages_with_default_compression_threshold(PROJECT_ID, TOPIC_ID)
+
+    captured = capsys.readouterr()
+    expected = f"Published messages with compression settings to {topic_path}."
+    assert expected in captured.out
+
+
+def test_publish_with_low_compression(
+    topic_path: str, capsys: CaptureFixture[str]
+) -> None:
+    """The low-threshold compression sample prints the topic it published to."""
+    # Call the low-threshold sample here; the default-threshold sample is
+    # covered by test_publish_with_default_compression.
+    publisher.publish_messages_with_low_compression_threshold(PROJECT_ID, TOPIC_ID)
+
+    out, _ = capsys.readouterr()
+    assert f"Published messages with compression settings to {topic_path}." in out
+
+
def test_resume_publish_with_error_handler(
topic_path: str, capsys: CaptureFixture[str]
) -> None:
diff --git a/samples/snippets/sponge_log.xml -v b/samples/snippets/sponge_log.xml -v
new file mode 100644
index 000000000..ce5f4a560
--- /dev/null
+++ b/samples/snippets/sponge_log.xml -v
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/samples/snippets/sponge_log.xml -v -s b/samples/snippets/sponge_log.xml -v -s
new file mode 100644
index 000000000..f79f07b64
--- /dev/null
+++ b/samples/snippets/sponge_log.xml -v -s
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/samples/snippets/subscriber_test.py b/samples/snippets/subscriber_test.py
index 3fa94761c..3b1297365 100644
--- a/samples/snippets/subscriber_test.py
+++ b/samples/snippets/subscriber_test.py
@@ -61,6 +61,13 @@
def publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]:
yield pubsub_v1.PublisherClient()
+@pytest.fixture(scope="module")
+def publisher_client_with_default_compression() -> Generator[pubsub_v1.PublisherClient, None, None]:
+    """Yield a publisher client with gRPC compression enabled and no explicit threshold."""
+    options = pubsub_v1.types.PublisherOptions(enable_grpc_compression=True)
+    yield pubsub_v1.PublisherClient(publisher_options=options)
+
+@pytest.fixture(scope="module")
+def publisher_client_with_low_compression() -> Generator[pubsub_v1.PublisherClient, None, None]:
+    """Yield a publisher client with gRPC compression enabled and a 0-byte threshold."""
+    options = pubsub_v1.types.PublisherOptions(
+        enable_grpc_compression=True, compression_bytes_threshold=0
+    )
+    yield pubsub_v1.PublisherClient(publisher_options=options)
@pytest.fixture(scope="module")
def regional_publisher_client() -> Generator[pubsub_v1.PublisherClient, None, None]:
@@ -974,6 +981,36 @@ def test_listen_for_errors(
subscriber_client.delete_subscription(request={"subscription": subscription_path})
+def test_listen_for_errors_default_compression(
+    publisher_client_with_default_compression: pubsub_v1.PublisherClient,
+    topic: str,
+    subscription_async: str,
+    capsys: CaptureFixture[str],
+) -> None:
+    """Publish via the default-threshold compression client, then run listen_for_errors."""
+    _publish_messages(publisher_client_with_default_compression, topic)
+
+    subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
+
+    printed = capsys.readouterr().out
+    assert subscription_async in printed
+    assert "threw an exception" in printed
+
+
+def test_listen_for_errors_low_compression(
+    publisher_client_with_low_compression: pubsub_v1.PublisherClient,
+    topic: str,
+    subscription_async: str,
+    capsys: CaptureFixture[str],
+) -> None:
+    """Publish via the 0-byte-threshold compression client, then run listen_for_errors."""
+    _publish_messages(publisher_client_with_low_compression, topic)
+
+    subscriber.listen_for_errors(PROJECT_ID, SUBSCRIPTION_ASYNC, 5)
+
+    printed = capsys.readouterr().out
+    assert subscription_async in printed
+    assert "threw an exception" in printed
+
+
def test_receive_synchronously(
subscriber_client: pubsub_v1.SubscriberClient,
publisher_client: pubsub_v1.PublisherClient,
diff --git a/tests/unit/pubsub_v1/publisher/batch/test_thread.py b/tests/unit/pubsub_v1/publisher/batch/test_thread.py
index 60658b4ce..2c8a6d65b 100644
--- a/tests/unit/pubsub_v1/publisher/batch/test_thread.py
+++ b/tests/unit/pubsub_v1/publisher/batch/test_thread.py
@@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+
import datetime
import sys
import threading
import time
+from typing import Sequence, Union
# special case python < 3.8
if sys.version_info.major == 3 and sys.version_info.minor < 8:
@@ -36,11 +38,12 @@
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._batch.thread import Batch
from google.pubsub_v1 import types as gapic_types
+import grpc
-def create_client():
+def create_client(client_options: Union[types.PublisherOptions, Sequence] = ()):
+ """Return a publisher client built with mock credentials.
+
+ ``client_options`` is forwarded to the client constructor as its
+ ``publisher_options`` argument.
+ """
creds = mock.Mock(spec=credentials.Credentials)
- return publisher.Client(credentials=creds)
+ return publisher.Client(credentials=creds, publisher_options=client_options)
def create_batch(
@@ -49,6 +52,7 @@ def create_batch(
commit_when_full=True,
commit_retry=gapic_v1.method.DEFAULT,
commit_timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
+ client_options: Union[types.PublisherOptions, Sequence] = (),
**batch_settings
):
"""Return a batch object suitable for testing.
@@ -63,13 +67,15 @@ def create_batch(
for the batch commit call.
commit_timeout (:class:`~.pubsub_v1.types.TimeoutType`):
The timeout to apply to the batch commit call.
+ client_options (Union[types.PublisherOptions, Sequence]): Publisher options
+ passed on to the :class:``~.pubsub_v1.publisher.Client`` constructor.
batch_settings (Mapping[str, str]): Arguments passed on to the
:class:``~.pubsub_v1.types.BatchSettings`` constructor.
Returns:
~.pubsub_v1.publisher.batch.thread.Batch: A batch object.
"""
- client = create_client()
+ client = create_client(client_options)
settings = types.BatchSettings(**batch_settings)
return Batch(
client,
@@ -96,6 +102,14 @@ def test_client():
assert batch.client is client
+def test_client_with_compression():
+    """Compression options configured on the client are picked up by the batch."""
+    client = create_client(types.PublisherOptions(enable_grpc_compression=True))
+    settings = types.BatchSettings()
+    batch = Batch(client, "topic_name", settings)
+    assert batch.client is client
+    assert batch.client._enable_grpc_compression
+    # The batch keeps its own copies of the compression options, and those are
+    # the values _commit() consults, so verify them as well.
+    assert batch._enable_grpc_compression
+    assert (
+        batch._compression_bytes_threshold
+        == client.publisher_options.compression_bytes_threshold
+    )
+
+
def test_commit():
batch = create_batch()
@@ -149,6 +163,125 @@ def test_blocking__commit():
],
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
+ compression=None,
+ )
+
+ # Establish that all of the futures are done, and that they have the
+ # expected values.
+ assert futures[0].done()
+ assert futures[0].result() == "a"
+ assert futures[1].done()
+ assert futures[1].result() == "b"
+
+
+def test_blocking__commit_with_compression_at_zero_bytes():
+    """With compression enabled and a 0-byte threshold, _commit() passes gzip."""
+    options = types.PublisherOptions(
+        enable_grpc_compression=True, compression_bytes_threshold=0
+    )
+    batch = create_batch(client_options=options)
+    first = batch.publish({"data": b"This is my message."})
+    second = batch.publish({"data": b"This is another message."})
+
+    # Stub the underlying API publish method so it returns a PublishResponse.
+    publish_response = gapic_types.PublishResponse(message_ids=["a", "b"])
+    with mock.patch.object(
+        type(batch.client), "_gapic_publish", return_value=publish_response
+    ) as publish:
+        batch._commit()
+
+    # The underlying API call must have been made once with these arguments.
+    expected_messages = [
+        gapic_types.PubsubMessage(data=b"This is my message."),
+        gapic_types.PubsubMessage(data=b"This is another message."),
+    ]
+    publish.assert_called_once_with(
+        topic="topic_name",
+        messages=expected_messages,
+        retry=gapic_v1.method.DEFAULT,
+        timeout=gapic_v1.method.DEFAULT,
+        compression=grpc.Compression.Gzip,
+    )
+
+    # Both futures are done and carry the message IDs from the response.
+    for future, message_id in ((first, "a"), (second, "b")):
+        assert future.done()
+        assert future.result() == message_id
+
+
+def test_blocking__commit_with_disabled_compression_at_zero_bytes():
+    """With compression disabled, a 0-byte threshold still passes compression=None."""
+    options = types.PublisherOptions(
+        enable_grpc_compression=False, compression_bytes_threshold=0
+    )
+    batch = create_batch(client_options=options)
+    first = batch.publish({"data": b"This is my message."})
+    second = batch.publish({"data": b"This is another message."})
+
+    # Stub the underlying API publish method so it returns a PublishResponse.
+    publish_response = gapic_types.PublishResponse(message_ids=["a", "b"])
+    with mock.patch.object(
+        type(batch.client), "_gapic_publish", return_value=publish_response
+    ) as publish:
+        batch._commit()
+
+    # The underlying API call must have been made once with these arguments.
+    expected_messages = [
+        gapic_types.PubsubMessage(data=b"This is my message."),
+        gapic_types.PubsubMessage(data=b"This is another message."),
+    ]
+    publish.assert_called_once_with(
+        topic="topic_name",
+        messages=expected_messages,
+        retry=gapic_v1.method.DEFAULT,
+        timeout=gapic_v1.method.DEFAULT,
+        compression=None,
+    )
+
+    # Both futures are done and carry the message IDs from the response.
+    for future, message_id in ((first, "a"), (second, "b")):
+        assert future.done()
+        assert future.result() == message_id
+
+
+def test_blocking__commit_with_compression_at_default():
+ batch = create_batch(
+ client_options=types.PublisherOptions(enable_grpc_compression=True)
+ )
+ futures = (
+ batch.publish({"data": b"This is my message."}),
+ batch.publish({"data": b"This is another message."}),
+ )
+
+ # Set up the underlying API publish method to return a PublishResponse.
+ publish_response = gapic_types.PublishResponse(message_ids=["a", "b"])
+ patch = mock.patch.object(
+ type(batch.client), "_gapic_publish", return_value=publish_response
+ )
+ with patch as publish:
+ batch._commit()
+
+ # Establish that the underlying API call was made with expected
+ # arguments.
+ publish.assert_called_once_with(
+ topic="topic_name",
+ messages=[
+ gapic_types.PubsubMessage(data=b"This is my message."),
+ gapic_types.PubsubMessage(data=b"This is another message."),
+ ],
+ retry=gapic_v1.method.DEFAULT,
+ timeout=gapic_v1.method.DEFAULT,
+ compression=None,
)
# Establish that all of the futures are done, and that they have the
@@ -178,6 +311,7 @@ def test_blocking__commit_custom_retry():
messages=[gapic_types.PubsubMessage(data=b"This is my message.")],
retry=mock.sentinel.custom_retry,
timeout=gapic_v1.method.DEFAULT,
+ compression=None,
)
@@ -200,6 +334,7 @@ def test_blocking__commit_custom_timeout():
messages=[gapic_types.PubsubMessage(data=b"This is my message.")],
retry=gapic_v1.method.DEFAULT,
timeout=mock.sentinel.custom_timeout,
+ compression=None,
)
@@ -207,7 +342,9 @@ def test_client_api_publish_not_blocking_additional_publish_calls():
batch = create_batch(max_messages=1)
api_publish_called = threading.Event()
- def api_publish_delay(topic="", messages=(), retry=None, timeout=None):
+ def api_publish_delay(
+ topic="", messages=(), retry=None, timeout=None, compression=None
+ ):
api_publish_called.set()
time.sleep(1.0)
message_ids = [str(i) for i in range(len(messages))]
diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py
index 6c68c3943..fd928f329 100644
--- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py
+++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py
@@ -399,7 +399,9 @@ def test_publish_custom_retry_overrides_configured_retry(creds):
client.publish(topic, b"hello!", retry=mock.sentinel.custom_retry)
fake_sequencer.publish.assert_called_once_with(
- mock.ANY, retry=mock.sentinel.custom_retry, timeout=mock.ANY
+ mock.ANY,
+ retry=mock.sentinel.custom_retry,
+ timeout=mock.ANY,
)
message = fake_sequencer.publish.call_args.args[0]
assert message.data == b"hello!"
@@ -418,7 +420,9 @@ def test_publish_custom_timeout_overrides_configured_timeout(creds):
client.publish(topic, b"hello!", timeout=mock.sentinel.custom_timeout)
fake_sequencer.publish.assert_called_once_with(
- mock.ANY, retry=mock.ANY, timeout=mock.sentinel.custom_timeout
+ mock.ANY,
+ retry=mock.ANY,
+ timeout=mock.sentinel.custom_timeout,
)
message = fake_sequencer.publish.call_args.args[0]
assert message.data == b"hello!"