From aea77db2b18ca0eeb4f1b1c40f3c1bccd9fd2a22 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 13:55:56 +0300 Subject: [PATCH 1/9] Add new flag in task data telling that the task is part of the composition one. --- taskflow/__init__.py | 2 +- taskflow/tasks.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/taskflow/__init__.py b/taskflow/__init__.py index ed5fd53..bbe5a77 100644 --- a/taskflow/__init__.py +++ b/taskflow/__init__.py @@ -1,5 +1,5 @@ __title__ = 'Taskflow' -__version__ = '0.2.0' +__version__ = '0.2.1' from .flow import * # noqa from .tasks import * # noqa diff --git a/taskflow/tasks.py b/taskflow/tasks.py index fd8d6af..85144c7 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -148,7 +148,8 @@ def _get_task_data(self): 'runs': self._runs, 'status': self._status, 'result': self._result, - 'is_standalone': self.is_standalone + 'is_standalone': self.is_standalone, + 'part_of_composition_task': bool(self.is_standalone and self.parent), } def to_list(self): From d7e6e042040e644abc71c7f470a51840e004ee17 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 14:38:13 +0300 Subject: [PATCH 2/9] Fix black. --- taskflow/tasks.py | 42 +++++++++--------------------------------- 1 file changed, 9 insertions(+), 33 deletions(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index d69f533..611f39a 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -1,11 +1,7 @@ import sys from taskflow.defaults import Defaults -from taskflow.type_helpers import ( - function_from_string, - function_to_string, - type_to_string, -) +from taskflow.type_helpers import function_from_string, function_to_string, type_to_string class BaseTask(object): @@ -193,12 +189,8 @@ def __str__(self): class Task(BaseTask): - def __init__( - self, func=None, args=None, max_runs=None, needs_prev_result=True, name=None - ): - super().__init__( - max_runs=max_runs, needs_prev_result=needs_prev_result, name=name - ) + def __init__(self, func=None, args=None, max_runs=None, needs_prev_result=True, name=None): + super().__init__(max_runs=max_runs, needs_prev_result=needs_prev_result, name=name) self._func = func self._args = args or [] @@ -223,20 +215,12 @@ def run(self, **kwargs): self._error = None return self._result except Exception as ex: - self._status = ( - self.STATUS_HALTED - if self._runs >= self.max_runs - else self.STATUS_PENDING - ) + self._status = self.STATUS_HALTED if self._runs >= self.max_runs else self.STATUS_PENDING self._error = ex self._exc_info = sys.exc_info() def __str__(self): - return ( - self._name - if self._name - else f"{function_to_string(self._func)}:{self._args}" - ) + return self._name if self._name else f"{function_to_string(self._func)}:{self._args}" def _get_task_data(self): result = super()._get_task_data() @@ -270,13 +254,9 @@ def __init__(self, *sub_tasks, needs_prev_result=True, name=None): @property def status(self): - if any( - sub_task.leaf.status == self.STATUS_HALTED for sub_task in self._sub_tasks - ): + if any(sub_task.leaf.status == self.STATUS_HALTED for sub_task in self._sub_tasks): return self.STATUS_HALTED - elif all( - sub_task.leaf.status == self.STATUS_COMPLETE for sub_task in self._sub_tasks - ): + elif all(sub_task.leaf.status == self.STATUS_COMPLETE for sub_task in self._sub_tasks): return self.STATUS_COMPLETE return self.STATUS_PENDING @@ -311,18 +291,14 @@ def to_list(self): result += sub_task.to_list() base_list = super().to_list() - base_list[0].update( - {"sub_tasks": [sub_task.id for sub_task in self._sub_tasks]} - ) + base_list[0].update({"sub_tasks": [sub_task.id for sub_task in self._sub_tasks]}) return result + base_list @classmethod def from_data(cls, task_data): result = super().from_data(task_data) - result._sub_tasks = [ - sub_task.local_root for sub_task in task_data["sub_tasks"] or [] - ] + result._sub_tasks = [sub_task.local_root for sub_task in task_data["sub_tasks"] or []] for sub_task in result._sub_tasks: sub_task._parent = result From 9d7a80e86e6473cf9ae8ed616fe8bf985edb51c5 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 15:06:03 +0300 Subject: [PATCH 3/9] Add parent/next for testing purposes. --- taskflow/tasks.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index 611f39a..9a488ae 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -151,6 +151,8 @@ def _get_task_data(self): "result": self._result, "is_standalone": self.is_standalone, "part_of_composition_task": bool(self.is_standalone and self.parent), + "parent": self.parent, + "next": self.next, } def to_list(self): From 973310643a33372ce1b53c200b46277d41289433 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 15:25:40 +0300 Subject: [PATCH 4/9] Test parent with _parent. --- taskflow/tasks.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index 9a488ae..ba2d6fa 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -1,7 +1,11 @@ import sys from taskflow.defaults import Defaults -from taskflow.type_helpers import function_from_string, function_to_string, type_to_string +from taskflow.type_helpers import ( + function_from_string, + function_to_string, + type_to_string, +) class BaseTask(object): @@ -150,9 +154,8 @@ def _get_task_data(self): "status": self._status, "result": self._result, "is_standalone": self.is_standalone, - "part_of_composition_task": bool(self.is_standalone and self.parent), + "part_of_composition_task": bool(self.is_standalone and self._parent), "parent": self.parent, - "next": self.next, } def to_list(self): From 52ddc99d571f00f1a11d0eb653a7d5017b60aef1 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 15:43:11 +0300 Subject: [PATCH 5/9] Change where we update task_data. --- taskflow/tasks.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index ba2d6fa..d83494b 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -1,11 +1,7 @@ import sys from taskflow.defaults import Defaults -from taskflow.type_helpers import ( - function_from_string, - function_to_string, - type_to_string, -) +from taskflow.type_helpers import function_from_string, function_to_string, type_to_string class BaseTask(object): @@ -154,8 +150,6 @@ def _get_task_data(self): "status": self._status, "result": self._result, "is_standalone": self.is_standalone, - "part_of_composition_task": bool(self.is_standalone and self._parent), - "parent": self.parent, } def to_list(self): @@ -229,7 +223,13 @@ def __str__(self): def _get_task_data(self): result = super()._get_task_data() - result.update({"func": function_to_string(self._func), "args": self._args}) + result.update( + { + "func": function_to_string(self._func), + "args": self._args, + "part_of_composition_task": bool(self.is_standalone and self._parent), + } + ) return result From 6772e6d0e024026db9ac7870946fb173ba20ed22 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 16:02:11 +0300 Subject: [PATCH 6/9] Change to use local root for testing --- taskflow/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index d83494b..6f2495b 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -227,7 +227,7 @@ def _get_task_data(self): { "func": function_to_string(self._func), "args": self._args, - "part_of_composition_task": bool(self.is_standalone and self._parent), + "part_of_composition_task": bool(self.is_standalone and self.local_root), } ) From 6f3a6e95161c4f4c97b8b8cdcb6a3abef2108ad8 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 16:23:15 +0300 Subject: [PATCH 7/9] Add new sub tasks to list method for overrding purposes. --- taskflow/tasks.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index 6f2495b..3c200db 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -257,6 +257,13 @@ def __init__(self, *sub_tasks, needs_prev_result=True, name=None): # not a standalone task, so only the calculated property makes sense self._status = None + def sub_tasks_to_list(self): + result = [] + for sub_task in self._sub_tasks: + result.append(sub_task.to_list()) + + return result + @property def status(self): if any(sub_task.leaf.status == self.STATUS_HALTED for sub_task in self._sub_tasks): @@ -291,14 +298,12 @@ def run(self, **kwargs): raise RuntimeError("Composite tasks cannot be run directly") def to_list(self): - result = [] - for sub_task in self._sub_tasks: - result += sub_task.to_list() + sub_tasks_result = self.sub_tasks_to_list() base_list = super().to_list() base_list[0].update({"sub_tasks": [sub_task.id for sub_task in self._sub_tasks]}) - return result + base_list + return sub_tasks_result + base_list @classmethod def from_data(cls, task_data): From 3ab9f32a02bf9ff28fa7e9d046cd0a449a95d990 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Wed, 2 Jul 2025 16:28:38 +0300 Subject: [PATCH 8/9] Use extend instead of append for result. --- taskflow/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index 3c200db..b08dcf3 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -260,7 +260,7 @@ def __init__(self, *sub_tasks, needs_prev_result=True, name=None): def sub_tasks_to_list(self): result = [] for sub_task in self._sub_tasks: - result.append(sub_task.to_list()) + result.extend(sub_task.to_list()) return result From 4644c35336a1415f4bb4a95d80c6a3b26796ea91 Mon Sep 17 00:00:00 2001 From: vrvencislavov Date: Thu, 3 Jul 2025 10:32:32 +0300 Subject: [PATCH 9/9] Revert changes for flag in get task data. --- taskflow/tasks.py | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/taskflow/tasks.py b/taskflow/tasks.py index b08dcf3..e63b778 100644 --- a/taskflow/tasks.py +++ b/taskflow/tasks.py @@ -223,13 +223,7 @@ def __str__(self): def _get_task_data(self): result = super()._get_task_data() - result.update( - { - "func": function_to_string(self._func), - "args": self._args, - "part_of_composition_task": bool(self.is_standalone and self.local_root), - } - ) + result.update({"func": function_to_string(self._func), "args": self._args}) return result