feat: make organic job execution flow sync #835
Conversation
validator/app/src/compute_horde_validator/validator/job_excuses.py
Dismissed
Force-pushed from 0169a3b to ee9a948
Force-pushed from a128a9a to cc7522a
Impacts: validator
Force-pushed from cc7522a to ec74ed0
validator/app/src/compute_horde_validator/validator/job_excuses.py
Outdated
```python
try:
    self._run()
except Exception as exc:
    sentry_sdk.capture_exception(exc)
```
why is the sentry_sdk called explicitly here, instead of just logging the exception, possibly with job info?
That I have copied verbatim from the async impl :v
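For comparison, a minimal sketch of the logging-based alternative the question hints at, assuming the job object exposes a `job_uuid` field; the function and parameter names here are illustrative, not from the PR:

```python
import logging

logger = logging.getLogger(__name__)


def run_job_safely(run, job) -> None:
    # Sketch only: `run` stands in for the driver's _run callable and `job` for
    # the OrganicJob instance. The exception is logged with job context instead
    # of calling sentry_sdk.capture_exception explicitly; if Sentry's logging
    # integration is enabled, an ERROR-level log record is still picked up.
    try:
        run()
    except Exception:
        logger.exception("Organic job %s failed in sync driver", job.job_uuid)
```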
```python
)


@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
```
I really like how concise these tests are. Readable and really doing the job.
```python
        )
        return DriverState.FAILED

    def _wait_for_message(
```
seems that this implementation is very robust and resistant against weird messages, out of order messages etc. but I think it is not currently tested against that, is it? We've previously had malicious miners gaining some edge by sending out of order messages.
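A self-contained sketch of the kind of test that could cover this, with a fake client that emits an unrelated message before the expected one; all names here are hypothetical stand-ins, not the PR's actual classes:

```python
import queue
from dataclasses import dataclass


@dataclass
class ExecutorReady:
    # Hypothetical "expected" message type.
    job_uuid: str


@dataclass
class UnrelatedMessage:
    # Hypothetical out-of-order / unexpected message from a misbehaving miner.
    payload: str


class FakeClient:
    """Feeds a fixed sequence of messages, simulating an out-of-order miner."""

    def __init__(self, messages):
        self._queue = queue.Queue()
        for msg in messages:
            self._queue.put(msg)

    def recv(self, timeout=None):
        return self._queue.get(timeout=timeout)


def wait_for(client, expected_type, max_messages=10):
    # Discard unexpected messages instead of failing on them - the property
    # the reviewer wants exercised by a test.
    for _ in range(max_messages):
        msg = client.recv(timeout=1)
        if isinstance(msg, expected_type):
            return msg
    raise TimeoutError("expected message never arrived")


def test_out_of_order_messages_are_ignored():
    client = FakeClient([UnrelatedMessage("noise"), ExecutorReady(job_uuid="abc")])
    msg = wait_for(client, ExecutorReady)
    assert msg.job_uuid == "abc"
```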
mpnowacki-reef left a comment:
The implementation looks functionally correct, but `SyncOrganicJobDriver` at ~800 lines is doing a lot of heavy lifting. What do you think about splitting it? A few ideas:

- **Message handling:** the logic in `_wait_for_message` that handles different error types (lines 477-537) could live in `MinerClient` or a separate helper. The client already knows how to send/receive; it could also know how to classify responses.
- **Receipt operations:** receipt creation and sending is scattered across `_reserve_executor`, `_send_job_accepted_receipt`, `_collect_results`, and `_send_job_finished_receipt_if_failed`. A small `ReceiptService` or even just a few standalone functions could clean this up.
- **Failure handling:** `_handle_horde_failure`, `_handle_decline_job`, `_handle_job_failed`, and `_handle_horde_failed` (~170 lines) are cohesive and could be extracted.

This would leave the driver as a ~300-400 line state machine focused on orchestration.

Also, since this is becoming a critical path, could we add some docstrings to the main flow? Specifically:

- a class-level docstring explaining the state machine and the job lifecycle stages (a rough sketch follows this comment),
- brief docs on `_run()` and `_wait_for_message()` explaining how the state transitions work.

The state enum is already self-documenting, which is great; I just want to make sure the next person can follow the flow quickly.
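To make the ask concrete, a hedged sketch of what such a class-level docstring could look like. Apart from the `CONNECT`/`FAILED` state names and the `_run()`/`_wait_for_message()` methods, which appear elsewhere in this PR, the described flow is an assumption, not a description of the actual implementation:

```python
class SyncOrganicJobDriver:
    """Synchronous state machine driving a single organic job.

    The job moves through a linear happy path, with every stage able to
    short-circuit to FAILED:

        CONNECT -> reserve executor -> send job -> await results -> done

    `_run()` loops over the current `DriverState`, calls the handler for that
    state, and stores the state the handler returns as the next state.
    `_wait_for_message()` blocks on the miner client until the expected
    message type arrives or the stage deadline passes, translating error
    responses into the appropriate failure state.
    """
```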
Commit message lint report
Status: ❌ Issues found. gitlint output: Fix by amending/rewording your commits (e.g., using …
I don't think there's much that can be done here to lower the LoC. We are passing around a lot of context into messages, events, error reports etc., and extracting anything means you still have to pass that stuff into the helper; or, if you instead give it the whole job driver instance, it's just method calling with extra steps. Some state methods are literally just "send a message", so there's not much to gain there. Maybe some methods could be split into less of "how to build a receipt object" and more of "the receipt should be sent now", but IMO, looking at the code of the state handlers, on average it's not worth it. I'd rather have code related to a state be in one place; no single method is hard to comprehend as-is.

Still, I'm not a fan of the state handler deciding the next state. I would maybe structure the "next state decision" and error handling a bit differently, e.g. states are for the happy path, otherwise throw exceptions. I looked into it and it seems I already did something like this for the async driver: there are generic exceptions like HordeError and more meaningful exceptions like "MinerRejectedJob", "MinerReportedJobFailed" etc. (sketch below), which makes it so that:

But this is not critical.
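For illustration, a minimal sketch of that exception-based shape: happy-path handlers only, with failures raised and classified in one place. Only `HordeError`, `MinerRejectedJob`, and `MinerReportedJobFailed` are named in this thread; the driver methods here are hypothetical:

```python
class HordeError(Exception):
    """Generic failure anywhere in the horde flow."""


class MinerRejectedJob(HordeError):
    """Miner explicitly declined the job."""


class MinerReportedJobFailed(HordeError):
    """Miner accepted the job but reported it as failed."""


def run(driver) -> None:
    # Happy path only: each step either succeeds or raises. The except blocks
    # own all "which failure state / receipt / event" decisions, instead of
    # every state handler returning the next state itself.
    try:
        driver.connect()
        driver.reserve_executor()
        driver.send_job()
        driver.collect_results()
    except MinerRejectedJob as exc:
        driver.record_rejection(exc)
    except MinerReportedJobFailed as exc:
        driver.record_job_failure(exc)
    except HordeError as exc:
        driver.record_horde_failure(exc)
```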
```python
        log_exc_info: bool = False,
    ) -> DriverState:
        logger.warning(comment, exc_info=log_exc_info)
        self.job.status = OrganicJob.Status.FAILED
```
Suggested change:

```diff
-        self.job.status = OrganicJob.Status.FAILED
+        self.job.status = OrganicJob.Status.HORDE_FAILED
```
It worries me that no test caught this
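For what it's worth, a hedged sketch of a test that would have caught the wrong status. The fixture name and the model import path are hypothetical; only the `HORDE_FAILED` status and the `django_db` marker come from this PR:

```python
import pytest

from compute_horde_validator.validator.models import OrganicJob  # assumed import path


@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
def test_horde_failure_sets_horde_failed_status(run_driver_with_horde_failure):
    # `run_driver_with_horde_failure` is a hypothetical fixture that drives the
    # sync driver into the horde-failure branch and returns the saved OrganicJob.
    job = run_driver_with_horde_failure()
    assert job.status == OrganicJob.Status.HORDE_FAILED
```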
```python
        )

        try:
            msg = self.miner_client.recv()
```
Can we pass the remaining timeout into recv? There is that timer utility you can set up before the loop, then do something like `recv(timeout=deadline.time_left())`. Otherwise we're only checking for the timeout each time recv returns with no response, which seems to be every 5 seconds. The job timing is strict, and I'm not even sure how this will influence it, but I can imagine we may end up with one stage consuming the time of a subsequent stage or something like that; not fun to debug.
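A minimal, self-contained sketch of the deadline-driven loop being suggested, assuming `recv()` accepts a `timeout` argument; the `Deadline` class is a stand-in for the timer utility mentioned above:

```python
import time


class Deadline:
    """Tiny stand-in for the timer utility mentioned above."""

    def __init__(self, seconds: float):
        self._expires_at = time.monotonic() + seconds

    def time_left(self) -> float:
        return max(0.0, self._expires_at - time.monotonic())


def wait_for_message(client, deadline: Deadline):
    # Pass the *remaining* budget into every recv call, so a stage can never
    # overrun its deadline by up to a full poll interval.
    while (remaining := deadline.time_left()) > 0:
        msg = client.recv(timeout=remaining)  # assumes recv() takes a timeout
        if msg is not None:
            return msg
    raise TimeoutError("stage deadline exceeded")
```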
```python
        # Handle legacy messages by mapping them to V0HordeFailedRequest
        if isinstance(msg, V0StreamingJobNotReadyRequest):
            msg = V0HordeFailedRequest(
                job_uuid=msg.job_uuid,
                reported_by=JobParticipantType.MINER,
                reason=HordeFailureReason.STREAMING_FAILED,
                message="Executor reported legacy V0StreamingJobNotReadyRequest message",
            )
        elif isinstance(msg, V0ExecutorFailedRequest):
            msg = V0HordeFailedRequest(
                job_uuid=msg.job_uuid,
                reported_by=JobParticipantType.MINER,
                reason=HordeFailureReason.GENERIC_ERROR,
                message="Executor reported legacy V0ExecutorFailedRequest message",
            )
```
These are not being sent any more. This was only needed for the release of the error reporting. You can safely remove this.
```python
            assert_never(self._state)

    def _connect(self) -> DriverState:
        assert self._state == DriverState.CONNECT
```
Any reason for this defensive pattern throughout the driver?
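One way to avoid the per-handler asserts, sketched under the assumption that handlers are selected by the current state anyway: let a dispatch table own the state-to-handler mapping, so a handler can only ever run in its own state. The class and method bodies here are placeholders, not the PR's code:

```python
from enum import Enum, auto


class DriverState(Enum):
    CONNECT = auto()
    FAILED = auto()
    # ... remaining states elided


class Driver:
    def __init__(self) -> None:
        self._state = DriverState.CONNECT

    def _run_step(self) -> DriverState:
        # The dispatch table guarantees a handler only runs for its own state,
        # which removes the need for `assert self._state == ...` in each handler.
        handlers = {
            DriverState.CONNECT: self._connect,
            DriverState.FAILED: self._failed,
        }
        self._state = handlers[self._state]()
        return self._state

    def _connect(self) -> DriverState:
        return DriverState.FAILED  # placeholder transition

    def _failed(self) -> DriverState:
        return DriverState.FAILED  # terminal placeholder
```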
No description provided.