perf: refactor batch job local queue population #3363
base: main
Conversation
The refactoring is actually bigger than adding a new status. Here is a brief before-vs-after comparison:
BEFORE:
1) The PENDING status of an execution item wasn't changed when the item was already in the queue, so in the case of long-running jobs such execution items were read from the db again.
2) All execution items were sent to all pods through redis. This didn't make sense, because:
a) the pods were competing for the lock in the db;
b) the items unnecessarily occupied memory - each item is processed by a single pod anyway;
c) since the queue was big, it took time to filter out items already being processed by another instance.
3) Unreliable adding to the queue. When an item was saved to the db and sent to redis, another pod could concurrently receive the message from redis and also read the item from the db via a scheduled job. There is a check `if (!queue.contains(it)) { add(it) }`, but it's not atomic and could easily lead to duplicates.
AFTER:
1) The execution items are saved to the db with status=NEW.
2) Only these NEW items are read from the db. NOTE: in rare cases, for example during deployment, a pod can be killed and items get stuck in the PENDING state - that's why there is an additional condition to catch such cases; and even if we pick up an item that is still in some pod's queue, the concurrency problem described in BEFORE-3 doesn't arise - that item has been in the queue for a long time already.
3) When an item is taken into the local queue, its status is updated to PENDING.
4) Items are no longer sent through redis to other pods.
5) Other pods are notified about a new job and can concurrently read items from the db themselves.
6) Now the local queue of a single pod holds an almost unique (among all pods) list of items - meaning less memory used, quicker processing, and fewer concurrency conflicts both in the app and the db.
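To make the AFTER flow concrete, here is a minimal, hypothetical Kotlin sketch of the NEW → PENDING hand-off. The types and names (`ExecutionRepository`, `fetchNewExecutions`, `markPending`) are illustrative only and do not mirror the actual Tolgee classes:

```kotlin
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical types standing in for the real Tolgee entities.
enum class ExecutionStatus { NEW, PENDING, RUNNING, SUCCESS, FAILED }

data class Execution(val id: Long, val jobId: Long, var status: ExecutionStatus)

interface ExecutionRepository {
  // In the real code this is a SELECT ... FOR NO KEY UPDATE SKIP LOCKED query.
  fun fetchNewExecutions(jobId: Long, limit: Int): List<Execution>

  // NEW -> PENDING, executed in the same transaction as the fetch.
  fun markPending(ids: List<Long>)
}

class LocalExecutionQueue(private val repository: ExecutionRepository) {
  private val queue = ConcurrentLinkedQueue<Execution>()

  // Called when this pod learns about a new job (via an application event or redis pub/sub).
  fun onNewJob(jobId: Long) {
    val items = repository.fetchNewExecutions(jobId, limit = 100) // only NEW (or long-stuck PENDING) rows
    repository.markPending(items.map { it.id })
    items.forEach { queue.add(it) } // only this pod holds these items locally
  }
}
```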
📝 Walkthrough
Introduces a NEW execution status to the batch job chunk execution lifecycle and refactors queue population to use event-driven, transactional mechanisms. New and potentially-stuck executions are fetched from the database at job creation and synchronized across components via application events and Redis pub/sub.
Sequence Diagram
sequenceDiagram
participant JobService as BatchJobService
participant EventBus as Application<br/>Event Bus
participant Queue as BatchJobChunkExecution<br/>Queue
participant Redis as Redis Pub/Sub
participant DB as Database
rect rgb(200, 240, 255)
Note over JobService,DB: Job Creation Flow
JobService->>DB: storeExecutions()
JobService->>EventBus: publish(OnBatchJobCreated)
end
rect rgb(240, 220, 255)
Note over EventBus,DB: Event-Driven Queue Population
EventBus->>Queue: onNewJobEvent(NewJobEvent)
alt Redis Enabled
Queue->>Redis: publish(NewJobEvent)
Redis->>Queue: receiveNewJobQueueMessage()
end
Queue->>DB: fetch NEW or stale PENDING<br/>executions (SKIP LOCKED)
DB-->>Queue: BatchJobChunkExecution list
Queue->>Queue: pendExecutions()
Queue->>EventBus: publish(JobQueueNewChunkExecutions<br/>Event)
end
rect rgb(220, 255, 220)
Note over EventBus,Queue: Chunk Processing
EventBus->>Queue: onNewChunkFromDb(Event)
Queue->>Queue: addItemsToQueue()
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks and finishing touches: ❌ Failed checks (1 warning), ✅ Passed checks (2 passed)
Actionable comments posted: 3
Fix all issues with AI Agents 🤖
In @backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt:
- Around line 125-137: In populateQueueByChunksFromNewJob(event:
OnBatchJobCreated) the local declaration val event = NewJobEvent(event.job.id)
shadows the method parameter; rename the local variable (e.g., newJobEvent or
jobEvent) and use that name when calling
jacksonObjectMapper().writeValueAsString(...) and when passing to
redisTemplate.convertAndSend so the original OnBatchJobCreated parameter remains
unshadowed and clear.
- Around line 97-108: The query in BatchJobChunkExecutionQueue that detects
stuck PENDING items uses bjce.createdAt (< :isStuckBefore) which is incorrect
because createdAt reflects initial NEW creation; change that predicate to use
bjce.updatedAt so PENDING items are considered stuck based on when they were
last updated; update the WHERE clause reference (the branch checking bjce.status
= :pendingExecutionStatus) to compare bjce.updatedAt < :isStuckBefore and keep
the existing :isStuckBefore parameter and ordering logic intact.
- Around line 139-158: The native query in fetchAndQueueNewJobExecutions
currently selects all rows from tolgee_batch_job_chunk_execution for a job and
may re-queue non-NEW executions; update the query used in
fetchAndQueueNewJobExecutions (the createNativeQuery block returning
List<BatchJobChunkExecution>) to include a status filter (e.g., WHERE
bjce.batch_job_id = :jobId AND bjce.status = 'NEW' or a parameterized :status)
so only NEW executions are selected before calling pendExecutions.
🧹 Nitpick comments (3)
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt (1)
186-195: Consider updating the comment to reflect the new queue semantics.
The comment on line 187 mentions "we can add it only to the local queue," but the method was renamed from `addItemsToLocalQueue` to `addItemsToQueue`. If the behavior is still local-only (no Redis broadcast), the comment is fine but could be clarified; otherwise, update it to reflect the new semantics.

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt (2)
60-70: Consider removing the unused `ADD` case or documenting the retention rationale.
The comment states `QueueEventType.ADD` "will actually not be used at all." If this is dead code, consider removing the case entirely or documenting why it's retained (e.g., backward compatibility, future use). Keeping dead code paths can confuse future maintainers.
178-197: Consider using a Set for O(1) duplicate detection if the queue grows large.
The `queue.contains(it)` check on `ConcurrentLinkedQueue` is O(n). If the queue frequently grows large, this could become a bottleneck. A companion `ConcurrentHashSet` for tracking IDs would provide O(1) lookups.
This is likely acceptable for current usage, but worth noting for future optimization if performance issues arise.
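As an illustration of that suggestion, here is a minimal, hypothetical sketch; the wrapper class and its names are invented, and on the JVM a "concurrent hash set" is usually obtained via `ConcurrentHashMap.newKeySet()`:

```kotlin
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical wrapper: a concurrent key set kept next to the queue makes
// duplicate detection O(1) instead of the O(n) queue.contains() scan.
class DeduplicatingQueue<T>(private val idOf: (T) -> Long) {
  private val queue = ConcurrentLinkedQueue<T>()
  private val queuedIds = ConcurrentHashMap.newKeySet<Long>()

  // Returns true only for the first caller that adds a given ID.
  fun tryAdd(item: T): Boolean {
    if (!queuedIds.add(idOf(item))) return false
    queue.add(item)
    return true
  }

  fun poll(): T? = queue.poll()?.also { queuedIds.remove(idOf(it)) }
}
```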
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/OnBatchJobCreated.kt
- backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
- backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt
- backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt
- backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
💤 Files with no reviewable changes (1)
- backend/data/src/main/kotlin/io/tolgee/batch/events/OnBatchJobCreated.kt
🧰 Additional context used
📓 Path-based instructions (2)
backend/**/*.kt
📄 CodeRabbit inference engine (AGENTS.md)
backend/**/*.kt: After modifying JPA entities, run `./gradlew diffChangeLog` to generate Liquibase changelog entries (add the `--no-daemon` flag if the docker command is not found)
Run `./gradlew ktlintFormat` before committing code
Files:
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt
- backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
- backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt
- backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
- backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt
**/*.{ts,tsx,js,jsx,kt,kts}
⚙️ CodeRabbit configuration file
As part of review, please check if the file follows 'The Stepdown Rule': The most important, high-level concepts should be at the top (like a newspaper headline and opening paragraph). Details should increase as you read downward. Functions should be ordered so that a caller appears before the functions it calls. When working with JavaScript components, we allow the main component to live at the bottom of the file as an exception to the rule.
Files:
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt
- backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt
- backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
- backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt
- backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt
- backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt
- backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt
🔇 Additional comments (12)
backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiver.kt (1)
25-28: LGTM! Method follows the established pattern.
The new `receiveNewJobQueueMessage` method correctly mirrors the structure of the existing receiver methods (`receiveJobQueueMessage`, `receiveJobCancel`). The deserialization and event publishing logic is consistent with the codebase patterns.
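For illustration, a stripped-down sketch of that receiver pattern (deserialize the pub/sub payload, republish it locally); the class and event below are simplified stand-ins, not the actual `RedisPubSubReceiver`:

```kotlin
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import org.springframework.context.ApplicationEventPublisher

// Simplified stand-in event; the real NewJobEvent lives in io.tolgee.batch.events.
data class NewJobEvent(val jobId: Long)

class PubSubReceiverSketch(private val eventPublisher: ApplicationEventPublisher) {
  // Deserialize the redis pub/sub payload and republish it as a local application event,
  // mirroring the pattern of the existing receiveJobQueueMessage / receiveJobCancel methods.
  fun receiveNewJobQueueMessage(message: String) {
    val event = jacksonObjectMapper().readValue<NewJobEvent>(message)
    eventPublisher.publishEvent(event)
  }
}
```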
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobCancellationManager.kt (2)
88-88: LGTM! Method name accurately reflects expanded functionality.
The rename from `getUnlockedPendingExecutions` to `getUnlockedNewOrPendingExecutions` clearly communicates that the method now handles both NEW and PENDING executions.
103-118: LGTM! Query correctly handles both NEW and PENDING statuses.
The changes properly implement multi-status cancellation:
- The `status in (:status)` clause correctly accepts a list parameter
- JPA's `setParameter` with a list is the standard way to bind collection parameters for IN clauses
- The SKIP_LOCKED hint is preserved, maintaining concurrent safety
- Both NEW and PENDING statuses are appropriately included for cancellation
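For readers unfamiliar with the pattern, here is a small hedged sketch of binding a list to a JPQL IN clause. The exact JPQL, the `batchJob` relation name, and the `jakarta.persistence` import are assumptions for illustration, not the actual query from this PR:

```kotlin
import io.tolgee.model.batch.BatchJobChunkExecution
import io.tolgee.model.batch.BatchJobChunkExecutionStatus
import jakarta.persistence.EntityManager

// Illustrative only: bind a Kotlin list directly to a JPQL IN clause via setParameter.
fun findNewOrPendingExecutions(em: EntityManager, jobId: Long): List<BatchJobChunkExecution> =
  em.createQuery(
    """
    select e from BatchJobChunkExecution e
    where e.batchJob.id = :jobId
      and e.status in (:statuses)
    """.trimIndent(),
    BatchJobChunkExecution::class.java,
  )
    .setParameter("jobId", jobId)
    .setParameter(
      "statuses",
      listOf(BatchJobChunkExecutionStatus.NEW, BatchJobChunkExecutionStatus.PENDING),
    )
    .resultList
```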
backend/data/src/main/kotlin/io/tolgee/batch/events/NewJobEvent.kt (1)
1-5: LGTM! Event class design is appropriate.
The data class is well-designed for its purpose:
- Data class provides value equality and other useful methods automatically
- A single property `jobId: Long` is sufficient for identifying the job to process
- Naming follows Kotlin conventions and is consistent with other event classes in the package
backend/data/src/main/kotlin/io/tolgee/batch/events/JobQueueNewChunkExecutionsEvent.kt (1)
1-7: LGTM! Event class design is appropriate.
The data class is well-designed:
- Data class usage is appropriate for event types
- `items: List<BatchJobChunkExecutionDto>` properly encapsulates multiple execution items
- Naming is descriptive and follows Kotlin conventions
- The import of `BatchJobChunkExecutionDto` is correct

backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecutionStatus.kt (1)
6-6: No Liquibase changelog is required for this enum constant addition.
Since the enum uses `@Enumerated(EnumType.STRING)`, the database stores status values as plain VARCHAR strings with no constraints. Adding a new enum constant like `NEW` does not require a database migration—the existing column can immediately store the new string value. The `diffChangeLog` requirement applies to structural JPA entity modifications (columns, types, constraints), not enum value additions.
Likely an incorrect or invalid review comment.
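For context, this is the general mapping pattern the comment refers to; the entity, its fields, the exact set of enum constants, and the `jakarta.persistence` imports below are a hypothetical sketch rather than the real Tolgee classes:

```kotlin
import jakarta.persistence.Entity
import jakarta.persistence.EnumType
import jakarta.persistence.Enumerated
import jakarta.persistence.GeneratedValue
import jakarta.persistence.Id

// Hypothetical enum and entity; the real constants and columns differ.
enum class ChunkExecutionStatus { NEW, PENDING, RUNNING, SUCCESS, FAILED }

@Entity
class ChunkExecutionExample {
  @Id
  @GeneratedValue
  var id: Long = 0

  // Persisted as a VARCHAR containing the enum name, so adding a new constant
  // (such as NEW) needs no schema migration - the column already accepts any string.
  @Enumerated(EnumType.STRING)
  var status: ChunkExecutionStatus = ChunkExecutionStatus.NEW
}
```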
backend/data/src/main/kotlin/io/tolgee/model/batch/BatchJobChunkExecution.kt (1)
33-34: Verify a Liquibase changelog was generated for this entity change.
The default status change from `PENDING` to `NEW` looks correct for the new lifecycle. As per coding guidelines, after modifying JPA entities, run `./gradlew diffChangeLog` to generate Liquibase changelog entries if needed for the new enum value.

backend/data/src/main/kotlin/io/tolgee/pubSub/RedisPubSubReceiverConfiguration.kt (1)
28-29: LGTM!
The new `NEW_JOB_QUEUE_TOPIC` constant, listener adapter bean, and container registration follow the established patterns consistently with the existing topic handlers.
Also applies to: 52-55, 65-65
backend/data/src/main/kotlin/io/tolgee/batch/BatchJobConcurrentLauncher.kt (1)
213-216: LGTM!
The `addBackToQueue` helper is a clean abstraction for re-queuing items.

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobService.kt (1)
121-123: LGTM - Clean separation of persistence from event publishing.
The refactored flow correctly:
- Persists executions with `NEW` status via `storeExecutions`
- Publishes a simplified `OnBatchJobCreated(job)` event without the executions payload
This aligns with the new architecture where pods independently read `NEW` items from the database.

backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt (2)
72-80: LGTM!
The event listeners are correctly configured:
- `@EventListener` for `NewJobEvent` to immediately fetch executions
- `@TransactionalEventListener` for `JobQueueNewChunkExecutionsEvent` to add items to the queue after the transaction that updated their statuses commits
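A simplified, hypothetical sketch of that listener pairing (the event payloads and the queue type are reduced to a bare minimum; the real methods in `BatchJobChunkExecutionQueue` have different signatures and collaborators):

```kotlin
import org.springframework.context.event.EventListener
import org.springframework.transaction.event.TransactionPhase
import org.springframework.transaction.event.TransactionalEventListener

// Simplified stand-ins for the real event classes.
data class NewJobEvent(val jobId: Long)
data class JobQueueNewChunkExecutionsEvent(val executionIds: List<Long>)

class QueueListenersSketch(private val localQueue: MutableList<Long>) {
  // Fires as soon as the event is published: fetch NEW executions from the db,
  // mark them PENDING, and publish JobQueueNewChunkExecutionsEvent in that transaction.
  @EventListener
  fun onNewJobEvent(event: NewJobEvent) {
    // fetch + pend executions for event.jobId (omitted here)
  }

  // Fires only after the publishing transaction commits, so the PENDING status is
  // already persisted before the items become visible in the local queue.
  @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
  fun onNewChunkFromDb(event: JobQueueNewChunkExecutionsEvent) {
    localQueue.addAll(event.executionIds)
  }
}
```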
160-172: LGTM!
The `pendExecutions` method correctly:
- Updates status to `PENDING` within the transaction
- Publishes `JobQueueNewChunkExecutionsEvent`, which is consumed by the `@TransactionalEventListener` after commit
- Ensures items are only added to the queue after their PENDING status is persisted
      where bjce.status = :newExecutionStatus
        or (bjce.status = :pendingExecutionStatus and bjce.createdAt < :isStuckBefore)
      order by
        case when bk.status = :runningStatus then 0 else 1 end,
        bjce.createdAt asc,
        bjce.executeAfter asc,
        bjce.id asc
      """.trimIndent(),
-     BatchJobChunkExecutionDto::class.java,
-   ).setParameter("executionStatus", BatchJobChunkExecutionStatus.PENDING)
+     BatchJobChunkExecution::class.java,
+   ).setParameter("newExecutionStatus", BatchJobChunkExecutionStatus.NEW)
+     .setParameter("pendingExecutionStatus", BatchJobChunkExecutionStatus.PENDING)
+     .setParameter("isStuckBefore", currentDateProvider.date.addMinutes(-2))
Potential issue: Use updatedAt instead of createdAt for stuck PENDING detection.
The query checks bjce.createdAt < :isStuckBefore for PENDING items, but createdAt is set when the execution is first created (with NEW status). When the status changes to PENDING, updatedAt is modified. Using createdAt could incorrectly include recently-transitioned PENDING items that aren't actually stuck.
Consider using updatedAt to accurately detect when an item became PENDING:
🔎 Proposed fix
where bjce.status = :newExecutionStatus
- or (bjce.status = :pendingExecutionStatus and bjce.createdAt < :isStuckBefore)
+ or (bjce.status = :pendingExecutionStatus and bjce.updatedAt < :isStuckBefore)

🤖 Prompt for AI Agents
In @backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
around lines 97-108, The query in BatchJobChunkExecutionQueue that detects stuck
PENDING items uses bjce.createdAt (< :isStuckBefore) which is incorrect because
createdAt reflects initial NEW creation; change that predicate to use
bjce.updatedAt so PENDING items are considered stuck based on when they were
last updated; update the WHERE clause reference (the branch checking bjce.status
= :pendingExecutionStatus) to compare bjce.updatedAt < :isStuckBefore and keep
the existing :isStuckBefore parameter and ordering logic intact.
@TransactionalEventListener
fun populateQueueByChunksFromNewJob(event: OnBatchJobCreated) {
  if (usingRedisProvider.areWeUsingRedis) {
    val event = NewJobEvent(event.job.id)
    redisTemplate.convertAndSend(
      RedisPubSubReceiverConfiguration.NEW_JOB_QUEUE_TOPIC,
      jacksonObjectMapper().writeValueAsString(event),
    )
    return
  }

  fetchAndQueueNewJobExecutions(event.job.id)
}
Variable shadowing: event shadows the method parameter.
Line 128 declares val event = NewJobEvent(event.job.id), which shadows the OnBatchJobCreated parameter. This is confusing and could lead to subtle bugs if the outer event is needed later.
🔎 Proposed fix
@TransactionalEventListener
fun populateQueueByChunksFromNewJob(event: OnBatchJobCreated) {
if (usingRedisProvider.areWeUsingRedis) {
- val event = NewJobEvent(event.job.id)
+ val newJobEvent = NewJobEvent(event.job.id)
redisTemplate.convertAndSend(
RedisPubSubReceiverConfiguration.NEW_JOB_QUEUE_TOPIC,
- jacksonObjectMapper().writeValueAsString(event),
+ jacksonObjectMapper().writeValueAsString(newJobEvent),
)
return
}
fetchAndQueueNewJobExecutions(event.job.id)
}

🤖 Prompt for AI Agents
In @backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
around lines 125-137, In populateQueueByChunksFromNewJob(event:
OnBatchJobCreated) the local declaration val event = NewJobEvent(event.job.id)
shadows the method parameter; rename the local variable (e.g., newJobEvent or
jobEvent) and use that name when calling
jacksonObjectMapper().writeValueAsString(...) and when passing to
redisTemplate.convertAndSend so the original OnBatchJobCreated parameter remains
unshadowed and clear.
private fun fetchAndQueueNewJobExecutions(jobId: Long) {
  executeInNewTransaction(platformTransactionManager) {
    val data =
      entityManager
        .createNativeQuery(
          """
          SELECT bjce.* FROM tolgee_batch_job_chunk_execution bjce
          WHERE bjce.batch_job_id = :jobId
          ORDER BY bjce.id
          FOR NO KEY UPDATE SKIP LOCKED
          LIMIT :limit
          """,
          BatchJobChunkExecution::class.java,
        ).setParameter("jobId", jobId)
        .setParameter("limit", batchProperties.chunkQueuePopulationSize)
        .resultList as List<BatchJobChunkExecution>

    pendExecutions(data)
  }
}
Missing status filter in query - may re-queue non-NEW executions.
The native query selects all executions for a job without filtering by status. If called on a job with mixed statuses (e.g., some already PENDING or completed), it could incorrectly re-queue them. The query should filter for status = 'NEW' to match the intended lifecycle.
🔎 Proposed fix
private fun fetchAndQueueNewJobExecutions(jobId: Long) {
executeInNewTransaction(platformTransactionManager) {
val data =
entityManager
.createNativeQuery(
"""
SELECT bjce.* FROM tolgee_batch_job_chunk_execution bjce
WHERE bjce.batch_job_id = :jobId
+ AND bjce.status = 'NEW'
ORDER BY bjce.id
FOR NO KEY UPDATE SKIP LOCKED
LIMIT :limit
""",
BatchJobChunkExecution::class.java,
).setParameter("jobId", jobId)
.setParameter("limit", batchProperties.chunkQueuePopulationSize)
.resultList as List<BatchJobChunkExecution>
pendExecutions(data)
}
🤖 Prompt for AI Agents
In @backend/data/src/main/kotlin/io/tolgee/batch/BatchJobChunkExecutionQueue.kt
around lines 139-158, The native query in fetchAndQueueNewJobExecutions
currently selects all rows from tolgee_batch_job_chunk_execution for a job and
may re-queue non-NEW executions; update the query used in
fetchAndQueueNewJobExecutions (the createNativeQuery block returning
List<BatchJobChunkExecution>) to include a status filter (e.g., WHERE
bjce.batch_job_id = :jobId AND bjce.status = 'NEW' or a parameterized :status)
so only NEW executions are selected before calling pendExecutions.
This seems like a very nice step.
I'm still a bit baffled by the database vs redis acting as the lock holder, but this seems to make things simpler and the rationale sounds good.
I added two comments that I think would be improvements but are not necessary.
Once this PR lands and I get a released version, I'll try this version in production at Rakuten France.
    addExecutionsToQueue(event.items)
  }

  @Scheduled(fixedDelay = 60000)
This has been this way before, but I'm thinking this should be made configurable (fixedDelayString, as in ScheduledJobCleaner.kt).
And I'm also thinking that adding jitter would be better (a randomized initialDelayString). On several machines, because the output of the SQL query is ordered, we'll have several nodes competing to do the SELECT FOR UPDATE.
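A rough sketch of both ideas, purely illustrative — the property name and the jitter bound are invented here, and it assumes scheduling is enabled in the Spring context:

```kotlin
import org.springframework.scheduling.annotation.Scheduled

class QueuePopulationSchedulerSketch {
  // fixedDelayString reads the delay from configuration (falling back to the current 60s),
  // and initialDelayString uses a SpEL expression to add per-node startup jitter so that
  // several pods don't run the SELECT ... FOR UPDATE at exactly the same moment.
  @Scheduled(
    fixedDelayString = "\${tolgee.batch.queue-population-fixed-delay-ms:60000}",
    initialDelayString = "#{T(java.util.concurrent.ThreadLocalRandom).current().nextLong(10000)}",
  )
  fun populateQueue() {
    // fetch NEW / stuck PENDING executions from the db, as described in this PR
  }
}
```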
  BatchJobChunkExecution::class.java,
).setParameter("newExecutionStatus", BatchJobChunkExecutionStatus.NEW)
  .setParameter("pendingExecutionStatus", BatchJobChunkExecutionStatus.PENDING)
  .setParameter("isStuckBefore", currentDateProvider.date.addMinutes(-2))
A two-minute delay seems aggressive if you consider a use case where a batch job has 10 to 20 chunks, with each chunk being a call to OpenAI or similar with 20s/30s latency.
It would be nice if this could be made configurable, or increased (5 minutes?).
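One possible shape for that, as a hypothetical sketch only — the field name is invented and not part of this PR, though `BatchProperties` itself is already referenced by the changed code:

```kotlin
// Hypothetical addition to the existing BatchProperties class (field name invented here).
class BatchPropertiesSketch {
  // How long an execution may stay PENDING before it is considered stuck and re-fetched.
  var stuckExecutionTimeoutInMinutes: Int = 5
}

// The hard-coded value could then become something like:
// .setParameter("isStuckBefore", currentDateProvider.date.addMinutes(-batchProperties.stuckExecutionTimeoutInMinutes))
```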