Consistently use flock and not the sometimes-non-interacting fcntl locks#5449
Add deterministic tests that verify the locking protocol is correct by mocking fcntl.flock and file operations to control thread execution order.

Tests verify:
- Reader blocked while writer holds exclusive lock
- Writer blocked while reader holds shared lock
- Multiple readers can hold shared locks simultaneously
- Writers serialize (cannot hold exclusive locks concurrently)
- Reader never sees partial write content

Also add BOTS.md with development environment notes for AI assistants.
Use Checkpoint class with arrive_and_wait/wait_for_arrival pattern instead of time.sleep() calls. Tests now run in ~0.7s instead of ~24s and are more deterministic.
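For readers who haven't seen the pattern, here is a minimal sketch of what such a checkpoint could look like, assuming it is built on two `threading.Event` objects. The method names `arrive_and_wait` and `wait_for_arrival` come from the commit message; `release()`, the timeouts, and the demo around the class are assumptions for illustration, not the code in this PR.

```python
import threading


class Checkpoint:
    """Hypothetical rendezvous point between a worker thread and the test."""

    def __init__(self) -> None:
        self._arrived = threading.Event()
        self._released = threading.Event()

    def arrive_and_wait(self, timeout: float = 5.0) -> None:
        """Called by the worker: announce arrival, then block until released."""
        self._arrived.set()
        if not self._released.wait(timeout):
            raise TimeoutError("checkpoint was never released")

    def wait_for_arrival(self, timeout: float = 5.0) -> None:
        """Called by the test: block until the worker has arrived."""
        if not self._arrived.wait(timeout):
            raise TimeoutError("worker never reached the checkpoint")

    def release(self) -> None:
        """Called by the test: let the paused worker continue (assumed name)."""
        self._released.set()


events: list[str] = []
checkpoint = Checkpoint()


def worker() -> None:
    events.append("worker: before checkpoint")
    checkpoint.arrive_and_wait()
    events.append("worker: after checkpoint")


thread = threading.Thread(target=worker, name="writer")
thread.start()
checkpoint.wait_for_arrival()  # No sleep: we know exactly where the worker is.
events.append("test: worker is paused")
checkpoint.release()
thread.join()
```

Because the test thread only proceeds once the worker has signalled its arrival, the ordering of `events` no longer depends on how long a `time.sleep()` happens to be.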
I've added a ream or so of synthetic test code that does fiddly mocking and condition variable dances to try and actually lean on the locking-ness of the locks in the "safe" read and write file functions. I don't currently understand it and I'm going to have to review it with a real review before merging. I'm not sure it's worth its maintenance; do we need to maintain 1 KLOC to ensure that I don't forget to call the right lock function at the right place again in 40 lines of locking/unlocking code? Just because we can spin this out in half an hour doesn't mean we should.
…en-safe-read-file
adamnovak left a comment
I think the tests are mostly on the right track, but I think the design of the various manager widgets should be simplified/unified around a kind of class that lets you hook a Checkpoint into an operation (of which we can have 3 implementations: one for flock, one for read, and one for write).
We also need to consolidate all the hooking into one context manager method that we implement in the test class.
        """Release whatever lock this thread holds."""
        thread_id = threading.current_thread().ident
        with self._condition:
            if self._exclusive_held and self._exclusive_holder == thread_id:
This might not stand up to thread ID re-use, but we can't have that in this test.
src/toil/test/server/safeFileTest.py
class LockManager:
    """Manages simulated locks for multiple files."""
src/toil/test/server/safeFileTest.py
        except (OSError, ValueError):
            pass
It might be better to fail here? Or do we need to be able to pretend to wrap things we can't really wrap, as part of the hooks not breaking everything?
src/toil/test/server/safeFileTest.py
        self.during_write: dict[str, Checkpoint] = {}
        self.during_read: dict[str, Checkpoint] = {}

    def wrap_file(self, file_obj: Any, path: str) -> Any:
        """Wrap a file object to track operations."""
        try:
            self.lock_manager.register_fd(file_obj.fileno(), path)
        except (OSError, ValueError):
            pass

        original_write = file_obj.write
        original_read = file_obj.read
        tracker = self

        def tracked_write(data: str) -> int:
            thread_name = threading.current_thread().name
            checkpoint = tracker.during_write.get(thread_name)
            if checkpoint:
                checkpoint.arrive_and_wait()
            return original_write(data)

        def tracked_read(size: int = -1) -> str:
            thread_name = threading.current_thread().name
            checkpoint = tracker.during_read.get(thread_name)
            if checkpoint:
                checkpoint.arrive_and_wait()
            return original_read(size)

        file_obj.write = tracked_write
        file_obj.read = tracked_read
Instead of two duplicate but slightly different versions of the wrapper function and the whole system, this could be like one function that sticks the sync point onto another function given the function and the checkpoint collection.
Actually, we probably should split this class up into one for read and one for write, which would make each be able to be the same shape as the one for flock, in terms of providing an interface to attach checkpoints.
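As a rough illustration of the "one function that sticks the sync point onto another function" idea, something like the following could replace both `tracked_write` and `tracked_read`. The name `add_sync_point` and the `RecordingCheckpoint` stand-in are hypothetical and exist only for this sketch; the real `Checkpoint` would block rather than count.

```python
import io
import threading
from typing import Any, Callable


def add_sync_point(
    func: Callable[..., Any], checkpoints: dict[str, Any]
) -> Callable[..., Any]:
    """Return a wrapper that pauses at the calling thread's checkpoint, if any."""

    def wrapper(*args: Any, **kwargs: Any) -> Any:
        checkpoint = checkpoints.get(threading.current_thread().name)
        if checkpoint is not None:
            checkpoint.arrive_and_wait()
        return func(*args, **kwargs)

    return wrapper


class RecordingCheckpoint:
    """Stand-in for the PR's Checkpoint: counts arrivals instead of blocking."""

    def __init__(self) -> None:
        self.arrivals = 0

    def arrive_and_wait(self) -> None:
        self.arrivals += 1


buffer = io.StringIO()
recorder = RecordingCheckpoint()

# Checkpoints are keyed by thread name, as in the snippet under review.
write_checkpoints = {threading.current_thread().name: recorder}
read_checkpoints: dict[str, Any] = {}  # No checkpoint registered for reads.

tracked_write = add_sync_point(buffer.write, write_checkpoints)
tracked_getvalue = add_sync_point(buffer.getvalue, read_checkpoints)

tracked_write("hello")
content = tracked_getvalue()
```

The same helper serves reads, writes, and lock calls, so the read and write variants would differ only in which checkpoint collection they pass in.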
src/toil/test/server/safeFileTest.py
        def writer() -> None:
            try:
                safe_write_file(str(self.test_file), "BBBB")
This write isn't short enough that we could tell if we saw a partial value. Really the partial case we can trigger with the tools we have is that the write finished but the truncate() didn't happen yet. So we need to overwrite with a shorter value, not a same-length value. And I think the blocks-reader-mid-write test is where we want to test that.
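To make the point concrete, here is a small sketch, assuming the write path overwrites in place from offset 0 and calls `truncate()` afterwards (which is what the comment above implies, but is not shown in this thread). The helper `overwrite_without_truncate` is hypothetical; it peeks at the file between the write and the truncate, with no locking involved.

```python
import os
import tempfile


def overwrite_without_truncate(path: str, new_content: str) -> str:
    """Overwrite from offset 0; return what is on disk before truncate() runs."""
    with open(path, "r+") as f:
        f.write(new_content)
        f.flush()
        # Peek at the file after the write but before the truncate.
        with open(path) as reader:
            seen = reader.read()
        f.truncate()  # Cuts the file at the current position.
    return seen


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.txt")

    # Same-length overwrite: the intermediate state looks like a complete write.
    with open(path, "w") as f:
        f.write("AAAA")
    same_length = overwrite_without_truncate(path, "BBBB")

    # Shorter overwrite: the intermediate state still has the old tail.
    with open(path, "w") as f:
        f.write("AAAA")
    shorter = overwrite_without_truncate(path, "BB")

    with open(path) as f:
        final = f.read()
```

With `"BBBB"` over `"AAAA"` the pre-truncate content is indistinguishable from the finished write, while `"BB"` over `"AAAA"` leaves the stale tail visible until `truncate()` runs.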
        return file_obj


# TODO: Add tests for AtomicFileCreate path (concurrent new file creation).
I don't think we need to test that for now.
src/toil/test/server/safeFileTest.py
            checkpoint.arrive_and_wait()


class FileOperationTracker:
This doesn't really "track" file operations; it really provides a Checkpoint-based interface to blocking read and write calls by particular threads. Maybe it should be IOCheckpointer.
src/toil/test/server/safeFileTest.py
            return self._shared_count


class LockManager:
This is really here to let us put Checkpoints on lock operations, so it should be a LockCheckpointer.
src/toil/test/server/safeFileTest.py
        def patched_open(path: Any, mode: str = "r", *args: Any, **kwargs: Any) -> Any:
            f = original_open(path, mode, *args, **kwargs)
            return file_tracker.wrap_file(f, str(path))
If we make this register the file with both the LockManager and the FileOperationTracker separately, neither has to know about the other, and I think we can get away with passing fewer arguments to the registration functions.
src/toil/test/server/safeFileTest.py
        # Checkpoint to pause writer after acquiring lock
        writer_checkpoint = Checkpoint()
        self.lock_manager.after_acquire["writer"] = writer_checkpoint
Instead of storing checkpoints in after_acquire, during_read, during_write, etc. from outside the manager classes, they should both expose the same function to put a checkpoint on a thread by name.
Simplify test infrastructure by creating a unified Checkpointer base class with three implementations (LockCheckpointer, ReadCheckpointer, WriteCheckpointer). Each checkpointer provides its own install() context manager that patches the necessary functions, composing naturally via ExitStack.

- Remove LockManager, FileOperationTracker, and _create_patches()
- Add Checkpointer ABC with add()/get() and abstract install() method
- LockCheckpointer handles fd registration and flock simulation
- ReadCheckpointer/WriteCheckpointer hook into file read/write ops
- Consolidate patching into single patched_io() context manager
- Replace test_reader_never_sees_partial_write with test_reader_paused_mid_read_blocks_writer for better coverage
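The shape described in this commit message might look roughly like the sketch below. `Checkpointer`, `add()`, `get()`, `install()`, and `patched_io()` are named in the commit message, but their signatures here are guesses. `PatchingCheckpointer`, `RecordingCheckpoint`, and the `fake_io` namespace are hypothetical stand-ins so the example runs without touching the real `open` or `fcntl.flock`.

```python
import contextlib
import threading
import types
from abc import ABC, abstractmethod
from typing import Any, Iterator, Optional
from unittest import mock


class Checkpointer(ABC):
    """Base class: maps thread names to checkpoints and installs a hook."""

    def __init__(self) -> None:
        self._checkpoints: dict[str, Any] = {}

    def add(self, thread_name: str, checkpoint: Any) -> None:
        self._checkpoints[thread_name] = checkpoint

    def get(self) -> Optional[Any]:
        """Return the checkpoint for the calling thread, if one was added."""
        return self._checkpoints.get(threading.current_thread().name)

    @abstractmethod
    def install(self) -> Any:
        """Return a context manager that patches in this checkpointer's hook."""


class PatchingCheckpointer(Checkpointer):
    """Toy implementation (assumption): hooks one attribute of a target object."""

    def __init__(self, target: Any, attribute: str) -> None:
        super().__init__()
        self._target = target
        self._attribute = attribute

    @contextlib.contextmanager
    def install(self) -> Iterator[None]:
        original = getattr(self._target, self._attribute)

        def hooked(*args: Any, **kwargs: Any) -> Any:
            checkpoint = self.get()
            if checkpoint is not None:
                checkpoint.arrive_and_wait()
            return original(*args, **kwargs)

        with mock.patch.object(self._target, self._attribute, hooked):
            yield


@contextlib.contextmanager
def patched_io(*checkpointers: Checkpointer) -> Iterator[None]:
    """Install every checkpointer's patches; undo them all on exit."""
    with contextlib.ExitStack() as stack:
        for checkpointer in checkpointers:
            stack.enter_context(checkpointer.install())
        yield


class RecordingCheckpoint:
    """Stand-in checkpoint that counts arrivals instead of blocking."""

    def __init__(self) -> None:
        self.arrivals = 0

    def arrive_and_wait(self) -> None:
        self.arrivals += 1


# A fake "I/O module" so the demo does not patch anything real.
fake_io = types.SimpleNamespace(read=lambda: "data", write=lambda text: len(text))

read_checkpointer = PatchingCheckpointer(fake_io, "read")
write_checkpointer = PatchingCheckpointer(fake_io, "write")
read_checkpoint = RecordingCheckpoint()
read_checkpointer.add(threading.current_thread().name, read_checkpoint)

with patched_io(read_checkpointer, write_checkpointer):
    read_result = fake_io.read()      # Passes through the read checkpoint.
    write_result = fake_io.write("abc")  # No checkpoint registered: plain call.

after_exit = fake_io.read()  # Patches are gone, so no checkpoint is hit.
```

Since each checkpointer owns its own `install()`, none of them needs to know about the others, and the test class only has to compose them.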
I had to feed the code generator a bunch of the pieces here (separate hooking per kind of event, for example), but I think the test code is good enough to be getting on with.
This should fix #5447 by using the same file locking mechanism on the write and read sides.
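For context, a simplified sketch of what "the same mechanism on both sides" means: both functions take their lock with `fcntl.flock`, shared for reads and exclusive for writes. On Linux, `flock` locks and `fcntl`-style record locks generally do not see each other, so mixing them gives no mutual exclusion. The function names match the ones used in the tests, but the bodies below are assumptions for illustration (Unix only), not Toil's actual implementation.

```python
import fcntl
import os
import tempfile


def safe_read_file(path: str) -> str:
    """Read a file while holding a shared flock (illustrative version)."""
    with open(path, "r") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_SH)
        try:
            return f.read()
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)


def safe_write_file(path: str, content: str) -> None:
    """Overwrite a file while holding an exclusive flock (illustrative version)."""
    # Open without truncating, so old content stays intact until we hold the lock.
    fd = os.open(path, os.O_RDWR | os.O_CREAT, 0o644)
    with os.fdopen(fd, "r+") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        try:
            f.write(content)
            f.flush()
            f.truncate()  # Drop any leftover tail from a longer previous value.
        finally:
            fcntl.flock(f.fileno(), fcntl.LOCK_UN)


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "state.txt")
    safe_write_file(path, "AAAA")
    first = safe_read_file(path)
    safe_write_file(path, "BB")
    second = safe_read_file(path)
```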
Changelog Entry
To be copied to the draft changelog by merger: