
Conversation

@bdshadow
Contributor

@bdshadow bdshadow commented Jan 5, 2026

perf: refactor batch job local queue population

The refactoring is actually bigger than adding a new status. Here is a brief before-vs-after comparison:

BEFORE:

  1. The PENDING status of an execution item wasn't changed when the item was already in the queue, so for long-running jobs such execution items were read from the db again.
  2. All execution items were sent to all pods through redis. This didn't make sense, because
        a) they were competing for the lock in the db;
        b) items were unnecessarily occupying memory - they are processed by a single pod anyway;
        c) since the queue was big, it took time to filter out items that were already being processed by another instance.
  3. Unreliable adding to the queue: when an item was saved to the db and sent to redis, another pod could concurrently receive the message from redis and also read it from the db via a scheduled job. There is a `queue.contains(item)` check before adding, but it's not atomic and could easily lead to duplicates (see the sketch below).
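A minimal sketch of the check-then-act race from point 3 (hypothetical simplification in Kotlin, not the actual queue class):

import java.util.concurrent.ConcurrentLinkedQueue

// Simplified model of the old race: two threads - e.g. the redis listener and the
// scheduled db poll - can both pass the contains() check before either calls add(),
// so the same execution ends up queued twice.
val queue = ConcurrentLinkedQueue<Long>()

fun addIfAbsent(executionId: Long) {
  if (!queue.contains(executionId)) { // check...
    // another thread may add the same id right here
    queue.add(executionId) // ...then act: the two steps are not atomic
  }
}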

AFTER

  1. The execution items are saved to the db with status=NEW.
  2. Only these NEW items are read from the db. NOTE: in rare cases, for example during deployment, a pod can be killed and items can get stuck in the PENDING state - that's why there is an additional condition to catch such cases; and even if we pick up an item that is still in some queue, the concurrency problem described in BEFORE-3 doesn't apply, because such an item has been sitting in that queue for a long time already.
  3. When an item is taken into the local queue, its status is updated to PENDING.
  4. An item is no longer sent through redis to other pods.
  5. Other pods are only notified about a new job and concurrently read items from the db themselves.
  6. Now the local queue of a single pod holds an almost unique (among all pods) list of items - meaning less memory used, faster processing, and fewer concurrency conflicts both in the app and in the db. A rough sketch of this flow follows below.
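A minimal in-memory sketch of this AFTER lifecycle (illustration only; the real implementation in BatchJobChunkExecutionQueue uses JPA queries with FOR NO KEY UPDATE SKIP LOCKED and transactional events):

import java.util.concurrent.ConcurrentLinkedQueue

// Illustration only, not the real code: models how a pod claims NEW items,
// flips them to PENDING, and only then puts them into its local queue.
enum class Status { NEW, PENDING, RUNNING }

data class Execution(val id: Long, var status: Status = Status.NEW)

class LocalQueueSketch {
  private val localQueue = ConcurrentLinkedQueue<Execution>()

  // In the real flow this runs inside a db transaction; SKIP LOCKED ensures two
  // pods never claim the same row, so the local queues stay mostly disjoint.
  fun claim(candidates: List<Execution>) {
    val claimed = candidates.filter { it.status == Status.NEW }
    claimed.forEach { it.status = Status.PENDING }
    localQueue.addAll(claimed)
  }

  fun poll(): Execution? = localQueue.poll()
}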

Summary by CodeRabbit

Release Notes

  • New Features

    • Introduced new status state for batch job executions, enabling improved tracking and visibility into job progress.
  • Improvements

    • Enhanced batch job queue reliability with improved detection and handling of stuck or stale executions.
    • Strengthened multi-instance batch processing coordination through improved event-driven messaging infrastructure.


@bdshadow bdshadow requested review from Anty0 and JanCizmar January 5, 2026 11:05
@coderabbitai
Contributor

coderabbitai bot commented Jan 5, 2026

📝 Walkthrough

Walkthrough

Introduces a NEW execution status to the batch job chunk execution lifecycle and refactors queue population to use event-driven, transactional mechanisms. New and potentially-stuck executions are fetched from the database at job creation and synchronized across components via application events and Redis pub/sub.

Changes

  • Status Enumeration & Model Defaults
    Files: backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt, backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
    Summary: Added NEW enum constant to BatchJobChunkExecutionStatus; updated BatchJobChunkExecution default status from PENDING to NEW.
  • Cancellation & Status Queries
    Files: backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
    Summary: Expanded status query to include both NEW and PENDING states; renamed helper method from getUnlockedPendingExecutions to getUnlockedNewOrPendingExecutions.
  • Event Definitions
    Files: backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt, backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt, backend/data/src/main/kotlin/io/tolgee/batch/events/OnBatchJobCreated.kt
    Summary: Introduced NewJobEvent and JobQueueNewChunkExecutionsEvent; removed executions field from OnBatchJobCreated, which now carries only the job reference.
  • Queue Population & Transactional Event Handling
    Files: backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
    Summary: Major refactor: replaced in-memory queuing with database-driven, transactional event listeners; added query for NEW or stale PENDING executions with explicit SKIP LOCKED locking; introduced new dependencies (ApplicationContext, PlatformTransactionManager, CurrentDateProvider) and event handlers (onNewJobEvent, onNewChunkFromDb, populateQueueByChunksFromNewJob); added pendExecutions and fetchAndQueueNewJobExecutions helpers.
  • Job Service Event Publishing
    Files: backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
    Summary: Changed OnBatchJobCreated to carry only the job (no executions); removed the onCreated listener that previously re-published the event and queued executions.
  • Concurrent Launcher & Queue Integration
    Files: backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
    Summary: Updated method calls from addItemsToLocalQueue to addItemsToQueue for queue requeuing.
  • Redis Pub/Sub Integration
    Files: backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt, backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
    Summary: Added receiveNewJobQueueMessage method and corresponding Redis listener bean; introduced NEW_JOB_QUEUE_TOPIC constant for pub/sub synchronization.

Sequence Diagram

sequenceDiagram
    participant JobService as BatchJobService
    participant EventBus as Application<br/>Event Bus
    participant Queue as BatchJobChunkExecution<br/>Queue
    participant Redis as Redis Pub/Sub
    participant DB as Database
    
    rect rgb(200, 240, 255)
    Note over JobService,DB: Job Creation Flow
    JobService->>DB: storeExecutions()
    JobService->>EventBus: publish(OnBatchJobCreated)
    end
    
    rect rgb(240, 220, 255)
    Note over EventBus,DB: Event-Driven Queue Population
    EventBus->>Queue: onNewJobEvent(NewJobEvent)
    alt Redis Enabled
        Queue->>Redis: publish(NewJobEvent)
        Redis->>Queue: receiveNewJobQueueMessage()
    end
    Queue->>DB: fetch NEW or stale PENDING<br/>executions (SKIP LOCKED)
    DB-->>Queue: BatchJobChunkExecution list
    Queue->>Queue: pendExecutions()
    Queue->>EventBus: publish(JobQueueNewChunkExecutions<br/>Event)
    end
    
    rect rgb(220, 255, 220)
    Note over EventBus,Queue: Chunk Processing
    EventBus->>Queue: onNewChunkFromDb(Event)
    Queue->>Queue: addItemsToQueue()
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested reviewers

  • Anty0

Poem

🐰 A queue springs to life, event-driven and bright,
With NEW status gleaming and locks held tight,
No more in-memory schemes that confuse the mind—
From database-fresh to Redis-aligned.
Transactions bloom, listeners dance with grace,
Batch jobs find order in their rightful place! ✨

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Docstring Coverage (⚠️ Warning): Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Resolution: run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped - CodeRabbit’s high-level summary is enabled.
  • Title check (✅ Passed): The title 'perf: refactor batch job local queue population' directly and clearly summarizes the main change: refactoring how batch job execution items are populated in the local queue.

@JanCizmar JanCizmar marked this pull request as ready for review January 5, 2026 12:18
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Fix all issues with AI Agents 🤖
In @backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt:
- Around line 125-137: In populateQueueByChunksFromNewJob(event:
OnBatchJobCreated) the local declaration val event = NewJobEvent(event.job.id)
shadows the method parameter; rename the local variable (e.g., newJobEvent or
jobEvent) and use that name when calling
jacksonObjectMapper().writeValueAsString(...) and when passing to
redisTemplate.convertAndSend so the original OnBatchJobCreated parameter remains
unshadowed and clear.
- Around line 97-108: The query in BatchJobChunkExecutionQueue that detects
stuck PENDING items uses bjce.createdAt (< :isStuckBefore) which is incorrect
because createdAt reflects initial NEW creation; change that predicate to use
bjce.updatedAt so PENDING items are considered stuck based on when they were
last updated; update the WHERE clause reference (the branch checking bjce.status
= :pendingExecutionStatus) to compare bjce.updatedAt < :isStuckBefore and keep
the existing :isStuckBefore parameter and ordering logic intact.
- Around line 139-158: The native query in fetchAndQueueNewJobExecutions
currently selects all rows from tolgee_batch_job_chunk_execution for a job and
may re-queue non-NEW executions; update the query used in
fetchAndQueueNewJobExecutions (the createNativeQuery block returning
List<BatchJobChunkExecution>) to include a status filter (e.g., WHERE
bjce.batch_job_id = :jobId AND bjce.status = 'NEW' or a parameterized :status)
so only NEW executions are selected before calling pendExecutions.
🧹 Nitpick comments (3)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt (1)

186-195: Consider updating the comment to reflect the new queue semantics.

The comment on line 187 mentions "we can add it only to the local queue," but the method was renamed from addItemsToLocalQueue to addItemsToQueue. If the behavior is still local-only (no Redis broadcast), the comment is fine but could be clarified; otherwise, update it to reflect the new semantics.

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt (2)

60-70: Consider removing unused ADD case or documenting retention rationale.

The comment states QueueEventType.ADD "will actually not be used at all." If this is dead code, consider removing the case entirely or documenting why it's retained (e.g., backward compatibility, future use). Keeping dead code paths can confuse future maintainers.


178-197: Consider using a Set for O(1) duplicate detection if queue grows large.

The queue.contains(it) check on ConcurrentLinkedQueue is O(n). If the queue frequently grows large, this could become a bottleneck. A companion ConcurrentHashSet for tracking IDs would provide O(1) lookups.

This is likely acceptable for current usage, but worth noting for future optimization if performance issues arise.
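A rough sketch of that idea (hypothetical wrapper, not the PR's code):

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

// A companion set gives O(1) duplicate checks; Set.add() returns false when the id
// is already tracked, so the contains+add pair collapses into a single atomic call.
class DedupQueueSketch<T>(private val idOf: (T) -> Long) {
  private val queue = ConcurrentLinkedQueue<T>()
  private val ids = ConcurrentHashMap.newKeySet<Long>()

  fun offer(item: T): Boolean {
    if (!ids.add(idOf(item))) return false // already queued
    return queue.add(item)
  }

  fun poll(): T? = queue.poll()?.also { ids.remove(idOf(it)) }
}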

📜 Review details

Configuration used: Path: .coderabbit.yaml

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between be65163 and e058042.

📒 Files selected for processing (11)
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/OnBatchJobCreated.kt
  • backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
  • backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt
  • backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt
  • backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
💤 Files with no reviewable changes (1)
  • backend/data/src/main/kotlin/io/tolgee/batch/events/OnBatchJobCreated.kt
🧰 Additional context used
📓 Path-based instructions (2)
backend/**/*.kt

📄 CodeRabbit inference engine (AGENTS.md)

backend/**/*.kt: After modifying JPA entities, run ./gradlew diffChangeLog to generate Liquibase changelog entries (add --no-daemon flag if docker command not found)
Run ./gradlew ktlintFormat before committing code

Files:

  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt
  • backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
  • backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt
  • backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
  • backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt
**/*.{ts,tsx,js,jsx,kt,kts}

⚙️ CodeRabbit configuration file

As part of review, please check if the file follows 'The Stepdown Rule': The most important, high-level concepts should be at the top (like a newspaper headline and opening paragraph). Details should increase as you read downward. Functions should be ordered so that a caller appears before the functions it calls. When working with JavaScript components, we allow the main component to live at the bottom of the file as an exception to the rule.

Files:

  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt
  • backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
  • backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt
  • backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
  • backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
  • backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt
🔇 Additional comments (12)
backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt (1)

25-28: LGTM! Method follows the established pattern.

The new receiveNewJobQueueMessage method correctly mirrors the structure of existing receiver methods (receiveJobQueueMessage, receiveJobCancel). The deserialization and event publishing logic is consistent with the codebase patterns.
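For orientation, the pattern these receiver methods follow is roughly the following (assumed shape; names are taken from the review text and the real class wires more topics):

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import org.springframework.context.ApplicationEventPublisher

// Assumed shape of the receiver pattern: deserialize the Redis payload and
// re-publish it as a local Spring application event for the queue to consume.
class NewJobQueueReceiverSketch(
  private val applicationEventPublisher: ApplicationEventPublisher,
) {
  fun receiveNewJobQueueMessage(message: String) {
    val event = jacksonObjectMapper().readValue(message, NewJobEvent::class.java)
    applicationEventPublisher.publishEvent(event)
  }
}

data class NewJobEvent(val jobId: Long) // mirrors the event described in this PR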

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt (2)

88-88: LGTM! Method name accurately reflects expanded functionality.

The rename from getUnlockedPendingExecutions to getUnlockedNewOrPendingExecutions clearly communicates that the method now handles both NEW and PENDING executions.


103-118: LGTM! Query correctly handles both NEW and PENDING statuses.

The changes properly implement multi-status cancellation:

  • The status in (:status) clause correctly accepts a list parameter
  • JPA's setParameter with a list is the standard way to bind collection parameters for IN clauses
  • The SKIP_LOCKED hint is preserved, maintaining concurrent safety
  • Both NEW and PENDING statuses are appropriately included for cancellation
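A schematic illustration of binding a collection to an in (:status) parameter (JPQL form and jakarta imports are assumptions here; the project's real query also applies the SKIP_LOCKED hint):

import io.tolgee.model.batch.BatchJobChunkExecution
import io.tolgee.model.batch.BatchJobChunkExecutionStatus
import jakarta.persistence.EntityManager

// Schematic only: passing a List to setParameter binds all values of the IN clause.
fun findNewOrPending(entityManager: EntityManager): List<BatchJobChunkExecution> =
  entityManager
    .createQuery(
      "select e from BatchJobChunkExecution e where e.status in (:status)",
      BatchJobChunkExecution::class.java,
    )
    .setParameter("status", listOf(BatchJobChunkExecutionStatus.NEW, BatchJobChunkExecutionStatus.PENDING))
    .resultList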
backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt (1)

1-5: LGTM! Event class design is appropriate.

The data class is well-designed for its purpose:

  • Data class provides value equality and other useful methods automatically
  • Single property jobId: Long is sufficient for identifying the job to process
  • Naming follows Kotlin conventions and is consistent with other event classes in the package
backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt (1)

1-7: LGTM! Event class design is appropriate.

The data class is well-designed:

  • Data class usage is appropriate for event types
  • items: List<BatchJobChunkExecutionDto> properly encapsulates multiple execution items
  • Naming is descriptive and follows Kotlin conventions
  • Import of BatchJobChunkExecutionDto is correct
backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt (1)

6-6: No Liquibase changelog is required for this enum constant addition.

Since the enum uses @Enumerated(EnumType.STRING), the database stores status values as plain VARCHAR strings with no constraints. Adding a new enum constant like NEW does not require a database migration—the existing column can immediately store the new string value. The diffChangeLog requirement applies to structural JPA entity modifications (columns, types, constraints), not enum value additions.

Likely an incorrect or invalid review comment.
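For illustration, a string-mapped enum field looks roughly like this (simplified sketch, not the actual Tolgee entity or its full status list), which is why the text column accepts a new constant without a schema change:

import jakarta.persistence.Entity
import jakarta.persistence.EnumType
import jakarta.persistence.Enumerated
import jakarta.persistence.Id

// Simplified sketch: the status column stores the enum name as text,
// so adding a NEW constant needs no Liquibase changelog entry.
enum class SketchStatus { NEW, PENDING, RUNNING, SUCCESS, FAILED }

@Entity
class ChunkExecutionSketch {
  @Id
  var id: Long = 0

  @Enumerated(EnumType.STRING)
  var status: SketchStatus = SketchStatus.NEW
}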

backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt (1)

33-34: Verify Liquibase changelog was generated for this entity change.

The default status change from PENDING to NEW looks correct for the new lifecycle. As per coding guidelines, after modifying JPA entities, run ./gradlew diffChangeLog to generate Liquibase changelog entries if needed for the new enum value.

backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt (1)

28-29: LGTM!

The new NEW_JOB_QUEUE_TOPIC constant, listener adapter bean, and container registration follow the established patterns consistently with existing topic handlers.

Also applies to: 52-55, 65-65
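Schematically, the wiring pattern being extended looks like this (standard Spring Data Redis APIs; the topic value and bean names here are assumptions):

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.listener.ChannelTopic
import org.springframework.data.redis.listener.RedisMessageListenerContainer
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter

// Stub receiver; the real one deserializes the message and republishes it as an event.
class NewJobQueueReceiverStub {
  fun receiveNewJobQueueMessage(message: String) { /* ... */ }
}

@Configuration
class RedisPubSubSketchConfiguration {
  companion object {
    const val NEW_JOB_QUEUE_TOPIC = "new_job_queue" // assumed value
  }

  @Bean
  fun newJobQueueListenerAdapter() =
    MessageListenerAdapter(NewJobQueueReceiverStub(), "receiveNewJobQueueMessage")

  @Bean
  fun listenerContainer(connectionFactory: RedisConnectionFactory): RedisMessageListenerContainer =
    RedisMessageListenerContainer().apply {
      setConnectionFactory(connectionFactory)
      addMessageListener(newJobQueueListenerAdapter(), ChannelTopic(NEW_JOB_QUEUE_TOPIC))
    }
}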

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt (1)

213-216: LGTM!

The addBackToQueue helper is a clean abstraction for re-queuing items.

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt (1)

121-123: LGTM - Clean separation of persistence from event publishing.

The refactored flow correctly:

  1. Persists executions with NEW status via storeExecutions
  2. Publishes a simplified OnBatchJobCreated(job) event without the executions payload

This aligns with the new architecture where pods independently read NEW items from the database.

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt (2)

72-80: LGTM!

The event listeners are correctly configured:

  • @EventListener for NewJobEvent to immediately fetch executions
  • @TransactionalEventListener for JobQueueNewChunkExecutionsEvent to add to queue after the transaction that updated statuses commits
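Schematically (assumed signatures; real handler bodies omitted):

import org.springframework.context.event.EventListener
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener

// Sketch only: @EventListener fires immediately, while @TransactionalEventListener
// defaults to AFTER_COMMIT, so queueing happens only once the PENDING update is durable.
class QueueListenersSketch {
  @EventListener
  fun onNewJobEvent(event: NewJobEvent) {
    // fetch NEW / stale PENDING executions from the db and mark them PENDING
  }

  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  fun onNewChunkFromDb(event: JobQueueNewChunkExecutionsEvent) {
    // add the already-persisted PENDING items to the local in-memory queue
  }
}

data class NewJobEvent(val jobId: Long)
data class JobQueueNewChunkExecutionsEvent(val items: List<Long>) // ids only in this sketch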

160-172: LGTM!

The pendExecutions method correctly:

  1. Updates status to PENDING within the transaction
  2. Publishes JobQueueNewChunkExecutionsEvent which is consumed by @TransactionalEventListener after commit
  3. Ensures items are only added to the queue after their PENDING status is persisted

Comment on lines +97 to +108
          where bjce.status = :newExecutionStatus
               or (bjce.status = :pendingExecutionStatus and bjce.createdAt < :isStuckBefore)
          order by
            case when bk.status = :runningStatus then 0 else 1 end,
            bjce.createdAt asc,
            bjce.executeAfter asc,
            bjce.id asc
          """.trimIndent(),
-         BatchJobChunkExecutionDto::class.java,
-       ).setParameter("executionStatus", BatchJobChunkExecutionStatus.PENDING)
+         BatchJobChunkExecution::class.java,
+       ).setParameter("newExecutionStatus", BatchJobChunkExecutionStatus.NEW)
+         .setParameter("pendingExecutionStatus", BatchJobChunkExecutionStatus.PENDING)
+         .setParameter("isStuckBefore", currentDateProvider.date.addMinutes(-2))
Contributor


⚠️ Potential issue | 🟠 Major

Potential issue: Use updatedAt instead of createdAt for stuck PENDING detection.

The query checks bjce.createdAt < :isStuckBefore for PENDING items, but createdAt is set when the execution is first created (with NEW status). When the status changes to PENDING, updatedAt is modified. Using createdAt could incorrectly include recently-transitioned PENDING items that aren't actually stuck.

Consider using updatedAt to accurately detect when an item became PENDING:

🔎 Proposed fix
          where bjce.status = :newExecutionStatus
-              or (bjce.status = :pendingExecutionStatus and bjce.createdAt < :isStuckBefore)
+              or (bjce.status = :pendingExecutionStatus and bjce.updatedAt < :isStuckBefore)

Comment on lines +125 to 137
@TransactionalEventListener
fun populateQueueByChunksFromNewJob(event: OnBatchJobCreated) {
if (usingRedisProvider.areWeUsingRedis) {
val event = NewJobEvent(event.job.id)
redisTemplate.convertAndSend(
RedisPubSubReceiverConfiguration.NEW_JOB_QUEUE_TOPIC,
jacksonObjectMapper().writeValueAsString(event),
)
return
}

fetchAndQueueNewJobExecutions(event.job.id)
}
Contributor


⚠️ Potential issue | 🟡 Minor

Variable shadowing: event shadows the method parameter.

Line 128 declares val event = NewJobEvent(event.job.id), which shadows the OnBatchJobCreated parameter. This is confusing and could lead to subtle bugs if the outer event is needed later.

🔎 Proposed fix
  @TransactionalEventListener
  fun populateQueueByChunksFromNewJob(event: OnBatchJobCreated) {
    if (usingRedisProvider.areWeUsingRedis) {
-      val event = NewJobEvent(event.job.id)
+      val newJobEvent = NewJobEvent(event.job.id)
      redisTemplate.convertAndSend(
        RedisPubSubReceiverConfiguration.NEW_JOB_QUEUE_TOPIC,
-        jacksonObjectMapper().writeValueAsString(event),
+        jacksonObjectMapper().writeValueAsString(newJobEvent),
      )
      return
    }

    fetchAndQueueNewJobExecutions(event.job.id)
  }

Comment on lines +139 to +158
private fun fetchAndQueueNewJobExecutions(jobId: Long) {
executeInNewTransaction(platformTransactionManager) {
val data =
entityManager
.createNativeQuery(
"""
SELECT bjce.* FROM tolgee_batch_job_chunk_execution bjce
WHERE bjce.batch_job_id = :jobId
ORDER BY bjce.id
FOR NO KEY UPDATE SKIP LOCKED
LIMIT :limit
""",
BatchJobChunkExecution::class.java,
).setParameter("jobId", jobId)
.setParameter("limit", batchProperties.chunkQueuePopulationSize)
.resultList as List<BatchJobChunkExecution>

pendExecutions(data)
}
}
Contributor


⚠️ Potential issue | 🔴 Critical

Missing status filter in query - may re-queue non-NEW executions.

The native query selects all executions for a job without filtering by status. If called on a job with mixed statuses (e.g., some already PENDING or completed), it could incorrectly re-queue them. The query should filter for status = 'NEW' to match the intended lifecycle.

🔎 Proposed fix
  private fun fetchAndQueueNewJobExecutions(jobId: Long) {
    executeInNewTransaction(platformTransactionManager) {
      val data =
        entityManager
          .createNativeQuery(
            """
            SELECT bjce.* FROM tolgee_batch_job_chunk_execution bjce
            WHERE bjce.batch_job_id = :jobId
+              AND bjce.status = 'NEW'
            ORDER BY bjce.id
            FOR NO KEY UPDATE SKIP LOCKED
            LIMIT :limit
            """,
            BatchJobChunkExecution::class.java,
          ).setParameter("jobId", jobId)
          .setParameter("limit", batchProperties.chunkQueuePopulationSize)
          .resultList as List<BatchJobChunkExecution>

      pendExecutions(data)
    }
  }

Contributor

@ajeans ajeans left a comment


This seems like a very nice step.

I'm still a bit baffled by the database vs redis acting as the lock holder, but this seems to make things simpler and the rationale sounds good.

I added two comments that I think would be improvements but are not necessary.

Once this PR lands and I get a released version, I'll try this version in production at Rakuten France.

addExecutionsToQueue(event.items)
}

@Scheduled(fixedDelay = 60000)
Contributor


This has been this way before, but I'm thinking this should be made configurable
(fixedDelayString as in ScheduledJobCleaner.kt)

And I'm also thinking that adding jitter would be better (randomized initialDelayString). On several machines, because the output of the SQL script is ordered, we'll have several nodes competing to do the SELECT FOR UPDATE
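A sketch of that suggestion (hypothetical property name; assumes a SpEL expression is acceptable for the randomized initial delay):

import org.springframework.scheduling.annotation.Scheduled

// Hypothetical: property-driven delay plus a randomized initial delay so that
// pods don't all hit the SELECT ... FOR UPDATE SKIP LOCKED at the same instant.
class QueuePopulationSchedulerSketch {
  @Scheduled(
    fixedDelayString = "\${tolgee.batch.queue-population-delay-ms:60000}",
    initialDelayString = "#{T(java.util.concurrent.ThreadLocalRandom).current().nextLong(60000)}",
  )
  fun populateQueue() {
    // fetch NEW or stuck PENDING executions for this pod
  }
}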

BatchJobChunkExecution::class.java,
).setParameter("newExecutionStatus", BatchJobChunkExecutionStatus.NEW)
.setParameter("pendingExecutionStatus", BatchJobChunkExecutionStatus.PENDING)
.setParameter("isStuckBefore", currentDateProvider.date.addMinutes(-2))
Contributor


Two minutes delay seems aggressive, if you consider a use case where a batch job has 10 to 20 chunks, with each chunk being a call to OpenAI or similar with 20s / 30s latency.

It would be nice if this could be made configurable, or increased (5 minutes?)
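If made configurable, it could look roughly like this (hypothetical property, not part of this PR); the query would then subtract the configured value instead of the literal two minutes:

import org.springframework.boot.context.properties.ConfigurationProperties

// Hypothetical property holder; would need to be registered via
// @EnableConfigurationProperties or configuration-properties scanning.
@ConfigurationProperties(prefix = "tolgee.batch")
class BatchStuckDetectionProperties {
  /** How long a PENDING execution may sit untouched before it is considered stuck. */
  var stuckExecutionTimeoutInMinutes: Long = 5
}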
