From 2a76d8256cb4a2eb2a579ec0a4f6ee9172b950d3 Mon Sep 17 00:00:00 2001 From: Martin Husbyn Date: Fri, 31 Oct 2025 16:16:51 +0000 Subject: [PATCH] refactor(ProcessExecution): remove io as a field on the class and supply it as a param to `all_output_data` instead The main reason is that the io object isn't serialisable, making it harder to keep track of. Instead, we pass it in as a param where it is needed. BREAKING CHANGE: removes `io` field from `ProcessExecution` and instead adds it as a parameter to `ProcessExecution.all_output_data`. --- nextflow/command.py | 12 ++++---- nextflow/models.py | 6 ++-- tests/integration/base.py | 22 +++++++------- tests/integration/test_run.py | 2 +- tests/unit/test_command.py | 41 +++++++++++---------------- tests/unit/test_process_executions.py | 6 ++-- 6 files changed, 40 insertions(+), 49 deletions(-) diff --git a/nextflow/command.py b/nextflow/command.py index dc362e1..64071ad 100644 --- a/nextflow/command.py +++ b/nextflow/command.py @@ -348,7 +348,7 @@ def get_execution(execution_path, log_path, nextflow_command, execution=None, lo if not log: return None, 0 log = log[log_start:] execution = make_or_update_execution(log, execution_path, nextflow_command, execution, io) - process_executions, changed = get_initial_process_executions(log, execution, io) + process_executions, changed = get_initial_process_executions(log, execution) no_path = [k for k, v in process_executions.items() if not v.path] process_ids_to_paths = get_process_ids_to_paths(no_path, execution_path, io) for process_id, path in process_ids_to_paths.items(): @@ -392,7 +392,7 @@ def make_or_update_execution(log, execution_path, nextflow_command, execution, i return execution -def get_initial_process_executions(log, execution, io): +def get_initial_process_executions(log, execution): """Parses a section of a log file and looks for new process executions not currently in the list, or uncompleted ones which can now be completed. Some attributes are not yet filled in. 
@@ -401,7 +401,6 @@ def get_initial_process_executions(log, execution, io): :param str log: a section of the log file. :param nextflow.models.Execution execution: the containing execution. - :param io: an optional custom io object to handle file operations. :rtype: ``tuple``""" lines = log.splitlines() @@ -410,7 +409,7 @@ def get_initial_process_executions(log, execution, io): for line in lines: if "Submitted process" in line or "Cached process" in line: is_cached = "Cached process" in line - proc_ex = create_process_execution_from_line(line, is_cached, io) + proc_ex = create_process_execution_from_line(line, is_cached) if not proc_ex: continue proc_ex.execution = execution process_executions[proc_ex.identifier] = proc_ex @@ -422,13 +421,12 @@ def get_initial_process_executions(log, execution, io): return process_executions, just_updated -def create_process_execution_from_line(line, cached=False, io=None): +def create_process_execution_from_line(line, cached=False): """Creates a process execution from a line of the log file in which its submission (or previous caching) is reported. :param str line: a line from the log file. :param bool cached: whether the process is cached. - :param io: an optional custom io object to handle file operations. 
:rtype: ``nextflow.models.ProcessExecution``""" if cached: @@ -442,7 +440,7 @@ def create_process_execution_from_line(line, cached=False, io=None): path="", stdout="", stderr="", bash="", started=None, finished=None, return_code="0" if cached else "", status="COMPLETED" if cached else "-", - cached=cached, io=io + cached=cached ) diff --git a/nextflow/models.py b/nextflow/models.py index d21a4a8..6d8391c 100644 --- a/nextflow/models.py +++ b/nextflow/models.py @@ -78,7 +78,6 @@ class ProcessExecution: finished: datetime | None status: str cached: bool - io: Any def __repr__(self): @@ -126,17 +125,18 @@ def input_data(self, include_path=True): return [os.path.basename(f) for f in inputs] - def all_output_data(self, include_path=True): + def all_output_data(self, include_path=True, io=None): """A list of all output data produced by the process execution, including unpublished staging files. :param bool include_path: if ``False``, only filenames returned. + :param io: an optional custom io object to handle file operations. 
:type: ``list``""" outputs = [] if not self.path: return [] inputs = self.input_data(include_path=False) - listdir = self.io.listdir if self.io else os.listdir + listdir = io.listdir if io else os.listdir for f in listdir(self.full_path): full_path = Path(f"{self.full_path}/{f}") if not f.startswith(".command") and f != ".exitcode": diff --git a/tests/integration/base.py b/tests/integration/base.py index eb0a252..5cc1cac 100644 --- a/tests/integration/base.py +++ b/tests/integration/base.py @@ -33,7 +33,7 @@ def check_running_execution(self, execution, last_stdout, output_path=None): return execution.stdout - def check_execution(self, execution, line_count=24, output_path=None, log_path=None, version=None, timezone=None, report=None, timeline=None, dag=None, trace=None, check_stderr=True): + def check_execution(self, execution, line_count=24, output_path=None, log_path=None, version=None, timezone=None, report=None, timeline=None, dag=None, trace=None, check_stderr=True, io=None): # Files created if not output_path: self.assertIn(".nextflow", os.listdir(self.get_path("rundirectory"))) if log_path: @@ -105,10 +105,10 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N self.assertEqual(proc_ex.input_data(), [self.get_path("files/data.txt")]) self.assertEqual(proc_ex.input_data(include_path=False), ["data.txt"]) self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), + set(proc_ex.all_output_data(include_path=False, io=io)), {"abc.dat", "xyz.dat", "log.txt"} ) - self.assertIn(proc_ex.identifier, proc_ex.all_output_data()[0]) + self.assertIn(proc_ex.identifier, proc_ex.all_output_data(io=io)[0]) proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:DUPLICATE (abc.dat)") self.check_process_execution(proc_ex, execution, False, check_time=not timezone) @@ -121,9 +121,9 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N proc_ex.input_data()[0] ) self.assertEqual( - 
set(proc_ex.all_output_data(include_path=False)), {"duplicated_abc.dat"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"duplicated_abc.dat"} ) - with open(proc_ex.all_output_data(include_path=True)[0]) as f: + with open(proc_ex.all_output_data(include_path=True, io=io)[0]) as f: self.assertEqual(len(f.read().splitlines()), line_count) proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:DUPLICATE (xyz.dat)") @@ -137,7 +137,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N proc_ex.input_data()[0] ) self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), {"duplicated_xyz.dat"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"duplicated_xyz.dat"} ) proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:LOWER (duplicated_abc.dat)") @@ -151,7 +151,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N proc_ex.input_data()[0] ) self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), {"lowered_duplicated_abc.dat"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"lowered_duplicated_abc.dat"} ) proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:LOWER (duplicated_xyz.dat)") @@ -165,7 +165,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N proc_ex.input_data()[0] ) self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), {"lowered_duplicated_xyz.dat"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"lowered_duplicated_xyz.dat"} ) proc_ex = self.get_process_execution(execution, "PROCESS_DATA:APPEND (lowered_duplicated_abc.dat)") @@ -175,7 +175,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N self.assertEqual(proc_ex.process, "PROCESS_DATA:APPEND") self.assertEqual(set(proc_ex.input_data(include_path=False)), {"lowered_duplicated_abc.dat", "suffix.txt"}) 
self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), {"suffix_lowered_duplicated_abc.dat"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"suffix_lowered_duplicated_abc.dat"} ) proc_ex = self.get_process_execution(execution, "PROCESS_DATA:APPEND (lowered_duplicated_xyz.dat)") @@ -185,7 +185,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N self.assertEqual(proc_ex.process, "PROCESS_DATA:APPEND") self.assertEqual(set(proc_ex.input_data(include_path=False)), {"lowered_duplicated_xyz.dat", "suffix.txt"}) self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), {"suffix_lowered_duplicated_xyz.dat"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"suffix_lowered_duplicated_xyz.dat"} ) proc_ex = self.get_process_execution(execution, "JOIN:COMBINE_FILES") @@ -198,7 +198,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N {"suffix_lowered_duplicated_abc.dat", "suffix_lowered_duplicated_xyz.dat"} ) self.assertEqual( - set(proc_ex.all_output_data(include_path=False)), {"combined.txt"} + set(proc_ex.all_output_data(include_path=False, io=io)), {"combined.txt"} ) diff --git a/tests/integration/test_run.py b/tests/integration/test_run.py index 4b8920f..fb1d498 100644 --- a/tests/integration/test_run.py +++ b/tests/integration/test_run.py @@ -291,7 +291,7 @@ def glob(self, path): ) # Execution is fine - self.check_execution(execution) + self.check_execution(execution, io=io) # The custom io functions were used with open(f"{self.rundirectory}/log.txt", "r") as f: diff --git a/tests/unit/test_command.py b/tests/unit/test_command.py index 47feb13..21613fc 100644 --- a/tests/unit/test_command.py +++ b/tests/unit/test_command.py @@ -413,7 +413,7 @@ def test_can_get_first_execution(self, mock_update, mock_paths, mock_init, mock_ self.assertEqual(size, 3) mock_text.assert_called_with(os.path.join("/log", ".nextflow.log"), io) mock_make.assert_called_with("LOG", 
"/ex", "nf run", None, io) - mock_init.assert_called_with("LOG", mock_execution, io) + mock_init.assert_called_with("LOG", mock_execution) mock_paths.assert_called_with(["cc/dd","gg/hh"], "/ex", io) self.assertEqual([c[0] for c in mock_update.call_args_list], [ (process_executions["aa/bb"], "/ex", "UTC", io), @@ -454,7 +454,7 @@ def test_can_get_subsequent_execution(self, mock_update, mock_paths, mock_init, self.assertEqual(size, 3) mock_text.assert_called_with(os.path.join("/log", ".nextflow.log"), io) mock_make.assert_called_with("LOG", "/ex", "nf run", mock_execution, io) - mock_init.assert_called_with("LOG", mock_execution, io) + mock_init.assert_called_with("LOG", mock_execution) mock_paths.assert_called_with(["cc/dd","gg/hh"], "/ex", io) self.assertEqual([c[0] for c in mock_update.call_args_list], [ (process_executions["aa/bb"], "/ex", "UTC", io), @@ -555,14 +555,13 @@ def test_can_create_first_pass(self, mock_update, mock_create): mock_create.side_effect = [p1, p2, None] mock_update.return_value = "cc/dd" log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Submitted process\nTask completed\nSubmitted process" - io = Mock() - process_executions, updated = get_initial_process_executions(log, execution, io) + process_executions, updated = get_initial_process_executions(log, execution) self.assertEqual(process_executions, {"aa/bb": p1, "xx/yy": p2}) self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"]) self.assertEqual([c[0] for c in mock_create.call_args_list], [ - ("Submitted process a/bb", False, io), - ("[cd/789012] Submitted process", False, io), - ("Submitted process", False, io), + ("Submitted process a/bb", False), + ("[cd/789012] Submitted process", False), + ("Submitted process", False), ]) mock_update.assert_called_with({"aa/bb": p1, "xx/yy": p2}, "Task completed") @@ -575,14 +574,13 @@ def test_can_create_first_pass_cached(self, mock_update, mock_create): mock_create.side_effect = [p1, p2, None] mock_update.return_value = "cc/dd" log = 
"line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Cached process\nTask completed\nSubmitted process" - io = Mock() - process_executions, updated = get_initial_process_executions(log, execution, io) + process_executions, updated = get_initial_process_executions(log, execution) self.assertEqual(process_executions, {"aa/bb": p1, "xx/yy": p2}) self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"]) self.assertEqual([c[0] for c in mock_create.call_args_list], [ - ("Submitted process a/bb", False, io), - ("[cd/789012] Cached process", True, io), - ("Submitted process", False, io), + ("Submitted process a/bb", False), + ("[cd/789012] Cached process", True), + ("Submitted process", False), ]) mock_update.assert_called_with({"aa/bb": p1, "xx/yy": p2}, "Task completed") @@ -596,14 +594,13 @@ def test_can_update_existing(self, mock_update, mock_create): mock_create.side_effect = [p1, p2, None] mock_update.return_value = "cc/dd" log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Submitted process\nTask completed\nSubmitted process" - io = Mock() - process_executions, updated = get_initial_process_executions(log, execution, io) + process_executions, updated = get_initial_process_executions(log, execution) self.assertEqual(process_executions, {"aa/bb": p1, "cc/dd": p3, "xx/yy": p2}) self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"]) self.assertEqual([c[0] for c in mock_create.call_args_list], [ - ("Submitted process a/bb", False, io), - ("[cd/789012] Submitted process", False, io), - ("Submitted process", False, io), + ("Submitted process a/bb", False), + ("[cd/789012] Submitted process", False), + ("Submitted process", False), ]) mock_update.assert_called_with({"aa/bb": p1, "cc/dd": p3, "xx/yy": p2}, "Task completed") @@ -614,8 +611,7 @@ class CreateProcessExecutionFromLineTests(TestCase): @patch("nextflow.command.parse_submitted_line") def test_can_create_process_execution(self, mock_parse): mock_parse.return_value = ("aa/bb", "PROC (123)", "PROC", 
"NOW") - io = Mock() - proc_ex = create_process_execution_from_line("line1", io=io) + proc_ex = create_process_execution_from_line("line1") self.assertEqual(proc_ex.identifier, "aa/bb") self.assertEqual(proc_ex.name, "PROC (123)") self.assertEqual(proc_ex.process, "PROC") @@ -629,14 +625,12 @@ def test_can_create_process_execution(self, mock_parse): self.assertEqual(proc_ex.status, "-") self.assertEqual(proc_ex.path, "") self.assertFalse(proc_ex.cached) - self.assertIs(proc_ex.io, io) - + @patch("nextflow.command.parse_cached_line") def test_can_create_cached_process_execution(self, mock_parse): mock_parse.return_value = ("aa/bb", "PROC (123)", "PROC") - io = Mock() - proc_ex = create_process_execution_from_line("line1", cached=True, io=io) + proc_ex = create_process_execution_from_line("line1", cached=True) self.assertEqual(proc_ex.identifier, "aa/bb") self.assertEqual(proc_ex.name, "PROC (123)") self.assertEqual(proc_ex.process, "PROC") @@ -650,7 +644,6 @@ def test_can_create_cached_process_execution(self, mock_parse): self.assertEqual(proc_ex.status, "COMPLETED") self.assertEqual(proc_ex.path, "") self.assertTrue(proc_ex.cached) - self.assertIs(proc_ex.io, io) @patch("nextflow.command.parse_submitted_line") diff --git a/tests/unit/test_process_executions.py b/tests/unit/test_process_executions.py index ff3193c..6b89021 100644 --- a/tests/unit/test_process_executions.py +++ b/tests/unit/test_process_executions.py @@ -11,7 +11,7 @@ def make_process_execution(self, **kwargs): "identifier": "12/3456", "name": "FASTQC (1)", "submitted": datetime(2021, 7, 4), "process": "FASTQC", "path": "12/34567890", "stdout": "good", "stderr": "bad", "return_code": "0", "bash": "$", "started": datetime(2021, 7, 5), "cached": False, - "finished": datetime(2021, 7, 6), "status": "COMPLETED", "io": None, **kwargs + "finished": datetime(2021, 7, 6), "status": "COMPLETED", **kwargs } return ProcessExecution(**kwargs) @@ -24,7 +24,7 @@ def test_can_make_process_execution(self): 
identifier="12/3456", name="FASTQC (1)", submitted=datetime(2021, 7, 4), process="FASTQC", path="12/34567890", stdout="good", stderr="bad", return_code="0", bash="$", started=datetime(2021, 7, 5), cached=True, - finished=datetime(2021, 7, 6), status="COMPLETED", io=None + finished=datetime(2021, 7, 6), status="COMPLETED" ) self.assertEqual(process_execution.identifier, "12/3456") self.assertEqual(process_execution.name, "FASTQC (1)") @@ -236,7 +236,7 @@ def test_can_use_custom_io(self, mock_path, mock_input): io = Mock() io.listdir.return_value = ["file1", "file2", ".command.run", ".exitcode", "file3"] self.assertEqual( - self.make_process_execution(io=io).all_output_data(), + self.make_process_execution().all_output_data(io=io), [str(Path("/loc/file1")), str(Path("/loc/file3"))] ) mock_input.assert_called_with(include_path=False) \ No newline at end of file