From a5081a9fca628e9a9621c8ae626b6d60aaf578e8 Mon Sep 17 00:00:00 2001 From: hzarka Date: Wed, 11 Mar 2020 22:38:30 +0400 Subject: [PATCH 01/12] fix error --- cadence/connection.py | 3 ++- cadence/workflow.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/cadence/connection.py b/cadence/connection.py index 7afd942..2983d4a 100644 --- a/cadence/connection.py +++ b/cadence/connection.py @@ -1,5 +1,6 @@ from __future__ import annotations +import getpass import os import socket from dataclasses import dataclass @@ -194,7 +195,7 @@ def default_tchannel_headers(): @staticmethod def default_application_headers(): return { - "user-name": os.environ.get("LOGNAME", os.getlogin()), + "user-name": getpass.getuser(), "host-name": socket.gethostname(), # Copied from Java client "cadence-client-library-version": "2.2.0", diff --git a/cadence/workflow.py b/cadence/workflow.py index 08ad63c..ca64824 100644 --- a/cadence/workflow.py +++ b/cadence/workflow.py @@ -194,7 +194,7 @@ def exec_workflow(workflow_client, wm: WorkflowMethod, args, workflow_options: W start_request = create_start_workflow_request(workflow_client, wm, args) start_response, err = workflow_client.service.start_workflow(start_request) if err: - raise Exception(err) + raise Exception(repr(err)) execution = WorkflowExecution(workflow_id=start_request.workflow_id, run_id=start_response.run_id) stub_instance._execution = execution return WorkflowExecutionContext(workflow_type=wm._name, workflow_execution=execution) From efdc16222ad99cbaeaaaa08bd8b82558468de367 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 17:53:34 +0400 Subject: [PATCH 02/12] recreating service on exception --- cadence/activity_loop.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index b0cdcce..80d9805 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -41,6 +41,12 @@ def activity_task_loop(worker: Worker): continue if err: logger.error("PollForActivityTask failed: %s", err) + try: + service: WorkflowService = WorkflowService.create(worker.host, worker.port, + timeout=worker.get_timeout()) + worker.manage_service(service) + except Exception as ex: + logger.error(f"Could not create workflow service due to: {ex}") continue task_token = task.task_token if not task_token: From 1c6afec7989b8725460956ab9dba6a68c5806225 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 17:54:30 +0400 Subject: [PATCH 03/12] recreating service on exception --- cadence/activity_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 80d9805..c33dec5 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -42,6 +42,7 @@ def activity_task_loop(worker: Worker): if err: logger.error("PollForActivityTask failed: %s", err) try: + logger.info(f"Possible that connection with cadence broke - so trying to re-establish") service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout()) worker.manage_service(service) From fc89f387a45a3b610f5aa05aa107b2c5955c0634 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 18:08:26 +0400 Subject: [PATCH 04/12] recreating service on exception --- cadence/activity_loop.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index c33dec5..0560fe2 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -38,17 +38,17 @@ def activity_task_loop(worker: Worker): return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) - continue - if err: - logger.error("PollForActivityTask failed: %s", err) try: logger.info(f"Possible that connection with cadence broke - so trying to re-establish") service: WorkflowService = WorkflowService.create(worker.host, worker.port, - timeout=worker.get_timeout()) + timeout=worker.get_timeout()) worker.manage_service(service) except Exception as ex: logger.error(f"Could not create workflow service due to: {ex}") continue + if err: + logger.error("PollForActivityTask failed: %s", err) + continue task_token = task.task_token if not task_token: logger.debug("PollForActivityTask has no task_token (expected): %s", task) From 51382a1b8fdc94eb5fe074f9c23c3686034eca66 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 21:58:35 +0400 Subject: [PATCH 05/12] logging buffer --- cadence/ioutils.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cadence/ioutils.py b/cadence/ioutils.py index a96eb2f..4decf0f 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -1,8 +1,10 @@ from select import select from socket import socket from typing import IO, Callable +import logging +logger = logging.getLogger(__name__) class IOWrapper: def __init__(self, io_stream: IO, socket_: socket = None): self.io_stream = io_stream @@ -26,6 +28,8 @@ def read_or_eof(self, size, field): self.socket.setblocking(True) self.socket.settimeout(timeout) buf: bytes = self.io_stream.read(size) + logger.info(f"buffer content bytes: {buf.decode('utf-8')}") + logger.info(f"buffer contents: {buf.decode('utf-8')}") if len(buf) != size: raise EOFError(field) return buf From e6373b922404accbea13061fcf31ac8346796287 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 22:01:23 +0400 Subject: [PATCH 06/12] logging buffer --- cadence/ioutils.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/cadence/ioutils.py b/cadence/ioutils.py index 4decf0f..5abb62b 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -28,8 +28,10 @@ def read_or_eof(self, size, field): self.socket.setblocking(True) self.socket.settimeout(timeout) buf: bytes = self.io_stream.read(size) - logger.info(f"buffer content bytes: {buf.decode('utf-8')}") - logger.info(f"buffer contents: {buf.decode('utf-8')}") + logger.info(f"buffer content bytes: {buf}") + logger.info(f"buffer contents string: {buf.decode('utf-8')}") + logger.info(f"buffer size: {len(buf)}") + logger.info(f"size: {size}") if len(buf) != size: raise EOFError(field) return buf From 32094e57960160b0117ba4b77df320c14837ecbc Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 22:03:31 +0400 Subject: [PATCH 07/12] logging buffer --- cadence/ioutils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/cadence/ioutils.py b/cadence/ioutils.py index 5abb62b..db5b581 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -29,7 +29,6 @@ def read_or_eof(self, size, field): self.socket.settimeout(timeout) buf: bytes = self.io_stream.read(size) logger.info(f"buffer content bytes: {buf}") - logger.info(f"buffer contents string: {buf.decode('utf-8')}") logger.info(f"buffer size: {len(buf)}") logger.info(f"size: {size}") if len(buf) != size: From f56392c69efcf2ab23fc357be31bb5a5ee4b2db5 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 22:08:36 +0400 Subject: [PATCH 08/12] logging buffer --- cadence/ioutils.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cadence/ioutils.py b/cadence/ioutils.py index db5b581..458f164 100644 --- a/cadence/ioutils.py +++ b/cadence/ioutils.py @@ -28,10 +28,10 @@ def read_or_eof(self, size, field): self.socket.setblocking(True) self.socket.settimeout(timeout) buf: bytes = self.io_stream.read(size) - logger.info(f"buffer content bytes: {buf}") - logger.info(f"buffer size: {len(buf)}") - logger.info(f"size: {size}") if len(buf) != size: + logger.info(f"buffer content bytes: {buf}") + logger.info(f"buffer size: {len(buf)}") + logger.info(f"size: {size}") raise EOFError(field) return buf From e70603a203fffc00796f784cf3cfa838852c49dc Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 22:17:32 +0400 Subject: [PATCH 09/12] logging buffer --- cadence/activity_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 0560fe2..88c0b23 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -40,6 +40,7 @@ def activity_task_loop(worker: Worker): logger.error("PollForActivityTask error: %s", ex) try: logger.info(f"Possible that connection with cadence broke - so trying to re-establish") + worker.service_instances.pop(service) service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout()) worker.manage_service(service) From a61160772e2205f42dcacb9537a5dd9d8530f844 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 22:23:35 +0400 Subject: [PATCH 10/12] logging buffer --- cadence/activity_loop.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index 88c0b23..e2dce02 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -40,7 +40,7 @@ def activity_task_loop(worker: Worker): logger.error("PollForActivityTask error: %s", ex) try: logger.info(f"Possible that connection with cadence broke - so trying to re-establish") - worker.service_instances.pop(service) + del worker.service_instances[worker.service_instances.index(service)] service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout()) worker.manage_service(service) From 6708ae8900b66be95d57f4e765ae224456c9fbb3 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Sat, 19 Sep 2020 22:31:02 +0400 Subject: [PATCH 11/12] logging buffer --- cadence/activity_loop.py | 1 + 1 file changed, 1 insertion(+) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index e2dce02..f745eed 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -41,6 +41,7 @@ def activity_task_loop(worker: Worker): try: logger.info(f"Possible that connection with cadence broke - so trying to re-establish") del worker.service_instances[worker.service_instances.index(service)] + service.close() service: WorkflowService = WorkflowService.create(worker.host, worker.port, timeout=worker.get_timeout()) worker.manage_service(service) From 0eedefff69f5a5ff18dca325bce53ac1b0ce2751 Mon Sep 17 00:00:00 2001 From: Chandan Bhattad Date: Thu, 8 Oct 2020 00:27:59 +0400 Subject: [PATCH 12/12] break on poll activity worker error --- cadence/activity_loop.py | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/cadence/activity_loop.py b/cadence/activity_loop.py index f745eed..12c8991 100644 --- a/cadence/activity_loop.py +++ b/cadence/activity_loop.py @@ -38,16 +38,8 @@ def activity_task_loop(worker: Worker): return except Exception as ex: logger.error("PollForActivityTask error: %s", ex) - try: - logger.info(f"Possible that connection with cadence broke - so trying to re-establish") - del worker.service_instances[worker.service_instances.index(service)] - service.close() - service: WorkflowService = WorkflowService.create(worker.host, worker.port, - timeout=worker.get_timeout()) - worker.manage_service(service) - except Exception as ex: - logger.error(f"Could not create workflow service due to: {ex}") - continue + logger.error(f"Exiting") + break if err: logger.error("PollForActivityTask failed: %s", err) continue