Handling of ActivityTaskTimeoutException, schedule_to_start_timeout_seconds #11
Description
It seems ActivityTaskTimeoutException is caught in decision_loop.py/workflow.py but does not propagate up to the caller. The Java client surfaces both of these exceptions to the caller in a similar setup, so the Python client's behavior appears to differ.
With the sample code below, if schedule_to_start_timeout_seconds is shorter than execution_start_to_close_timeout_seconds, the worker logs the ActivityTaskTimeoutException shown below, but it never reaches the caller. If schedule_to_start_timeout_seconds is longer than the execution timeout, the client gets the WorkflowExecutionTimedOutException.
- Running with schedule_to_start_timeout_seconds = 1 (the exception is logged, but it never makes it up to the caller):
ERROR:cadence.decision_loop:Workflow GreetingWorkflow::get_greeting('Python') failed
Traceback (most recent call last):
  File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/decision_loop.py", line 233, in workflow_main
    self.ret_value = await workflow_proc(self.workflow_instance, *self.workflow_input)
  File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/activity_method.py", line 62, in stub_activity_fn
    return await decision_context.schedule_activity_task(parameters=parameters)
  File "/Users/rich/.pyenv/versions/dev/lib/python3.7/site-packages/cadence/decision_loop.py", line 342, in schedule_activity_task
    await future
cadence.exceptions.ActivityTaskTimeoutException
...
- Running with execution_start_to_close_timeout_seconds=1 and schedule_to_start_timeout_seconds=5, the caller does get the exception:
Workflow Excution Timeout
Stopping workers....
Workers stopped...
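The two runs above look like a simple race between the two timers, assuming the activity is never picked up (the sample registers only the workflow implementation, so nothing polls for compose_greeting). A minimal sketch of that reasoning follows. first_timeout_to_fire is a hypothetical helper, not part of cadence-python, it ignores scheduling latency on the server, and the value 10 for the first run's execution timeout is an assumption since the report does not state it.

```python
def first_timeout_to_fire(schedule_to_start_s: int,
                          execution_start_to_close_s: int) -> str:
    """Hypothetical helper: name the timeout expected to expire first
    when no worker ever starts the scheduled activity."""
    if schedule_to_start_s < execution_start_to_close_s:
        # The activity's schedule-to-start timer runs out first.
        return "ActivityTaskTimeoutException"
    if schedule_to_start_s > execution_start_to_close_s:
        # The whole workflow execution runs out of time first.
        return "WorkflowExecutionTimedOutException"
    # Equal values: the ordering depends on timing this sketch does not model.
    return "undetermined"


# First run: schedule_to_start=1, execution timeout assumed to be 10.
print(first_timeout_to_fire(1, 10))
# Second run: schedule_to_start=5, execution timeout=1.
print(first_timeout_to_fire(5, 1))
```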
It appears that somewhere in the code the ActivityTimeout is not being added to the GetWorkflowExecutionHistoryResponse. I could be way off, but that is where the trail seemed to lead. Whether it is supposed to be there is also a question, but the Java client does throw this exception up to the caller (wrapped in a WorkflowException, which I think is correct).
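For comparison, the Java-style behavior described here, where the caller receives a workflow-level exception that carries the activity timeout, can be sketched in plain Python with exception chaining. Every class and function below is a local stand-in defined only for illustration. None of it is the cadence-python API, and it makes no claim about how the library reads the execution history.

```python
class ActivityTaskTimeoutException(Exception):
    """Local stand-in for the activity-level timeout."""


class WorkflowException(Exception):
    """Local stand-in for the workflow-level exception seen by the caller."""


def run_workflow():
    # Stand-in for workflow code whose scheduled activity timed out.
    raise ActivityTaskTimeoutException("schedule_to_start timeout")


def call_workflow_stub():
    # Stand-in for the client-side stub: wrap the activity failure so the
    # caller sees one workflow-level exception with the original as its cause.
    try:
        return run_workflow()
    except ActivityTaskTimeoutException as ex:
        raise WorkflowException("workflow failed") from ex


try:
    call_workflow_stub()
except WorkflowException as ex:
    # The original activity timeout stays reachable through __cause__.
    print(type(ex.__cause__).__name__)
```

With this shape the caller needs a single except clause for workflow failures and can still inspect the underlying activity timeout.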
# Activities Interface
class GreetingActivities:
    @activity_method(task_list=TASK_LIST,
                     schedule_to_close_timeout_seconds=100000,
                     schedule_to_start_timeout_seconds=5)
    def compose_greeting(self, greeting: str, name: str) -> str:
        raise NotImplementedError


# Workflow Interface
class GreetingWorkflow:
    @workflow_method(task_list=TASK_LIST,
                     execution_start_to_close_timeout_seconds=1)
    def get_greeting(self, name: str) -> str:
        raise NotImplementedError


# Workflow Implementation
class GreetingWorkflowImpl(GreetingWorkflow):

    def __init__(self):
        self.greeting_activities: GreetingActivities = Workflow.new_activity_stub(GreetingActivities)

    #async
    def get_greeting(self, name):
        return self.greeting_activities.compose_greeting("Aloha", name)


if __name__ == '__main__':
    try:
        factory = WorkerFactory("localhost", 7933, DOMAIN)
        worker = factory.new_worker(TASK_LIST)
        worker.register_workflow_implementation_type(GreetingWorkflowImpl)
        factory.start()

        client = WorkflowClient.new_client(domain=DOMAIN)
        greeting_workflow: GreetingWorkflow = client.new_workflow_stub(GreetingWorkflow)
        name = "foo"
        result = greeting_workflow.get_greeting("{}".format(name))
        print(result)
    except ActivityTaskTimeoutException as ex:
        print("Activity Task Timeout {}".format(ex))
    except WorkflowExecutionTimedOutException as ex2:
        print("Workflow Excution Timeout {}".format(ex2))

    print("Stopping workers....")
    worker.stop()
    print("Workers stopped...")
    sys.exit(0)