From d31a393ae680f69c9d2e2486a41b0b98399cf2da Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues de Jesus Date: Fri, 23 May 2025 15:45:54 -0300 Subject: [PATCH 1/6] Fixed issue that causes error on parallel task execution on Postgres backend. --- backend/postgres/postgres.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index cf119de8..e8c9b6f2 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -311,7 +311,7 @@ func (be *postgresBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi builder := strings.Builder{} builder.WriteString("INSERT INTO NewTasks (InstanceID, EventPayload) VALUES ") for i := 0; i < newActivityCount; i++ { - builder.WriteString(fmt.Sprintf("($%d, $%d)", 3*i+1, 3*i+2)) + builder.WriteString(fmt.Sprintf("($%d, $%d)", 2*i+1, 2*i+2)) if i < newActivityCount-1 { builder.WriteString(", ") } From 0ffee530d6fecb63dbb8299e1e2722ce7da68f27 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Wed, 12 Nov 2025 19:03:11 -0300 Subject: [PATCH 2/6] Added order by over sequence number to prioritize older instances and tasks --- backend/postgres/postgres.go | 2 ++ backend/sqlite/sqlite.go | 2 ++ 2 files changed, 4 insertions(+) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index a2c23325..bb7232ce 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -778,6 +778,7 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) + ORDER BY SequenceNumber ASC LIMIT 1 ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table @@ -864,6 +865,7 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 + ORDER BY SequenceNumber ASC LIMIT 1 ) RETURNING SequenceNumber, InstanceID, EventPayload`, be.workerName, diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 83433422..b3013d41 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,6 +763,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) + ORDER BY [SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table @@ -852,6 +853,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti WHERE [SequenceNumber] = ( SELECT [SequenceNumber] FROM NewTasks T WHERE T.[LockExpiration] IS NULL OR T.[LockExpiration] < ? + ORDER BY [SequenceNumber] ASC LIMIT 1 ) RETURNING [SequenceNumber], [InstanceID], [EventPayload]`, be.workerName, From b443bea7dd4562d01a7fe3a3049104fb99ef78f7 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 12:32:52 -0300 Subject: [PATCH 3/6] Fixed table alias issue. Applied resource to avoid race conditions. --- backend/postgres/postgres.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index bb7232ce..7de78900 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -778,8 +778,9 @@ func (be *postgresBackend) GetOrchestrationWorkItem(ctx context.Context) (*backe SELECT 1 FROM NewEvents E WHERE E.InstanceID = I.InstanceID AND (E.VisibleTime IS NULL OR E.VisibleTime < $4) ) - ORDER BY SequenceNumber ASC + ORDER BY I.SequenceNumber ASC LIMIT 1 + FOR UPDATE SKIP LOCKED ) RETURNING InstanceID`, be.workerName, // LockedBy for Instances table newLockExpiration, // Updated LockExpiration for Instances table @@ -865,8 +866,9 @@ func (be *postgresBackend) GetActivityWorkItem(ctx context.Context) (*backend.Ac WHERE SequenceNumber = ( SELECT SequenceNumber FROM NewTasks T WHERE T.LockExpiration IS NULL OR T.LockExpiration < $3 - ORDER BY SequenceNumber ASC + ORDER BY T.SequenceNumber ASC LIMIT 1 + FOR UPDATE SKIP LOCKED ) RETURNING SequenceNumber, InstanceID, EventPayload`, be.workerName, newLockExpiration, From 48ba21fcf42665dfc389de7dd89505ffd3f38813 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 13:26:01 -0300 Subject: [PATCH 4/6] Fixed table alias issue for sqlite besides postgres --- backend/sqlite/sqlite.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index b3013d41..0a15b71b 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,7 +763,7 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) - ORDER BY [SequenceNumber] ASC + ORDER BY I.[SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table @@ -853,7 +853,7 @@ func (be *sqliteBackend) GetActivityWorkItem(ctx context.Context) (*backend.Acti WHERE [SequenceNumber] = ( SELECT [SequenceNumber] FROM NewTasks T WHERE T.[LockExpiration] IS NULL OR T.[LockExpiration] < ? - ORDER BY [SequenceNumber] ASC + ORDER BY T.[SequenceNumber] ASC LIMIT 1 ) RETURNING [SequenceNumber], [InstanceID], [EventPayload]`, be.workerName, From 0f7ec06c2ea20bc7db98243c0c264032a410a496 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 17:36:28 -0300 Subject: [PATCH 5/6] Fixed wrong order by over nonexistent column Instances.SequenceNumber for sqlite. --- backend/sqlite/sqlite.go | 1 - 1 file changed, 1 deletion(-) diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 0a15b71b..c39b14f3 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -763,7 +763,6 @@ func (be *sqliteBackend) GetOrchestrationWorkItem(ctx context.Context) (*backend SELECT 1 FROM NewEvents E WHERE E.[InstanceID] = I.[InstanceID] AND (E.[VisibleTime] IS NULL OR E.[VisibleTime] < ?) ) - ORDER BY I.[SequenceNumber] ASC LIMIT 1 ) RETURNING [InstanceID]`, be.workerName, // LockedBy for Instances table From 1405b00656b56a8d637a10056f7258ff7880a906 Mon Sep 17 00:00:00 2001 From: Marcelo Rodrigues Date: Fri, 14 Nov 2025 17:52:54 -0300 Subject: [PATCH 6/6] Created index to improve queries with ORDER BY Instances.SequenceNumber --- backend/postgres/schema.sql | 3 +++ 1 file changed, 3 insertions(+) diff --git a/backend/postgres/schema.sql b/backend/postgres/schema.sql index 153a5ff6..435fc3ab 100644 --- a/backend/postgres/schema.sql +++ b/backend/postgres/schema.sql @@ -18,6 +18,9 @@ CREATE TABLE IF NOT EXISTS Instances ( ParentInstanceID TEXT NULL ); +-- This index is used to improve queries with ORDER BY Instances.SequenceNumber +CREATE INDEX IF NOT EXISTS IX_Instances_SequenceNumber ON Instances(SequenceNumber); + -- This index is used by LockNext and Purge logic CREATE INDEX IF NOT EXISTS IX_Instances_RuntimeStatus ON Instances(RuntimeStatus);