9 changes: 7 additions & 2 deletions src/toil/job.py
@@ -2738,7 +2738,7 @@ def _fulfillPromises(self, returnValues, jobStore):
# File may be gone if the job is a service being re-run and the accessing job is
# already complete.
if jobStore.file_exists(promiseFileStoreID):
logger.debug(
logger.info(
"Resolve promise %s from %s with a %s",
promiseFileStoreID,
self,
@@ -4204,6 +4204,11 @@ class Promise:
A set of IDs of files containing promised values when we know we won't need them anymore
"""

resolving = True
"""
Set to False to disable promise resolution for debugging.
"""

def __init__(self, job: Job, path: Any):
"""
Initialize this promise.
Expand Down Expand Up @@ -4241,7 +4246,7 @@ def __new__(cls, *args) -> Promise:
raise RuntimeError(
"Cannot instantiate promise. Invalid number of arguments given (Expected 2)."
)
if isinstance(args[0], Job):
if not cls.resolving or isinstance(args[0], Job):
# Regular instantiation when promise is created, before it is being pickled
return super().__new__(cls)
else:
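The `resolving` class flag works because `Promise.__new__` decides between two paths: regular construction (the first argument is a Job) and resolution-on-unpickling (the arguments describe a stored value). Below is a minimal, standalone sketch of that pattern; the `Lazy` class and its `_store` lookup are hypothetical illustrations, not Toil's actual `Promise` machinery.

_store = {"v1": 42}

class Lazy:
    resolving = True
    """Set to False to get Lazy instances back instead of resolved values."""

    def __new__(cls, arg):
        if not cls.resolving or not isinstance(arg, str):
            # Regular instantiation (or resolution disabled for debugging).
            return super().__new__(cls)
        # Reconstruction from a stored ID: resolve immediately. Because
        # the returned object is not a Lazy, __init__ is skipped.
        return _store[arg]

    def __init__(self, arg):
        self.arg = arg

assert Lazy("v1") == 42                  # resolved on construction
Lazy.resolving = False
assert isinstance(Lazy("v1"), Lazy)      # resolution disabled for debugging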
21 changes: 11 additions & 10 deletions src/toil/jobStores/fileJobStore.py
@@ -688,16 +688,17 @@ def get_file_size(self, file_id):
@contextmanager
def update_file_stream(self, file_id, encoding=None, errors=None):
self._check_job_store_file_id(file_id)
# File objects are context managers (CM) so we could simply return what open returns.
# However, it is better to wrap it in another CM so as to prevent users from accessing
# the file object directly, without a with statement.
with open(
self._get_file_path_from_id(file_id),
"wb" if encoding == None else "wt",
encoding=encoding,
errors=errors,
) as f:
yield f

with AtomicFileCreate(self._get_file_path_from_id(file_id)) as tmp_path:
# We hand the user an open stream, and commit the update only if the
# user finishes writing successfully.
with open(
tmp_path,
"wb" if encoding == None else "wt",
encoding=encoding,
errors=errors,
) as f:
yield f

@contextmanager
@overload
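The rewritten update_file_stream stages the new contents with AtomicFileCreate and only moves them into place once the caller's with-block finishes cleanly, so a failure mid-update leaves the old file intact. Here is a minimal sketch of the same write-to-temp-then-rename idea, assuming a POSIX filesystem where os.replace() within one directory is atomic; atomic_update is a hypothetical stand-in, not Toil's AtomicFileCreate.

import os
import tempfile
from contextlib import contextmanager

@contextmanager
def atomic_update(path, encoding=None, errors=None):
    # Stage the new contents in a temporary file in the same directory,
    # so the final rename stays on one filesystem and is atomic.
    fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(os.path.abspath(path)))
    try:
        with open(fd, "wb" if encoding is None else "wt",
                  encoding=encoding, errors=errors) as f:
            yield f
        # Only replace the original if the caller's writes all succeeded.
        os.replace(tmp_path, path)
    except BaseException:
        os.unlink(tmp_path)
        raise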
25 changes: 21 additions & 4 deletions src/toil/jobStores/googleJobStore.py
@@ -722,10 +722,27 @@ class UploadPipe(WritablePipe):
def readFrom(self, readable):
if not update:
assert not blob.exists()
if readable.seekable():
blob.upload_from_file(readable)
else:
blob.upload_from_string(readable.read())
# TODO: our pipe stream is not seekable, and
# blob.upload_from_file() apparently wants a seekable stream
# (TODO: check this! The docs at
# <https://docs.cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.blob.Blob#google_cloud_storage_blob_Blob_upload_from_file>
# don't say but they do say it acts differently based on file
# size.)

# So we just always dump the whole stream to memory and then
# upload it.
all_data = readable.read()

# This in turn means we can use WritablePipe's built-in writer
# exception detection to cancel the upload on writer failure,
# and we don't need to work out a way to get an error to come
# from a read call inside Google's code.
if self.writer_error is not None:
# Don't actually commit an upload where the writer failed.
log.error("Aborting upload due to error in writer")
return

blob.upload_from_string(all_data)

with UploadPipe(encoding=encoding, errors=errors) as writable:
yield writable
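Outside of Toil, the same buffer-then-commit guard looks like the following. This is a hedged sketch assuming the google-cloud-storage client; the bucket and object names are placeholders, and WritablePipe is Toil's class from toil.lib.pipes.

from google.cloud import storage
from toil.lib.pipes import WritablePipe

def make_upload_pipe(bucket_name: str, blob_name: str) -> WritablePipe:
    blob = storage.Client().bucket(bucket_name).blob(blob_name)

    class UploadPipe(WritablePipe):
        def readFrom(self, readable):
            # Buffer everything: the pipe is not seekable, and buffering
            # lets us consult writer_error before committing any bytes.
            all_data = readable.read()
            if self.writer_error is not None:
                # Writer failed; never create or overwrite the blob.
                return
            blob.upload_from_string(all_data)

    return UploadPipe()

# Usage: nothing is uploaded if the body of the with statement raises.
# with make_upload_pipe("my-bucket", "my-object") as writable:
#     writable.write(b"payload")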
12 changes: 11 additions & 1 deletion src/toil/lib/aws/s3.py
@@ -315,13 +315,23 @@ def readFrom(self, readable: IO[Any]) -> None:
# Get the next block of data we want to put
buf = readable.read(self.part_size)
if len(buf) == 0:
# Don't allow any part other than the very first to be empty.
# The writer has stopped writing.
if self.writer_error is not None:
# This is because of abnormal termination.
# Don't complete the upload.
raise RuntimeError("Writer failed when writing to MultiPartPipe") from self.writer_error
# Otherwise, the upload is done.
break
hasher.update(buf)
except:
logger.exception(f"[{upload_id}] Aborting upload")
self.s3_client.abort_multipart_upload(
Bucket=self.bucket_name, Key=self.file_id, UploadId=upload_id
)
if self.writer_error is None:
# If the writer isn't already failing (and causing us to fail
# for that reason), fail for this reason.
raise
else:
# Save the checksum
checksum = f"sha1${hasher.hexdigest()}"
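For reference, here is the whole abort-on-failure flow in one place, as a hedged standalone sketch using boto3 directly rather than Toil's MultiPartPipe; bucket, key, and the zero-argument writer_error callable are placeholders for the pipe's state.

import hashlib
import itertools

import boto3

def upload_from_pipe(readable, writer_error, bucket, key,
                     part_size=5 * 1024 * 1024):
    s3 = boto3.client("s3")
    upload_id = s3.create_multipart_upload(Bucket=bucket, Key=key)["UploadId"]
    hasher = hashlib.sha1()
    parts = []
    try:
        for part_number in itertools.count(1):
            buf = readable.read(part_size)
            if len(buf) == 0:
                if writer_error() is not None:
                    # EOF came from a crashed writer: raise so the except
                    # clause aborts rather than completing a partial upload.
                    raise RuntimeError("writer failed") from writer_error()
                break  # normal EOF: the writer finished cleanly
            hasher.update(buf)
            etag = s3.upload_part(Bucket=bucket, Key=key, UploadId=upload_id,
                                  PartNumber=part_number, Body=buf)["ETag"]
            parts.append({"PartNumber": part_number, "ETag": etag})
        s3.complete_multipart_upload(
            Bucket=bucket, Key=key, UploadId=upload_id,
            MultipartUpload={"Parts": parts})
    except BaseException:
        # Any failure, including the writer's, cancels the whole upload.
        s3.abort_multipart_upload(Bucket=bucket, Key=key, UploadId=upload_id)
        raise
    return f"sha1${hasher.hexdigest()}"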
51 changes: 42 additions & 9 deletions src/toil/lib/pipes.py
@@ -4,13 +4,13 @@
import os
from abc import ABC, abstractmethod
from typing import IO, Any
from types import TracebackType

from toil.lib.checksum import ChecksumError
from toil.lib.threading import ExceptionalThread

log = logging.getLogger(__name__)


class WritablePipe(ABC):
"""
An object-oriented wrapper for os.pipe. Clients should subclass it, implement
@@ -25,11 +25,9 @@ class WritablePipe(ABC):
... _ = writable.write('Hello, world!\\n'.encode('utf-8'))
Hello, world!

Each instance of this class creates a thread and invokes the readFrom method in that thread.
The thread will be join()ed upon normal exit from the context manager, i.e. the body of the
`with` statement. If an exception occurs, the thread will not be joined but a well-behaved
:meth:`.readFrom` implementation will terminate shortly thereafter due to the pipe having
been closed.
Each instance of this class creates a thread and invokes the readFrom
method in that thread. The thread will be join()ed upon exit from the
context manager, i.e. the body of the `with` statement.

Now, exceptions in the reader thread will be reraised in the main thread:

@@ -71,6 +69,27 @@ class WritablePipe(ABC):
RuntimeError: Hello, world!
>>> y = os.dup(0); os.close(y); x == y
True

Exceptions in the body of the with statement are made visible to the
readFrom method: self.writer_error is guaranteed to be set by the time
readFrom observes the empty-read EOF marker.

>>> seen_errors = []
>>> class MyPipe(WritablePipe):
... def readFrom(self, readable):
...         while readable.read(100) != b"":
... pass
... if self.writer_error is not None:
... seen_errors.append(self.writer_error)
>>> with MyPipe() as writable:
... raise RuntimeError('Hello, world!')
Traceback (most recent call last):
...
RuntimeError: Hello, world!
>>> len(seen_errors)
1
>>> type(seen_errors[0])
<class 'RuntimeError'>
"""

def __init__(self, encoding: str | None = None, errors: str | None = None) -> None:
@@ -90,6 +109,7 @@ def __init__(self, encoding: str | None = None, errors: str | None = None) -> None:
self.writable: IO[Any] | None = None
self.thread: ExceptionalThread | None = None
self.reader_done: bool = False
self.writer_error: BaseException | None = None

def __enter__(self) -> IO[Any]:
self.readable_fh, writable_fh = os.pipe()
@@ -99,13 +119,19 @@ def __enter__(self) -> IO[Any]:
encoding=self.encoding,
errors=self.errors,
)
self.writer_error = None
self.thread = ExceptionalThread(target=self._reader)
self.thread.start()
return self.writable

def __exit__(
self, exc_type: str | None, exc_val: str | None, exc_tb: str | None
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
# If there was an exception, make sure it is visible to the reader
# before closing the write end of the pipe and generating an EOF.
if exc_val is not None:
self.writer_error = exc_val

# Closing the writable end will send EOF to the readable and cause the reader thread
# to finish.
# TODO: Can close() fail? If so, would we try and clean up after the reader?
@@ -143,6 +169,10 @@ def readFrom(self, readable: IO[Any]) -> None:
Implement this method to read data from the pipe. This method should support both
binary and text mode output.

If this method needs to do any sort of cleanup on failure, it should
check self.writer_error after observing EOF, to distinguish normal
and abnormal termination of the writer.

:param file readable: the file object representing the readable end of the pipe. Do not
explicitly invoke the close() method of the object; that will be done automatically.
"""
@@ -159,7 +189,6 @@ def _reader(self) -> None:
self.readFrom(readable)
self.reader_done = True


class ReadablePipe(ABC):
"""
An object-oriented wrapper for os.pipe. Clients should subclass it, implement
@@ -228,6 +257,10 @@ def writeTo(self, writable: IO[Any]) -> None:
Implement this method to write data from the pipe. This method should support both
binary and text mode input.

Trying to write to the writable stream after the reader has
unexpectedly failed will produce an error, because the pipe will be
broken.

:param file writable: the file object representing the writable end of the pipe. Do not
explicitly invoke the close() method of the object, that will be done automatically.
"""
@@ -274,7 +307,7 @@ def __enter__(self) -> IO[Any]:
return self.readable

def __exit__(
self, exc_type: str | None, exc_val: str | None, exc_tb: str | None
self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None
) -> None:
# Close the read end of the pipe. The writing thread may
# still be writing to the other end, but this will wake it up
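The essence of the change to these pipe classes is a small handshake: the writer publishes its exception before closing its end, so the reader is guaranteed to see the flag no later than the EOF that the close generates. A toy distillation of just that handshake follows; MiniPipe is illustrative, not Toil's WritablePipe.

import os
import threading

class MiniPipe:
    def __init__(self, read_from):
        self.read_from = read_from  # callable(pipe, readable)
        self.writer_error = None

    def __enter__(self):
        r_fd, w_fd = os.pipe()
        self._readable = os.fdopen(r_fd, "rb")
        self.writable = os.fdopen(w_fd, "wb")
        self.thread = threading.Thread(
            target=self.read_from, args=(self, self._readable))
        self.thread.start()
        return self.writable

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Publish the failure *before* closing: the reader then observes
        # the flag no later than the EOF produced by close().
        if exc_val is not None:
            self.writer_error = exc_val
        self.writable.close()
        self.thread.join()
        self._readable.close()

def consume(pipe, readable):
    data = readable.read()  # returns everything written, then EOF
    if pipe.writer_error is not None:
        print(f"writer failed; discarding {len(data)} bytes")

try:
    with MiniPipe(consume) as writable:
        writable.write(b"partial data")
        raise RuntimeError("boom")
except RuntimeError:
    pass  # prints: writer failed; discarding 12 bytes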
25 changes: 25 additions & 0 deletions src/toil/test/jobStores/jobStoreTest.py
@@ -446,6 +446,31 @@ def testReadWriteFileStreamTextMode(self):
with jobstore2.read_file_stream(fileID, encoding="utf-8") as f:
self.assertEqual(bar, f.read())

def testStreamUpdateAtomic(self):
"""Checks if updating a stream and failing in the middle does nothing."""
jobstore = self.jobstore_initialized
foo = "foo"
bar = "bar"

with jobstore.write_file_stream(encoding="utf-8") as (
f,
fileID,
):
f.write(foo)

class FakeError(RuntimeError):
pass

try:
with jobstore.update_file_stream(fileID, encoding="utf-8") as f:
f.write(bar)
raise FakeError("Oh dear")
except FakeError:
pass

with jobstore.read_file_stream(fileID, encoding="utf-8") as f:
self.assertEqual(foo, f.read())

def testPerJobFiles(self):
"""Tests the behavior of files on jobs."""
jobstore1 = self.jobstore_initialized