Skip to content

Commit 5851e56

Browse files
authored
Execute dequeued workflows immediately (#527)
Avoid the case where "contention detected" exception prevents some workflows on a partitioned queue from running.
1 parent 9e14249 commit 5851e56

File tree

1 file changed

+5
-4
lines changed

1 file changed

+5
-4
lines changed

dbos/_queue.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,21 +127,22 @@ def queue_worker_thread(
127127

128128
try:
129129
if queue.partition_queue:
130-
dequeued_workflows = []
131130
queue_partition_keys = dbos._sys_db.get_queue_partitions(queue.name)
132131
for key in queue_partition_keys:
133-
dequeued_workflows += dbos._sys_db.start_queued_workflows(
132+
dequeued_workflows = dbos._sys_db.start_queued_workflows(
134133
queue,
135134
GlobalParams.executor_id,
136135
GlobalParams.app_version,
137136
key,
138137
)
138+
for id in dequeued_workflows:
139+
execute_workflow_by_id(dbos, id)
139140
else:
140141
dequeued_workflows = dbos._sys_db.start_queued_workflows(
141142
queue, GlobalParams.executor_id, GlobalParams.app_version, None
142143
)
143-
for id in dequeued_workflows:
144-
execute_workflow_by_id(dbos, id)
144+
for id in dequeued_workflows:
145+
execute_workflow_by_id(dbos, id)
145146
except OperationalError as e:
146147
if isinstance(
147148
e.orig, (errors.SerializationFailure, errors.LockNotAvailable)

0 commit comments

Comments
 (0)