Skip to content

Commit 9e14249

Browse files
authored
Consolidate Operation Outputs (#523)
Transactions now also store a record in the `operation_outputs` table, in addition to the record in the `transaction_outputs` table they use for transactional guarantees. This ensures the `operation_outputs` table contains the workflow's full history.
1 parent e176922 commit 9e14249

File tree

7 files changed

+103
-166
lines changed

7 files changed

+103
-166
lines changed

dbos/_app_db.py

Lines changed: 0 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -173,81 +173,6 @@ def check_transaction_execution(
173173
}
174174
return result
175175

176-
def get_transactions(self, workflow_uuid: str) -> List[StepInfo]:
177-
with self.engine.begin() as conn:
178-
rows = conn.execute(
179-
sa.select(
180-
ApplicationSchema.transaction_outputs.c.function_id,
181-
ApplicationSchema.transaction_outputs.c.function_name,
182-
ApplicationSchema.transaction_outputs.c.output,
183-
ApplicationSchema.transaction_outputs.c.error,
184-
).where(
185-
ApplicationSchema.transaction_outputs.c.workflow_uuid
186-
== workflow_uuid,
187-
)
188-
).all()
189-
return [
190-
StepInfo(
191-
function_id=row[0],
192-
function_name=row[1],
193-
output=(
194-
self.serializer.deserialize(row[2])
195-
if row[2] is not None
196-
else row[2]
197-
),
198-
error=(
199-
self.serializer.deserialize(row[3])
200-
if row[3] is not None
201-
else row[3]
202-
),
203-
child_workflow_id=None,
204-
started_at_epoch_ms=None,
205-
completed_at_epoch_ms=None,
206-
)
207-
for row in rows
208-
]
209-
210-
def clone_workflow_transactions(
211-
self, src_workflow_id: str, forked_workflow_id: str, start_step: int
212-
) -> None:
213-
"""
214-
Copies all steps from dbos.transctions_outputs where function_id < input function_id
215-
into a new workflow_uuid. Returns the new workflow_uuid.
216-
"""
217-
218-
with self.engine.begin() as conn:
219-
220-
insert_stmt = sa.insert(ApplicationSchema.transaction_outputs).from_select(
221-
[
222-
"workflow_uuid",
223-
"function_id",
224-
"output",
225-
"error",
226-
"txn_id",
227-
"txn_snapshot",
228-
"executor_id",
229-
"function_name",
230-
],
231-
sa.select(
232-
sa.literal(forked_workflow_id).label("workflow_uuid"),
233-
ApplicationSchema.transaction_outputs.c.function_id,
234-
ApplicationSchema.transaction_outputs.c.output,
235-
ApplicationSchema.transaction_outputs.c.error,
236-
ApplicationSchema.transaction_outputs.c.txn_id,
237-
ApplicationSchema.transaction_outputs.c.txn_snapshot,
238-
ApplicationSchema.transaction_outputs.c.executor_id,
239-
ApplicationSchema.transaction_outputs.c.function_name,
240-
).where(
241-
(
242-
ApplicationSchema.transaction_outputs.c.workflow_uuid
243-
== src_workflow_id
244-
)
245-
& (ApplicationSchema.transaction_outputs.c.function_id < start_step)
246-
),
247-
)
248-
249-
conn.execute(insert_stmt)
250-
251176
def garbage_collect(
252177
self, cutoff_epoch_timestamp_ms: int, pending_workflow_ids: list[str]
253178
) -> None:

dbos/_client.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
fork_workflow,
4444
get_workflow,
4545
list_queued_workflows,
46-
list_workflow_steps,
4746
list_workflows,
4847
)
4948

@@ -479,7 +478,7 @@ async def list_queued_workflows_async(
479478
)
480479

481480
def list_workflow_steps(self, workflow_id: str) -> List[StepInfo]:
482-
return list_workflow_steps(self._sys_db, self._app_db, workflow_id)
481+
return self._sys_db.list_workflow_steps(workflow_id)
483482

484483
async def list_workflow_steps_async(self, workflow_id: str) -> List[StepInfo]:
485484
return await asyncio.to_thread(self.list_workflow_steps, workflow_id)
@@ -493,7 +492,6 @@ def fork_workflow(
493492
) -> "WorkflowHandle[Any]":
494493
forked_workflow_id = fork_workflow(
495494
self._sys_db,
496-
self._app_db,
497495
workflow_id,
498496
start_step,
499497
application_version=application_version,
@@ -510,7 +508,6 @@ async def fork_workflow_async(
510508
forked_workflow_id = await asyncio.to_thread(
511509
fork_workflow,
512510
self._sys_db,
513-
self._app_db,
514511
workflow_id,
515512
start_step,
516513
application_version=application_version,

dbos/_conductor/conductor.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
get_workflow,
1717
global_timeout,
1818
list_queued_workflows,
19-
list_workflow_steps,
2019
list_workflows,
2120
)
2221

@@ -341,10 +340,8 @@ def run(self) -> None:
341340
list_steps_message = p.ListStepsRequest.from_json(message)
342341
step_info = None
343342
try:
344-
step_info = list_workflow_steps(
345-
self.dbos._sys_db,
346-
self.dbos._app_db,
347-
list_steps_message.workflow_id,
343+
self.dbos._sys_db.list_workflow_steps(
344+
list_steps_message.workflow_id
348345
)
349346
except Exception as e:
350347
error_message = f"Exception encountered when getting workflow {list_steps_message.workflow_id}: {traceback.format_exc()}"

dbos/_core.py

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -594,11 +594,14 @@ def start_workflow(
594594
ctx = new_wf_ctx
595595
new_child_workflow_id = ctx.id_assigned_for_next_workflow
596596
if ctx.has_parent():
597-
child_workflow_id = dbos._sys_db.check_child_workflow(
598-
ctx.parent_workflow_id, ctx.parent_workflow_fid
597+
recorded_result = dbos._sys_db.check_operation_execution(
598+
ctx.parent_workflow_id, ctx.parent_workflow_fid, get_dbos_func_name(func)
599599
)
600-
if child_workflow_id is not None:
601-
return WorkflowHandlePolling(child_workflow_id, dbos)
600+
if recorded_result and recorded_result["error"]:
601+
e: Exception = dbos._sys_db.serializer.deserialize(recorded_result["error"])
602+
raise e
603+
elif recorded_result and recorded_result["child_workflow_id"]:
604+
return WorkflowHandlePolling(recorded_result["child_workflow_id"], dbos)
602605

603606
status = _init_workflow(
604607
dbos,
@@ -690,13 +693,19 @@ async def start_workflow_async(
690693
ctx = new_wf_ctx
691694
new_child_workflow_id = ctx.id_assigned_for_next_workflow
692695
if ctx.has_parent():
693-
child_workflow_id = await asyncio.to_thread(
694-
dbos._sys_db.check_child_workflow,
696+
recorded_result = await asyncio.to_thread(
697+
dbos._sys_db.check_operation_execution,
695698
ctx.parent_workflow_id,
696699
ctx.parent_workflow_fid,
700+
get_dbos_func_name(func),
697701
)
698-
if child_workflow_id is not None:
699-
return WorkflowHandleAsyncPolling(child_workflow_id, dbos)
702+
if recorded_result and recorded_result["error"]:
703+
e: Exception = dbos._sys_db.serializer.deserialize(recorded_result["error"])
704+
raise e
705+
elif recorded_result and recorded_result["child_workflow_id"]:
706+
return WorkflowHandleAsyncPolling(
707+
recorded_result["child_workflow_id"], dbos
708+
)
700709

701710
status = await asyncio.to_thread(
702711
_init_workflow,
@@ -815,11 +824,16 @@ def recorded_result_inner(func: Callable[[], R]) -> R:
815824
workflow_id = ctx.workflow_id
816825

817826
if ctx.has_parent():
818-
child_workflow_id = dbos._sys_db.check_child_workflow(
819-
ctx.parent_workflow_id, ctx.parent_workflow_fid
827+
r = dbos._sys_db.check_operation_execution(
828+
ctx.parent_workflow_id,
829+
ctx.parent_workflow_fid,
830+
get_dbos_func_name(func),
820831
)
821-
if child_workflow_id is not None:
822-
return recorded_result(child_workflow_id, dbos)
832+
if r and r["error"]:
833+
e: Exception = dbos._sys_db.serializer.deserialize(r["error"])
834+
raise e
835+
elif r and r["child_workflow_id"]:
836+
return recorded_result(r["child_workflow_id"], dbos)
823837

824838
status = _init_workflow(
825839
dbos,
@@ -906,12 +920,6 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
906920
)
907921

908922
dbos = dbosreg.dbos
909-
ctx = assert_current_dbos_context()
910-
status = dbos._sys_db.get_workflow_status(ctx.workflow_id)
911-
if status and status["status"] == WorkflowStatusString.CANCELLED.value:
912-
raise DBOSWorkflowCancelledError(
913-
f"Workflow {ctx.workflow_id} is cancelled. Aborting transaction {transaction_name}."
914-
)
915923
assert (
916924
dbos._app_db
917925
), "Transactions can only be used if DBOS is configured with an application_database_url"
@@ -922,6 +930,26 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
922930
}
923931
with EnterDBOSTransaction(session, attributes=attributes):
924932
ctx = assert_current_dbos_context()
933+
# Check if the step record for this transaction exists
934+
recorded_step_output = dbos._sys_db.check_operation_execution(
935+
ctx.workflow_id, ctx.function_id, transaction_name
936+
)
937+
if recorded_step_output:
938+
dbos.logger.debug(
939+
f"Replaying transaction, id: {ctx.function_id}, name: {attributes['name']}"
940+
)
941+
if recorded_step_output["error"]:
942+
step_error: Exception = dbos._serializer.deserialize(
943+
recorded_step_output["error"]
944+
)
945+
raise step_error
946+
elif recorded_step_output["output"]:
947+
return dbos._serializer.deserialize(
948+
recorded_step_output["output"]
949+
)
950+
else:
951+
raise Exception("Output and error are both None")
952+
925953
txn_output: TransactionResultInternal = {
926954
"workflow_uuid": ctx.workflow_id,
927955
"function_id": ctx.function_id,
@@ -932,6 +960,14 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
932960
"txn_id": None,
933961
"function_name": transaction_name,
934962
}
963+
step_output: OperationResultInternal = {
964+
"workflow_uuid": ctx.workflow_id,
965+
"function_id": ctx.function_id,
966+
"function_name": transaction_name,
967+
"output": None,
968+
"error": None,
969+
"started_at_epoch_ms": int(time.time() * 1000),
970+
}
935971
retry_wait_seconds = 0.001
936972
backoff_factor = 1.5
937973
max_retry_wait_seconds = 2.0
@@ -970,8 +1006,18 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
9701006
)
9711007
)
9721008
has_recorded_error = True
1009+
step_output["error"] = recorded_output["error"]
1010+
dbos._sys_db.record_operation_result(
1011+
step_output
1012+
)
9731013
raise deserialized_error
9741014
elif recorded_output["output"]:
1015+
step_output["output"] = recorded_output[
1016+
"output"
1017+
]
1018+
dbos._sys_db.record_operation_result(
1019+
step_output
1020+
)
9751021
return dbos._serializer.deserialize(
9761022
recorded_output["output"]
9771023
)
@@ -1028,10 +1074,13 @@ def invoke_tx(*args: Any, **kwargs: Any) -> Any:
10281074
finally:
10291075
# Don't record the error if it was already recorded
10301076
if txn_error and not has_recorded_error:
1031-
txn_output["error"] = dbos._serializer.serialize(
1032-
txn_error
1077+
step_output["error"] = txn_output["error"] = (
1078+
dbos._serializer.serialize(txn_error)
10331079
)
10341080
dbos._app_db.record_transaction_error(txn_output)
1081+
dbos._sys_db.record_operation_result(step_output)
1082+
step_output["output"] = dbos._serializer.serialize(output)
1083+
dbos._sys_db.record_operation_result(step_output)
10351084
return output
10361085

10371086
if inspect.iscoroutinefunction(func):

dbos/_dbos.py

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
dbos_logger,
113113
init_logger,
114114
)
115-
from ._workflow_commands import get_workflow, list_workflow_steps
115+
from ._workflow_commands import get_workflow
116116

117117
# Most DBOS functions are just any callable F, so decorators / wrappers work on F
118118
# There are cases where the parameters P and return value R should be separate
@@ -1093,7 +1093,6 @@ def fn() -> str:
10931093
dbos_logger.info(f"Forking workflow: {workflow_id} from step {start_step}")
10941094
return fork_workflow(
10951095
_get_dbos_instance()._sys_db,
1096-
_get_dbos_instance()._app_db,
10971096
workflow_id,
10981097
start_step,
10991098
application_version=application_version,
@@ -1270,9 +1269,7 @@ async def list_queued_workflows_async(
12701269
@classmethod
12711270
def list_workflow_steps(cls, workflow_id: str) -> List[StepInfo]:
12721271
def fn() -> List[StepInfo]:
1273-
return list_workflow_steps(
1274-
_get_dbos_instance()._sys_db, _get_dbos_instance()._app_db, workflow_id
1275-
)
1272+
return _get_dbos_instance()._sys_db.list_workflow_steps(workflow_id)
12761273

12771274
return _get_dbos_instance()._sys_db.call_function_as_step(
12781275
fn, "DBOS.listWorkflowSteps"

0 commit comments

Comments
 (0)