Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 5 additions & 7 deletions nextflow/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ def get_execution(execution_path, log_path, nextflow_command, execution=None, lo
if not log: return None, 0
log = log[log_start:]
execution = make_or_update_execution(log, execution_path, nextflow_command, execution, io)
process_executions, changed = get_initial_process_executions(log, execution, io)
process_executions, changed = get_initial_process_executions(log, execution)
no_path = [k for k, v in process_executions.items() if not v.path]
process_ids_to_paths = get_process_ids_to_paths(no_path, execution_path, io)
for process_id, path in process_ids_to_paths.items():
Expand Down Expand Up @@ -392,7 +392,7 @@ def make_or_update_execution(log, execution_path, nextflow_command, execution, i
return execution


def get_initial_process_executions(log, execution, io):
def get_initial_process_executions(log, execution):
"""Parses a section of a log file and looks for new process executions not
currently in the list, or uncompleted ones which can now be completed. Some
attributes are not yet filled in.
Expand All @@ -401,7 +401,6 @@ def get_initial_process_executions(log, execution, io):

:param str log: a section of the log file.
:param nextflow.models.Execution execution: the containing execution.
:param io: an optional custom io object to handle file operations.
:rtype: ``tuple``"""

lines = log.splitlines()
Expand All @@ -410,7 +409,7 @@ def get_initial_process_executions(log, execution, io):
for line in lines:
if "Submitted process" in line or "Cached process" in line:
is_cached = "Cached process" in line
proc_ex = create_process_execution_from_line(line, is_cached, io)
proc_ex = create_process_execution_from_line(line, is_cached)
if not proc_ex: continue
proc_ex.execution = execution
process_executions[proc_ex.identifier] = proc_ex
Expand All @@ -422,13 +421,12 @@ def get_initial_process_executions(log, execution, io):
return process_executions, just_updated


def create_process_execution_from_line(line, cached=False, io=None):
def create_process_execution_from_line(line, cached=False):
"""Creates a process execution from a line of the log file in which its
submission (or previous caching) is reported.

:param str line: a line from the log file.
:param bool cached: whether the process is cached.
:param io: an optional custom io object to handle file operations.
:rtype: ``nextflow.models.ProcessExecution``"""

if cached:
Expand All @@ -442,7 +440,7 @@ def create_process_execution_from_line(line, cached=False, io=None):
path="", stdout="", stderr="", bash="", started=None, finished=None,
return_code="0" if cached else "",
status="COMPLETED" if cached else "-",
cached=cached, io=io
cached=cached
)


Expand Down
6 changes: 3 additions & 3 deletions nextflow/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class ProcessExecution:
finished: datetime | None
status: str
cached: bool
io: Any


def __repr__(self):
Expand Down Expand Up @@ -126,17 +125,18 @@ def input_data(self, include_path=True):
return [os.path.basename(f) for f in inputs]


def all_output_data(self, include_path=True):
def all_output_data(self, include_path=True, io=None):
"""A list of all output data produced by the process execution,
including unpublished staging files.

:param bool include_path: if ``False``, only filenames returned.
:param io: an optional custom io object to handle file operations.
:type: ``list``"""

outputs = []
if not self.path: return []
inputs = self.input_data(include_path=False)
listdir = self.io.listdir if self.io else os.listdir
listdir = io.listdir if io else os.listdir
for f in listdir(self.full_path):
full_path = Path(f"{self.full_path}/{f}")
if not f.startswith(".command") and f != ".exitcode":
Expand Down
22 changes: 11 additions & 11 deletions tests/integration/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def check_running_execution(self, execution, last_stdout, output_path=None):
return execution.stdout


def check_execution(self, execution, line_count=24, output_path=None, log_path=None, version=None, timezone=None, report=None, timeline=None, dag=None, trace=None, check_stderr=True):
def check_execution(self, execution, line_count=24, output_path=None, log_path=None, version=None, timezone=None, report=None, timeline=None, dag=None, trace=None, check_stderr=True, io=None):
# Files created
if not output_path: self.assertIn(".nextflow", os.listdir(self.get_path("rundirectory")))
if log_path:
Expand Down Expand Up @@ -105,10 +105,10 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
self.assertEqual(proc_ex.input_data(), [self.get_path("files/data.txt")])
self.assertEqual(proc_ex.input_data(include_path=False), ["data.txt"])
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)),
set(proc_ex.all_output_data(include_path=False, io=io)),
{"abc.dat", "xyz.dat", "log.txt"}
)
self.assertIn(proc_ex.identifier, proc_ex.all_output_data()[0])
self.assertIn(proc_ex.identifier, proc_ex.all_output_data(io=io)[0])

proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:DUPLICATE (abc.dat)")
self.check_process_execution(proc_ex, execution, False, check_time=not timezone)
Expand All @@ -121,9 +121,9 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
proc_ex.input_data()[0]
)
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"duplicated_abc.dat"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"duplicated_abc.dat"}
)
with open(proc_ex.all_output_data(include_path=True)[0]) as f:
with open(proc_ex.all_output_data(include_path=True, io=io)[0]) as f:
self.assertEqual(len(f.read().splitlines()), line_count)

proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:DUPLICATE (xyz.dat)")
Expand All @@ -137,7 +137,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
proc_ex.input_data()[0]
)
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"duplicated_xyz.dat"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"duplicated_xyz.dat"}
)

proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:LOWER (duplicated_abc.dat)")
Expand All @@ -151,7 +151,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
proc_ex.input_data()[0]
)
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"lowered_duplicated_abc.dat"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"lowered_duplicated_abc.dat"}
)

proc_ex = self.get_process_execution(execution, "PROCESS_DATA:DUPLICATE_AND_LOWER:LOWER (duplicated_xyz.dat)")
Expand All @@ -165,7 +165,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
proc_ex.input_data()[0]
)
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"lowered_duplicated_xyz.dat"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"lowered_duplicated_xyz.dat"}
)

proc_ex = self.get_process_execution(execution, "PROCESS_DATA:APPEND (lowered_duplicated_abc.dat)")
Expand All @@ -175,7 +175,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
self.assertEqual(proc_ex.process, "PROCESS_DATA:APPEND")
self.assertEqual(set(proc_ex.input_data(include_path=False)), {"lowered_duplicated_abc.dat", "suffix.txt"})
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"suffix_lowered_duplicated_abc.dat"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"suffix_lowered_duplicated_abc.dat"}
)

proc_ex = self.get_process_execution(execution, "PROCESS_DATA:APPEND (lowered_duplicated_xyz.dat)")
Expand All @@ -185,7 +185,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
self.assertEqual(proc_ex.process, "PROCESS_DATA:APPEND")
self.assertEqual(set(proc_ex.input_data(include_path=False)), {"lowered_duplicated_xyz.dat", "suffix.txt"})
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"suffix_lowered_duplicated_xyz.dat"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"suffix_lowered_duplicated_xyz.dat"}
)

proc_ex = self.get_process_execution(execution, "JOIN:COMBINE_FILES")
Expand All @@ -198,7 +198,7 @@ def check_execution(self, execution, line_count=24, output_path=None, log_path=N
{"suffix_lowered_duplicated_abc.dat", "suffix_lowered_duplicated_xyz.dat"}
)
self.assertEqual(
set(proc_ex.all_output_data(include_path=False)), {"combined.txt"}
set(proc_ex.all_output_data(include_path=False, io=io)), {"combined.txt"}
)


Expand Down
2 changes: 1 addition & 1 deletion tests/integration/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,7 @@ def glob(self, path):
)

# Execution is fine
self.check_execution(execution)
self.check_execution(execution, io=io)

# The custom io functions were used
with open(f"{self.rundirectory}/log.txt", "r") as f:
Expand Down
41 changes: 17 additions & 24 deletions tests/unit/test_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ def test_can_get_first_execution(self, mock_update, mock_paths, mock_init, mock_
self.assertEqual(size, 3)
mock_text.assert_called_with(os.path.join("/log", ".nextflow.log"), io)
mock_make.assert_called_with("LOG", "/ex", "nf run", None, io)
mock_init.assert_called_with("LOG", mock_execution, io)
mock_init.assert_called_with("LOG", mock_execution)
mock_paths.assert_called_with(["cc/dd","gg/hh"], "/ex", io)
self.assertEqual([c[0] for c in mock_update.call_args_list], [
(process_executions["aa/bb"], "/ex", "UTC", io),
Expand Down Expand Up @@ -454,7 +454,7 @@ def test_can_get_subsequent_execution(self, mock_update, mock_paths, mock_init,
self.assertEqual(size, 3)
mock_text.assert_called_with(os.path.join("/log", ".nextflow.log"), io)
mock_make.assert_called_with("LOG", "/ex", "nf run", mock_execution, io)
mock_init.assert_called_with("LOG", mock_execution, io)
mock_init.assert_called_with("LOG", mock_execution)
mock_paths.assert_called_with(["cc/dd","gg/hh"], "/ex", io)
self.assertEqual([c[0] for c in mock_update.call_args_list], [
(process_executions["aa/bb"], "/ex", "UTC", io),
Expand Down Expand Up @@ -555,14 +555,13 @@ def test_can_create_first_pass(self, mock_update, mock_create):
mock_create.side_effect = [p1, p2, None]
mock_update.return_value = "cc/dd"
log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Submitted process\nTask completed\nSubmitted process"
io = Mock()
process_executions, updated = get_initial_process_executions(log, execution, io)
process_executions, updated = get_initial_process_executions(log, execution)
self.assertEqual(process_executions, {"aa/bb": p1, "xx/yy": p2})
self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"])
self.assertEqual([c[0] for c in mock_create.call_args_list], [
("Submitted process a/bb", False, io),
("[cd/789012] Submitted process", False, io),
("Submitted process", False, io),
("Submitted process a/bb", False),
("[cd/789012] Submitted process", False),
("Submitted process", False),
])
mock_update.assert_called_with({"aa/bb": p1, "xx/yy": p2}, "Task completed")

Expand All @@ -575,14 +574,13 @@ def test_can_create_first_pass_cached(self, mock_update, mock_create):
mock_create.side_effect = [p1, p2, None]
mock_update.return_value = "cc/dd"
log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Cached process\nTask completed\nSubmitted process"
io = Mock()
process_executions, updated = get_initial_process_executions(log, execution, io)
process_executions, updated = get_initial_process_executions(log, execution)
self.assertEqual(process_executions, {"aa/bb": p1, "xx/yy": p2})
self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"])
self.assertEqual([c[0] for c in mock_create.call_args_list], [
("Submitted process a/bb", False, io),
("[cd/789012] Cached process", True, io),
("Submitted process", False, io),
("Submitted process a/bb", False),
("[cd/789012] Cached process", True),
("Submitted process", False),
])
mock_update.assert_called_with({"aa/bb": p1, "xx/yy": p2}, "Task completed")

Expand All @@ -596,14 +594,13 @@ def test_can_update_existing(self, mock_update, mock_create):
mock_create.side_effect = [p1, p2, None]
mock_update.return_value = "cc/dd"
log = "line1\nSubmitted process a/bb\n..[ab/123456]\n[cd/789012] Submitted process\nTask completed\nSubmitted process"
io = Mock()
process_executions, updated = get_initial_process_executions(log, execution, io)
process_executions, updated = get_initial_process_executions(log, execution)
self.assertEqual(process_executions, {"aa/bb": p1, "cc/dd": p3, "xx/yy": p2})
self.assertEqual(updated, ["aa/bb", "xx/yy", "cc/dd"])
self.assertEqual([c[0] for c in mock_create.call_args_list], [
("Submitted process a/bb", False, io),
("[cd/789012] Submitted process", False, io),
("Submitted process", False, io),
("Submitted process a/bb", False),
("[cd/789012] Submitted process", False),
("Submitted process", False),
])
mock_update.assert_called_with({"aa/bb": p1, "cc/dd": p3, "xx/yy": p2}, "Task completed")

Expand All @@ -614,8 +611,7 @@ class CreateProcessExecutionFromLineTests(TestCase):
@patch("nextflow.command.parse_submitted_line")
def test_can_create_process_execution(self, mock_parse):
mock_parse.return_value = ("aa/bb", "PROC (123)", "PROC", "NOW")
io = Mock()
proc_ex = create_process_execution_from_line("line1", io=io)
proc_ex = create_process_execution_from_line("line1")
self.assertEqual(proc_ex.identifier, "aa/bb")
self.assertEqual(proc_ex.name, "PROC (123)")
self.assertEqual(proc_ex.process, "PROC")
Expand All @@ -629,14 +625,12 @@ def test_can_create_process_execution(self, mock_parse):
self.assertEqual(proc_ex.status, "-")
self.assertEqual(proc_ex.path, "")
self.assertFalse(proc_ex.cached)
self.assertIs(proc_ex.io, io)



@patch("nextflow.command.parse_cached_line")
def test_can_create_cached_process_execution(self, mock_parse):
mock_parse.return_value = ("aa/bb", "PROC (123)", "PROC")
io = Mock()
proc_ex = create_process_execution_from_line("line1", cached=True, io=io)
proc_ex = create_process_execution_from_line("line1", cached=True)
self.assertEqual(proc_ex.identifier, "aa/bb")
self.assertEqual(proc_ex.name, "PROC (123)")
self.assertEqual(proc_ex.process, "PROC")
Expand All @@ -650,7 +644,6 @@ def test_can_create_cached_process_execution(self, mock_parse):
self.assertEqual(proc_ex.status, "COMPLETED")
self.assertEqual(proc_ex.path, "")
self.assertTrue(proc_ex.cached)
self.assertIs(proc_ex.io, io)


@patch("nextflow.command.parse_submitted_line")
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_process_executions.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def make_process_execution(self, **kwargs):
"identifier": "12/3456", "name": "FASTQC (1)", "submitted": datetime(2021, 7, 4),
"process": "FASTQC", "path": "12/34567890", "stdout": "good", "stderr": "bad",
"return_code": "0", "bash": "$", "started": datetime(2021, 7, 5), "cached": False,
"finished": datetime(2021, 7, 6), "status": "COMPLETED", "io": None, **kwargs
"finished": datetime(2021, 7, 6), "status": "COMPLETED", **kwargs
}
return ProcessExecution(**kwargs)

Expand All @@ -24,7 +24,7 @@ def test_can_make_process_execution(self):
identifier="12/3456", name="FASTQC (1)", submitted=datetime(2021, 7, 4),
process="FASTQC", path="12/34567890", stdout="good", stderr="bad",
return_code="0", bash="$", started=datetime(2021, 7, 5), cached=True,
finished=datetime(2021, 7, 6), status="COMPLETED", io=None
finished=datetime(2021, 7, 6), status="COMPLETED"
)
self.assertEqual(process_execution.identifier, "12/3456")
self.assertEqual(process_execution.name, "FASTQC (1)")
Expand Down Expand Up @@ -236,7 +236,7 @@ def test_can_use_custom_io(self, mock_path, mock_input):
io = Mock()
io.listdir.return_value = ["file1", "file2", ".command.run", ".exitcode", "file3"]
self.assertEqual(
self.make_process_execution(io=io).all_output_data(),
self.make_process_execution().all_output_data(io=io),
[str(Path("/loc/file1")), str(Path("/loc/file3"))]
)
mock_input.assert_called_with(include_path=False)