Skip to content

Commit e176922

Browse files
authored
Adjustable Polling Interval (#518)
- Adjustable queue polling interval (as a queue parameter)
- Adjustable `get_result` polling interval (as a parameter to `get_result`)
- Don't override the status filter in `list_queued_workflows`

Addresses #515 and #517
1 parent aecf814 commit e176922

File tree

8 files changed

+169
-61
lines changed

8 files changed

+169
-61
lines changed

dbos/_client.py

Lines changed: 11 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818

1919
from dbos._app_db import ApplicationDatabase
2020
from dbos._context import MaxPriority, MinPriority
21+
from dbos._core import DEFAULT_POLLING_INTERVAL
2122
from dbos._sys_db import SystemDatabase
2223
from dbos._utils import generate_uuid
2324

@@ -85,8 +86,12 @@ def __init__(self, workflow_id: str, sys_db: SystemDatabase):
8586
def get_workflow_id(self) -> str:
8687
return self.workflow_id
8788

88-
def get_result(self) -> R:
89-
res: R = self._sys_db.await_workflow_result(self.workflow_id)
89+
def get_result(
90+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
91+
) -> R:
92+
res: R = self._sys_db.await_workflow_result(
93+
self.workflow_id, polling_interval_sec
94+
)
9095
return res
9196

9297
def get_status(self) -> WorkflowStatus:
@@ -105,9 +110,11 @@ def __init__(self, workflow_id: str, sys_db: SystemDatabase):
105110
def get_workflow_id(self) -> str:
106111
return self.workflow_id
107112

108-
async def get_result(self) -> R:
113+
async def get_result(
114+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
115+
) -> R:
109116
res: R = await asyncio.to_thread(
110-
self._sys_db.await_workflow_result, self.workflow_id
117+
self._sys_db.await_workflow_result, self.workflow_id, polling_interval_sec
111118
)
112119
return res
113120

dbos/_core.py

Lines changed: 26 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -91,6 +91,7 @@
9191

9292
TEMP_SEND_WF_NAME = "<temp>.temp_send_workflow"
9393
DEBOUNCER_WORKFLOW_NAME = "_dbos_debouncer_workflow"
94+
DEFAULT_POLLING_INTERVAL = 1.0
9495

9596

9697
class WorkflowHandleFuture(Generic[R]):
@@ -103,7 +104,9 @@ def __init__(self, workflow_id: str, future: Future[R], dbos: "DBOS"):
103104
def get_workflow_id(self) -> str:
104105
return self.workflow_id
105106

106-
def get_result(self) -> R:
107+
def get_result(
108+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
109+
) -> R:
107110
try:
108111
r = self.future.result()
109112
except Exception as e:
@@ -130,9 +133,13 @@ def __init__(self, workflow_id: str, dbos: "DBOS"):
130133
def get_workflow_id(self) -> str:
131134
return self.workflow_id
132135

133-
def get_result(self) -> R:
136+
def get_result(
137+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
138+
) -> R:
134139
try:
135-
r: R = self.dbos._sys_db.await_workflow_result(self.workflow_id)
140+
r: R = self.dbos._sys_db.await_workflow_result(
141+
self.workflow_id, polling_interval_sec
142+
)
136143
except Exception as e:
137144
serialized_e = self.dbos._serializer.serialize(e)
138145
self.dbos._sys_db.record_get_result(self.workflow_id, None, serialized_e)
@@ -158,7 +165,9 @@ def __init__(self, workflow_id: str, task: asyncio.Future[R], dbos: "DBOS"):
158165
def get_workflow_id(self) -> str:
159166
return self.workflow_id
160167

161-
async def get_result(self) -> R:
168+
async def get_result(
169+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
170+
) -> R:
162171
try:
163172
r = await self.task
164173
except Exception as e:
@@ -192,10 +201,14 @@ def __init__(self, workflow_id: str, dbos: "DBOS"):
192201
def get_workflow_id(self) -> str:
193202
return self.workflow_id
194203

195-
async def get_result(self) -> R:
204+
async def get_result(
205+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
206+
) -> R:
196207
try:
197208
r: R = await asyncio.to_thread(
198-
self.dbos._sys_db.await_workflow_result, self.workflow_id
209+
self.dbos._sys_db.await_workflow_result,
210+
self.workflow_id,
211+
polling_interval_sec,
199212
)
200213
except Exception as e:
201214
serialized_e = self.dbos._serializer.serialize(e)
@@ -366,7 +379,7 @@ def persist(func: Callable[[], R]) -> R:
366379
)
367380
# Directly return the result if the workflow is already completed
368381
recorded_result: R = dbos._sys_db.await_workflow_result(
369-
status["workflow_uuid"]
382+
status["workflow_uuid"], polling_interval=DEFAULT_POLLING_INTERVAL
370383
)
371384
return recorded_result
372385
try:
@@ -381,7 +394,9 @@ def persist(func: Callable[[], R]) -> R:
381394
return output
382395
except DBOSWorkflowConflictIDError:
383396
# Await the workflow result
384-
r: R = dbos._sys_db.await_workflow_result(status["workflow_uuid"])
397+
r: R = dbos._sys_db.await_workflow_result(
398+
status["workflow_uuid"], polling_interval=DEFAULT_POLLING_INTERVAL
399+
)
385400
return r
386401
except DBOSWorkflowCancelledError as error:
387402
raise DBOSAwaitedWorkflowCancelledError(status["workflow_uuid"])
@@ -788,7 +803,9 @@ def recorded_result(
788803
c_wfid: str, dbos: "DBOS"
789804
) -> Callable[[Callable[[], R]], R]:
790805
def recorded_result_inner(func: Callable[[], R]) -> R:
791-
r: R = dbos._sys_db.await_workflow_result(c_wfid)
806+
r: R = dbos._sys_db.await_workflow_result(
807+
c_wfid, polling_interval=DEFAULT_POLLING_INTERVAL
808+
)
792809
return r
793810

794811
return recorded_result_inner

dbos/_dbos.py

Lines changed: 9 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -38,6 +38,7 @@
3838
from ._classproperty import classproperty
3939
from ._core import (
4040
DEBOUNCER_WORKFLOW_NAME,
41+
DEFAULT_POLLING_INTERVAL,
4142
TEMP_SEND_WF_NAME,
4243
WorkflowHandleAsyncPolling,
4344
WorkflowHandlePolling,
@@ -335,6 +336,8 @@ def __init__(
335336
self._executor_field: Optional[ThreadPoolExecutor] = None
336337
self._background_threads: List[threading.Thread] = []
337338
self.conductor_url: Optional[str] = conductor_url
339+
if config.get("conductor_url"):
340+
self.conductor_url = config.get("conductor_url")
338341
self.conductor_key: Optional[str] = conductor_key
339342
if config.get("conductor_key"):
340343
self.conductor_key = config.get("conductor_key")
@@ -1551,7 +1554,9 @@ def get_workflow_id(self) -> str:
15511554
"""Return the applicable workflow ID."""
15521555
...
15531556

1554-
def get_result(self) -> R:
1557+
def get_result(
1558+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
1559+
) -> R:
15551560
"""Return the result of the workflow function invocation, waiting if necessary."""
15561561
...
15571562

@@ -1580,7 +1585,9 @@ def get_workflow_id(self) -> str:
15801585
"""Return the applicable workflow ID."""
15811586
...
15821587

1583-
async def get_result(self) -> R:
1588+
async def get_result(
1589+
self, *, polling_interval_sec: float = DEFAULT_POLLING_INTERVAL
1590+
) -> R:
15841591
"""Return the result of the workflow function invocation, waiting if necessary."""
15851592
...
15861593

dbos/_dbos_config.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -39,6 +39,7 @@ class DBOSConfig(TypedDict, total=False):
3939
enable_otlp (bool): If True, enable built-in DBOS OTLP tracing and logging.
4040
system_database_engine (sa.Engine): A custom system database engine. If provided, DBOS will not create an engine but use this instead.
4141
conductor_key (str): An API key for DBOS Conductor. Pass this in to connect your process to Conductor.
42+
conductor_url (str): The websockets URL for your DBOS Conductor service. Only set if you're self-hosting Conductor.
4243
serializer (Serializer): A custom serializer and deserializer DBOS uses when storing program data in the system database
4344
"""
4445

@@ -60,6 +61,7 @@ class DBOSConfig(TypedDict, total=False):
6061
enable_otlp: Optional[bool]
6162
system_database_engine: Optional[sa.Engine]
6263
conductor_key: Optional[str]
64+
conductor_url: Optional[str]
6365
serializer: Optional[Serializer]
6466

6567

dbos/_queue.py

Lines changed: 96 additions & 40 deletions
Original file line number | Diff line number | Diff line change
@@ -44,6 +44,7 @@ def __init__(
4444
worker_concurrency: Optional[int] = None,
4545
priority_enabled: bool = False,
4646
partition_queue: bool = False,
47+
polling_interval_sec: float = 1.0,
4748
) -> None:
4849
if (
4950
worker_concurrency is not None
@@ -53,12 +54,15 @@ def __init__(
5354
raise ValueError(
5455
"worker_concurrency must be less than or equal to concurrency"
5556
)
57+
if polling_interval_sec <= 0.0:
58+
raise ValueError("polling_interval_sec must be positive")
5659
self.name = name
5760
self.concurrency = concurrency
5861
self.worker_concurrency = worker_concurrency
5962
self.limiter = limiter
6063
self.priority_enabled = priority_enabled
6164
self.partition_queue = partition_queue
65+
self.polling_interval_sec = polling_interval_sec
6266
from ._dbos import _get_or_create_dbos_registry
6367

6468
registry = _get_or_create_dbos_registry()
@@ -108,50 +112,102 @@ async def enqueue_async(
108112
return await start_workflow_async(dbos, func, self.name, False, *args, **kwargs)
109113

110114

111-
def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
112-
polling_interval = 1.0
113-
min_polling_interval = 1.0
114-
max_polling_interval = 120.0
115+
def queue_worker_thread(
116+
stop_event: threading.Event, dbos: "DBOS", queue: Queue
117+
) -> None:
118+
"""Worker thread for processing a single queue."""
119+
polling_interval = queue.polling_interval_sec
120+
min_polling_interval = queue.polling_interval_sec
121+
max_polling_interval = max(queue.polling_interval_sec, 120.0)
122+
115123
while not stop_event.is_set():
116124
# Wait for the polling interval with jitter
117125
if stop_event.wait(timeout=polling_interval * random.uniform(0.95, 1.05)):
118126
return
119-
queues = dict(dbos._registry.queue_info_map)
120-
for _, queue in queues.items():
121-
try:
122-
if queue.partition_queue:
123-
dequeued_workflows = []
124-
queue_partition_keys = dbos._sys_db.get_queue_partitions(queue.name)
125-
for key in queue_partition_keys:
126-
dequeued_workflows += dbos._sys_db.start_queued_workflows(
127-
queue,
128-
GlobalParams.executor_id,
129-
GlobalParams.app_version,
130-
key,
131-
)
132-
else:
133-
dequeued_workflows = dbos._sys_db.start_queued_workflows(
134-
queue, GlobalParams.executor_id, GlobalParams.app_version, None
135-
)
136-
for id in dequeued_workflows:
137-
execute_workflow_by_id(dbos, id)
138-
except OperationalError as e:
139-
if isinstance(
140-
e.orig, (errors.SerializationFailure, errors.LockNotAvailable)
141-
):
142-
# If a serialization error is encountered, increase the polling interval
143-
polling_interval = min(
144-
max_polling_interval,
145-
polling_interval * 2.0,
146-
)
147-
dbos.logger.warning(
148-
f"Contention detected in queue thread for {queue.name}. Increasing polling interval to {polling_interval:.2f}."
127+
128+
try:
129+
if queue.partition_queue:
130+
dequeued_workflows = []
131+
queue_partition_keys = dbos._sys_db.get_queue_partitions(queue.name)
132+
for key in queue_partition_keys:
133+
dequeued_workflows += dbos._sys_db.start_queued_workflows(
134+
queue,
135+
GlobalParams.executor_id,
136+
GlobalParams.app_version,
137+
key,
149138
)
150-
else:
151-
dbos.logger.warning(f"Exception encountered in queue thread: {e}")
152-
except Exception as e:
153-
if not stop_event.is_set():
154-
# Only print the error if the thread is not stopping
155-
dbos.logger.warning(f"Exception encountered in queue thread: {e}")
139+
else:
140+
dequeued_workflows = dbos._sys_db.start_queued_workflows(
141+
queue, GlobalParams.executor_id, GlobalParams.app_version, None
142+
)
143+
for id in dequeued_workflows:
144+
execute_workflow_by_id(dbos, id)
145+
except OperationalError as e:
146+
if isinstance(
147+
e.orig, (errors.SerializationFailure, errors.LockNotAvailable)
148+
):
149+
# If a serialization error is encountered, increase the polling interval
150+
polling_interval = min(
151+
max_polling_interval,
152+
polling_interval * 2.0,
153+
)
154+
dbos.logger.warning(
155+
f"Contention detected in queue thread for {queue.name}. Increasing polling interval to {polling_interval:.2f}."
156+
)
157+
else:
158+
dbos.logger.warning(
159+
f"Exception encountered in queue thread for {queue.name}: {e}"
160+
)
161+
except Exception as e:
162+
if not stop_event.is_set():
163+
# Only print the error if the thread is not stopping
164+
dbos.logger.warning(
165+
f"Exception encountered in queue thread for {queue.name}: {e}"
166+
)
167+
156168
# Attempt to scale back the polling interval on each iteration
157169
polling_interval = max(min_polling_interval, polling_interval * 0.9)
170+
171+
172+
def queue_thread(stop_event: threading.Event, dbos: "DBOS") -> None:
173+
"""Main queue manager thread that spawns and monitors worker threads for each queue."""
174+
queue_threads: dict[str, threading.Thread] = {}
175+
check_interval = 1.0 # Check for new queues every second
176+
177+
while not stop_event.is_set():
178+
# Check for new queues
179+
current_queues = dict(dbos._registry.queue_info_map)
180+
181+
# Start threads for new queues
182+
for queue_name, queue in current_queues.items():
183+
if (
184+
queue_name not in queue_threads
185+
or not queue_threads[queue_name].is_alive()
186+
):
187+
thread = threading.Thread(
188+
target=queue_worker_thread,
189+
args=(stop_event, dbos, queue),
190+
name=f"queue-worker-{queue_name}",
191+
daemon=True,
192+
)
193+
thread.start()
194+
queue_threads[queue_name] = thread
195+
dbos.logger.debug(f"Started worker thread for queue: {queue_name}")
196+
197+
# Wait for the check interval or stop event
198+
if stop_event.wait(timeout=check_interval):
199+
break
200+
201+
# Join all queue worker threads
202+
dbos.logger.info("Stopping queue manager, joining all worker threads...")
203+
for queue_name, thread in queue_threads.items():
204+
if thread.is_alive():
205+
thread.join(timeout=10.0) # Give each thread 10 seconds to finish
206+
if thread.is_alive():
207+
dbos.logger.debug(
208+
f"Queue worker thread for {queue_name} did not stop in time"
209+
)
210+
else:
211+
dbos.logger.debug(
212+
f"Queue worker thread for {queue_name} stopped successfully"
213+
)

dbos/_sys_db.py

Lines changed: 7 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -930,7 +930,7 @@ def get_deduplicated_workflow(
930930
return workflow_id
931931

932932
@db_retry()
933-
def await_workflow_result(self, workflow_id: str) -> Any:
933+
def await_workflow_result(self, workflow_id: str, polling_interval: float) -> Any:
934934
while True:
935935
with self.engine.begin() as c:
936936
row = c.execute(
@@ -955,7 +955,7 @@ def await_workflow_result(self, workflow_id: str) -> Any:
955955
raise DBOSAwaitedWorkflowCancelledError(workflow_id)
956956
else:
957957
pass # CB: I guess we're assuming the WF will show up eventually.
958-
time.sleep(1)
958+
time.sleep(polling_interval)
959959

960960
def get_workflows(
961961
self,
@@ -998,11 +998,12 @@ def get_workflows(
998998

999999
if input.queues_only:
10001000
query = sa.select(*load_columns).where(
1001-
sa.and_(
1002-
SystemSchema.workflow_status.c.queue_name.isnot(None),
1003-
SystemSchema.workflow_status.c.status.in_(["ENQUEUED", "PENDING"]),
1004-
)
1001+
SystemSchema.workflow_status.c.queue_name.isnot(None),
10051002
)
1003+
if not input.status:
1004+
query = query.where(
1005+
SystemSchema.workflow_status.c.status.in_(["ENQUEUED", "PENDING"])
1006+
)
10061007
else:
10071008
query = sa.select(*load_columns)
10081009
if input.sort_desc:

0 commit comments

Comments (0)