From e759dc0ef92222b2da51f699dfa31378edcf4757 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 12 Feb 2026 09:40:42 -0800 Subject: [PATCH 1/2] Try to prevent empty promise files from existing when their fulfilling jobs think they are done --- src/toil/job.py | 9 +++++++-- src/toil/jobStores/fileJobStore.py | 21 +++++++++++---------- src/toil/lib/aws/s3.py | 1 + src/toil/test/jobStores/jobStoreTest.py | 25 +++++++++++++++++++++++++ 4 files changed, 44 insertions(+), 12 deletions(-) diff --git a/src/toil/job.py b/src/toil/job.py index 43cef05631..f8385d8506 100644 --- a/src/toil/job.py +++ b/src/toil/job.py @@ -2738,7 +2738,7 @@ def _fulfillPromises(self, returnValues, jobStore): # File may be gone if the job is a service being re-run and the accessing job is # already complete. if jobStore.file_exists(promiseFileStoreID): - logger.debug( + logger.info( "Resolve promise %s from %s with a %s", promiseFileStoreID, self, @@ -4204,6 +4204,11 @@ class Promise: A set of IDs of files containing promised values when we know we won't need them anymore """ + resolving = True + """ + Set to False to disable promise resolution for debugging. + """ + def __init__(self, job: Job, path: Any): """ Initialize this promise. @@ -4241,7 +4246,7 @@ def __new__(cls, *args) -> Promise: raise RuntimeError( "Cannot instantiate promise. Invalid number of arguments given (Expected 2)." 
) - if isinstance(args[0], Job): + if not cls.resolving or isinstance(args[0], Job): # Regular instantiation when promise is created, before it is being pickled return super().__new__(cls) else: diff --git a/src/toil/jobStores/fileJobStore.py b/src/toil/jobStores/fileJobStore.py index b68f83f9cf..88b73a018a 100644 --- a/src/toil/jobStores/fileJobStore.py +++ b/src/toil/jobStores/fileJobStore.py @@ -688,16 +688,17 @@ def get_file_size(self, file_id): @contextmanager def update_file_stream(self, file_id, encoding=None, errors=None): self._check_job_store_file_id(file_id) - # File objects are context managers (CM) so we could simply return what open returns. - # However, it is better to wrap it in another CM so as to prevent users from accessing - # the file object directly, without a with statement. - with open( - self._get_file_path_from_id(file_id), - "wb" if encoding == None else "wt", - encoding=encoding, - errors=errors, - ) as f: - yield f + + with AtomicFileCreate(self._get_file_path_from_id(file_id)) as tmp_path: + # We show the user an open stream, and take the update only if the + # user finishes writing successfully. 
+ with open( + tmp_path, + "wb" if encoding == None else "wt", + encoding=encoding, + errors=errors, + ) as f: + yield f @contextmanager @overload diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py index 584fb599a6..dc8f9be535 100644 --- a/src/toil/lib/aws/s3.py +++ b/src/toil/lib/aws/s3.py @@ -319,6 +319,7 @@ def readFrom(self, readable: IO[Any]) -> None: break hasher.update(buf) except: + logger.error("Aborting upload!") self.s3_client.abort_multipart_upload( Bucket=self.bucket_name, Key=self.file_id, UploadId=upload_id ) diff --git a/src/toil/test/jobStores/jobStoreTest.py b/src/toil/test/jobStores/jobStoreTest.py index 26ca769b79..1867a0e94b 100644 --- a/src/toil/test/jobStores/jobStoreTest.py +++ b/src/toil/test/jobStores/jobStoreTest.py @@ -446,6 +446,31 @@ def testReadWriteFileStreamTextMode(self): with jobstore2.read_file_stream(fileID, encoding="utf-8") as f: self.assertEqual(bar, f.read()) + def testStreamUpdateAtomic(self): + """Checks if updating a stream and failing in the middle does nothing.""" + jobstore = self.jobstore_initialized + foo = "foo" + bar = "bar" + + with jobstore.write_file_stream(encoding="utf-8") as ( + f, + fileID, + ): + f.write(foo) + + class FakeError(RuntimeError): + pass + + try: + with jobstore.update_file_stream(fileID, encoding="utf-8") as f: + f.write(bar) + raise FakeError("Oh dear") + except FakeError as e: + pass + + with jobstore.read_file_stream(fileID, encoding="utf-8") as f: + self.assertEqual(foo, f.read()) + def testPerJobFiles(self): """Tests the behavior of files on jobs.""" jobstore1 = self.jobstore_initialized From 0d92acf82c3296d2ba2f1ebaf6852b3463844b99 Mon Sep 17 00:00:00 2001 From: Adam Novak Date: Thu, 12 Feb 2026 12:00:08 -0800 Subject: [PATCH 2/2] Make file stream uploads atomic for AWS and Google --- src/toil/jobStores/googleJobStore.py | 25 +++++++++++--- src/toil/lib/aws/s3.py | 13 +++++-- src/toil/lib/pipes.py | 51 +++++++++++++++++++++++----- 3 files changed, 74 insertions(+), 15 
deletions(-) diff --git a/src/toil/jobStores/googleJobStore.py b/src/toil/jobStores/googleJobStore.py index 6026c855ff..ba11a06996 100644 --- a/src/toil/jobStores/googleJobStore.py +++ b/src/toil/jobStores/googleJobStore.py @@ -722,10 +722,27 @@ class UploadPipe(WritablePipe): def readFrom(self, readable): if not update: assert not blob.exists() - if readable.seekable(): - blob.upload_from_file(readable) - else: - blob.upload_from_string(readable.read()) + # TODO: our pipe stream is not seekable, and + # blob.upload_from_file() apparently wants a seekable stream + # (TODO: check this! The docs at + # + # don't say but they do say it acts differently based on file + # size.) + + # So we just always dump the whole stream to memory and then + # upload it. + all_data = readable.read() + + # This in turn means we can use WritablePipe's built-in writer + # exception detection to cancel the upload on writer failure, + # and we don't need to work out a way to get an error to come + # from a read call inside Google's code. + if self.writer_error is not None: + # Don't actually commit an upload where the writer failed. + log.error("Aborting upload due to error in writer") + return + + blob.upload_from_string(all_data) with UploadPipe(encoding=encoding, errors=errors) as writable: yield writable diff --git a/src/toil/lib/aws/s3.py b/src/toil/lib/aws/s3.py index dc8f9be535..a7b825a9d9 100644 --- a/src/toil/lib/aws/s3.py +++ b/src/toil/lib/aws/s3.py @@ -315,14 +315,23 @@ def readFrom(self, readable: IO[Any]) -> None: # Get the next block of data we want to put buf = readable.read(self.part_size) if len(buf) == 0: - # Don't allow any part other than the very first to be empty. + # The writer has stopped writing. + if self.writer_error is not None: + # This is because of abnormal termination. + # Don't complete the upload. + raise RuntimeError("Writer failed when writing to MultiPartPipe") from self.writer_error + # Otherwise, the upload is done.
break hasher.update(buf) except: - logger.error("Aborting upload!") + logger.exception(f"[{upload_id}] Aborting upload") self.s3_client.abort_multipart_upload( Bucket=self.bucket_name, Key=self.file_id, UploadId=upload_id ) + if self.writer_error is None: + # If the writer isn't already failing (and causing us to fail + # for that reason), fail for this reason. + raise else: # Save the checksum checksum = f"sha1${hasher.hexdigest()}" diff --git a/src/toil/lib/pipes.py b/src/toil/lib/pipes.py index b4e63ec925..4482d9e9fb 100644 --- a/src/toil/lib/pipes.py +++ b/src/toil/lib/pipes.py @@ -4,13 +4,13 @@ import os from abc import ABC, abstractmethod from typing import IO, Any +from types import TracebackType from toil.lib.checksum import ChecksumError from toil.lib.threading import ExceptionalThread log = logging.getLogger(__name__) - class WritablePipe(ABC): """ An object-oriented wrapper for os.pipe. Clients should subclass it, implement @@ -25,11 +25,9 @@ class WritablePipe(ABC): ... _ = writable.write('Hello, world!\\n'.encode('utf-8')) Hello, world! - Each instance of this class creates a thread and invokes the readFrom method in that thread. - The thread will be join()ed upon normal exit from the context manager, i.e. the body of the - `with` statement. If an exception occurs, the thread will not be joined but a well-behaved - :meth:`.readFrom` implementation will terminate shortly thereafter due to the pipe having - been closed. + Each instance of this class creates a thread and invokes the readFrom + method in that thread. The thread will be join()ed upon exit from the + context manager, i.e. the body of the `with` statement. Now, exceptions in the reader thread will be reraised in the main thread: @@ -71,6 +69,27 @@ class WritablePipe(ABC): RuntimeError: Hello, world! 
>>> y = os.dup(0); os.close(y); x == y True + + Exceptions in the body of the with statement will cause an error that the + readFrom method can detect, visible by the time the empty-read EOF marker + is visible. + + >>> seen_errors = [] + >>> class MyPipe(WritablePipe): + ... def readFrom(self, readable): + ... while readable.read(100) != "": + ... pass + ... if self.writer_error is not None: + ... seen_errors.append(self.writer_error) + >>> with MyPipe() as writable: + ... raise RuntimeError('Hello, world!') + Traceback (most recent call last): + ... + RuntimeError: Hello, world! + >>> len(seen_errors) + 1 + >>> type(seen_errors[0]) + RuntimeError """ def __init__(self, encoding: str | None = None, errors: str | None = None) -> None: @@ -90,6 +109,7 @@ def __init__(self, encoding: str | None = None, errors: str | None = None) -> No self.writable: IO[Any] | None = None self.thread: ExceptionalThread | None = None self.reader_done: bool = False + self.writer_error: BaseException | None = None def __enter__(self) -> IO[Any]: self.readable_fh, writable_fh = os.pipe() @@ -99,13 +119,19 @@ def __enter__(self) -> IO[Any]: encoding=self.encoding, errors=self.errors, ) + self.writer_error = None self.thread = ExceptionalThread(target=self._reader) self.thread.start() return self.writable def __exit__( - self, exc_type: str | None, exc_val: str | None, exc_tb: str | None + self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: + # If there was an exception, make sure it is visible to the reader + # before closing the write end of the pipe and generating an EOF. + if exc_val is not None: + self.writer_error = exc_val + # Closing the writable end will send EOF to the readable and cause the reader thread # to finish. # TODO: Can close() fail? If so, would we try and clean up after the reader? @@ -143,6 +169,10 @@ def readFrom(self, readable: IO[Any]) -> None: Implement this method to read data from the pipe. 
This method should support both binary and text mode output. + If this method needs to do any sort of cleanup on failure, it should + check self.writer_error after observing EOF, to distinguish normal + and abnormal termination of the writer. + :param file readable: the file object representing the readable end of the pipe. Do not explicitly invoke the close() method of the object; that will be done automatically. """ @@ -159,7 +189,6 @@ def _reader(self) -> None: self.readFrom(readable) self.reader_done = True - class ReadablePipe(ABC): """ An object-oriented wrapper for os.pipe. Clients should subclass it, implement @@ -228,6 +257,10 @@ def writeTo(self, writable: IO[Any]) -> None: Implement this method to write data from the pipe. This method should support both binary and text mode input. + Trying to write to the writable stream after the reader has + unexpectedly failed will produce an error, because the pipe will be + broken. + :param file writable: the file object representing the writable end of the pipe. Do not explicitly invoke the close() method of the object, that will be done automatically. """ @@ -274,7 +307,7 @@ def __enter__(self) -> IO[Any]: return self.readable def __exit__( - self, exc_type: str | None, exc_val: str | None, exc_tb: str | None + self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: TracebackType | None ) -> None: # Close the read end of the pipe. The writing thread may # still be writing to the other end, but this will wake it up