
feat: make organic job execution flow sync #835

Open
emnoor-reef wants to merge 3 commits into master from sync-organic2

Conversation

@emnoor-reef
Contributor

No description provided.

@emnoor-reef force-pushed the sync-organic2 branch 3 times, most recently from 0169a3b to ee9a948 on January 28, 2026 13:33
@emnoor-reef changed the title from "Sync organic2" to "feat: make organic job execution flow sync" on Jan 28, 2026
@emnoor-reef force-pushed the sync-organic2 branch 2 times, most recently from a128a9a to cc7522a on January 29, 2026 12:51
@emnoor-reef marked this pull request as ready for review on January 29, 2026 14:08
try:
    self._run()
except Exception as exc:
    sentry_sdk.capture_exception(exc)
Contributor

Why is sentry_sdk called explicitly here, instead of just logging the exception, possibly with job info?
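
For illustration, a minimal sketch of the logging alternative; this is a fragment, and the job_uuid field plus a Sentry logging integration that forwards ERROR-level logs are assumptions:

import logging

logger = logging.getLogger(__name__)

class SyncOrganicJobDriver:
    # Fragment for illustration; _run() and self.job come from the driver.
    def run(self) -> None:
        try:
            self._run()
        except Exception:
            # logger.exception records the traceback at ERROR level; with
            # sentry_sdk's logging integration enabled it still reaches Sentry.
            logger.exception(
                "Organic job driver failed (job_uuid=%s)", self.job.job_uuid
            )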

Contributor Author

That's copied verbatim from the async impl :v

)


@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
Contributor

I really like how concise these tests are. Readable and really doing the job.

)
return DriverState.FAILED

def _wait_for_message(
Contributor

This implementation seems very robust and resistant against weird messages, out-of-order messages, etc., but I think it is not currently tested against that, is it? We've previously had malicious miners gain an edge by sending out-of-order messages.
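
A hedged sketch of what such a test might look like; the fixtures (job_driver, fake_transport) and the exact message fields are assumptions, not part of this PR:

import pytest

@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
def test_premature_job_finished_is_not_trusted(job_driver, fake_transport):
    # Hypothetical scenario: the miner reports the job finished before it
    # was accepted or even started.
    fake_transport.queue_incoming(
        V0JobFinishedRequest(
            job_uuid=str(job_driver.job.job_uuid),
            docker_process_stdout="",
            docker_process_stderr="",
        )
    )
    job_driver.run()
    job_driver.job.refresh_from_db()
    # The driver must not mark the job completed off the premature message.
    assert job_driver.job.status != OrganicJob.Status.COMPLETED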

Contributor
@mpnowacki-reef left a comment

The implementation looks functionally correct, but SyncOrganicJobDriver at ~800 lines is doing a lot of heavy lifting. What do you think about splitting it? A few ideas:

  • Message handling: the logic in _wait_for_message that handles different error types (lines 477-537) could live in MinerClient or a separate helper. The client already knows how to send/receive; it could also know how to classify responses.
  • Receipt operations: receipt creation and sending is scattered across _reserve_executor, _send_job_accepted_receipt, _collect_results, and _send_job_finished_receipt_if_failed. A small ReceiptService, or even just a few standalone functions, could clean this up.
  • Failure handling: _handle_horde_failure, _handle_decline_job, _handle_job_failed, _handle_horde_failed (~170 lines) are cohesive and could be extracted.

This would leave the driver as a ~300-400 line state machine focused on orchestration.

Also, since this is becoming a critical path, could we add some docstrings to the main flow? Specifically:

  • a class-level docstring explaining the state machine and the job lifecycle stages (a rough sketch follows below)
  • brief docs on _run() and _wait_for_message() explaining how the state transitions work

The state enum is already self-documenting, which is great; I just want to make sure the next person can follow the flow quickly.
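
As a sketch, the requested class-level docstring might read like this; the exact stage list is an assumption based on the handlers visible in this diff:

class SyncOrganicJobDriver:
    """Drive a single organic job through its lifecycle as a blocking state machine.

    _run() loops over DriverState values: each state has a handler (e.g.
    _connect for DriverState.CONNECT) that performs one stage of the job
    lifecycle and returns the next state, until a terminal state is reached.
    _wait_for_message() blocks on the miner connection until a message of the
    expected type arrives or the stage deadline passes; error responses are
    routed to the _handle_* failure methods.
    """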

github-actions bot commented Feb 4, 2026

Commit message lint report

gitlint --commits 32fa9d141e05b817a908d9e42405016e881d765e..88df5c766905761d8504e3aac204a1e7bfadf032

Status: ❌ Issues found.

gitlint output:

Linting commit range: 32fa9d141e05b817a908d9e42405016e881d765e..88df5c766905761d8504e3aac204a1e7bfadf032
Commit 88df5c7669:
1: CT1 Title does not start with one of fix, feat, perf, doc, ci, style, refactor, chore, build, test: "squash me: add docstrings"
1: UR1 Commit message is missing an 'Impacts:' footer. Add a line like: Impacts: validator, miner
3: B6 Body message is missing

Commit 8b5ec64228:
1: CT1 Title does not start with one of fix, feat, perf, doc, ci, style, refactor, chore, build, test: "squash me: fix manifest fetching order"
1: UR1 Commit message is missing an 'Impacts:' footer. Add a line like: Impacts: validator, miner
3: B6 Body message is missing
Gitlint found issues in the commit messages.

Fix by amending/rewording your commits (e.g., using git rebase -i and git commit --amend).

Contributor

kkowalski-reef commented Feb 4, 2026

I don't think there's much that can be done here to lower the LoC. We are passing a lot of context around in messages, events, error reports, etc., and extracting anything means you still have to pass that stuff into the helper; if you instead give it the whole job driver instance, it's just method calling with extra steps. Some state methods are literally just "send a message", so there's not much to gain here. Maybe some methods could be split into less "how to build a receipt object" and more "the receipt should be sent now", but IMO, looking at the code of the state handlers, on average it's not worth it. I'd rather have the code related to a state in one place; no single method is hard to comprehend as-is.

Still, I'm not a fan of the state handler deciding the next state. I would maybe structure the "next state decision" and error handling a bit differently, e.g. states are for the happy path, and otherwise you throw exceptions. I looked into it and it seems I already did something like this for the async driver: there are generic exceptions like HordeError and more meaningful ones like MinerRejectedJob, MinerReportedJobFailed, etc. That makes it so that:

  • you have a wrapped high-level "happy path" list of steps
  • there is a separate list of "except if this happens, this is how to report the problem before exiting cleanly"
  • there's no need to bubble up errors, which gets rid of the if isinstance(result, DriverState) checks
  • these exception classes already carry a lot of useful context to report, so it could be easier to handle them in a generic way (see the error handling in the async driver)

But this is not critical.
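
A rough sketch of that shape; the step names come from this PR's handlers and the exception names from the comment above, so treat it as an assumption rather than the actual async implementation:

def run(self) -> None:
    try:
        # Happy path only: each step raises instead of returning a state.
        self._connect()
        self._reserve_executor()
        self._send_job_accepted_receipt()
        self._collect_results()
    except MinerRejectedJob as exc:
        self._handle_decline_job(exc)
    except MinerReportedJobFailed as exc:
        self._handle_job_failed(exc)
    except HordeError as exc:
        self._handle_horde_failure(exc)

Note that the _handle_* methods here take the exception (with its context) as an argument instead of reading driver state, which differs from the current signatures.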

    log_exc_info: bool = False,
) -> DriverState:
    logger.warning(comment, exc_info=log_exc_info)
    self.job.status = OrganicJob.Status.FAILED
Contributor

Suggested change
-    self.job.status = OrganicJob.Status.FAILED
+    self.job.status = OrganicJob.Status.HORDE_FAILED

It worries me that no test caught this.
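
A minimal sketch of a regression test for this; the fixtures and the failure-injection helper are hypothetical:

import pytest

@pytest.mark.django_db(databases=["default", "default_alias"], transaction=True)
def test_horde_failure_sets_horde_failed_status(job_driver, fake_transport):
    # Hypothetical helper: make the miner connection fail mid-flow.
    fake_transport.fail_on_recv(ConnectionError("miner connection dropped"))
    job_driver.run()
    job_driver.job.refresh_from_db()
    assert job_driver.job.status == OrganicJob.Status.HORDE_FAILED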

)

try:
    msg = self.miner_client.recv()
Contributor

Can we pass the remaining timeout into recv? There is a timer utility that you can set up before the loop and then do ~ recv(timeout=deadline.time_left()). Otherwise we only check for the timeout each time recv returns with no response, which seems to be every 5 seconds. The job timing is strict, and I'm not even sure how this will influence it, but I can imagine we end up with one stage consuming the time of a subsequent stage or something like that, which would not be fun to debug.
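
A sketch of the deadline-based loop being suggested; the Timer/time_left() naming follows this comment and is an assumption about the actual utility:

deadline = Timer(timeout)  # hypothetical timer utility, set up once before the loop
while True:
    remaining = deadline.time_left()
    if remaining <= 0:
        break  # stage deadline exhausted; fail this stage
    try:
        # Bound each recv by the time left in this stage instead of a fixed
        # 5-second poll, so one stage cannot consume the next stage's budget.
        msg = self.miner_client.recv(timeout=remaining)
    except TimeoutError:
        continue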

Comment on lines +570 to +584
# Handle legacy messages by mapping them to V0HordeFailedRequest
if isinstance(msg, V0StreamingJobNotReadyRequest):
    msg = V0HordeFailedRequest(
        job_uuid=msg.job_uuid,
        reported_by=JobParticipantType.MINER,
        reason=HordeFailureReason.STREAMING_FAILED,
        message="Executor reported legacy V0StreamingJobNotReadyRequest message",
    )
elif isinstance(msg, V0ExecutorFailedRequest):
    msg = V0HordeFailedRequest(
        job_uuid=msg.job_uuid,
        reported_by=JobParticipantType.MINER,
        reason=HordeFailureReason.GENERIC_ERROR,
        message="Executor reported legacy V0ExecutorFailedRequest message",
    )
Contributor

These are not being sent anymore; this mapping was only needed for the rollout of the error reporting. You can safely remove it.

assert_never(self._state)

def _connect(self) -> DriverState:
    assert self._state == DriverState.CONNECT
Contributor

Any reason for this defensive pattern throughout the driver?
