Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/uipath/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
from uipath.runtime.chat.runtime import UiPathChatRuntime
from uipath.runtime.context import UiPathRuntimeContext
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.debug.bridge import UiPathDebugProtocol
from uipath.runtime.debug.exception import UiPathDebugQuitError
from uipath.runtime.debug.protocol import UiPathDebugProtocol
from uipath.runtime.debug.runtime import (
UiPathDebugRuntime,
)
Expand Down
2 changes: 1 addition & 1 deletion src/uipath/runtime/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
"""Initialization module for the debug package."""

from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.debug.bridge import UiPathDebugProtocol
from uipath.runtime.debug.exception import (
UiPathDebugQuitError,
)
from uipath.runtime.debug.protocol import UiPathDebugProtocol
from uipath.runtime.debug.runtime import UiPathDebugRuntime

__all__ = [
Expand Down
28 changes: 22 additions & 6 deletions src/uipath/runtime/resumable/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,44 @@
class UiPathResumableStorageProtocol(Protocol):
"""Protocol for storing and retrieving resume triggers."""

async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> None:
"""Save a resume trigger to storage.
async def save_triggers(
self, runtime_id: str, triggers: list[UiPathResumeTrigger]
) -> None:
"""Save resume triggers to storage.

Args:
trigger: The resume trigger to persist
triggers: The resume triggers to persist

Raises:
Exception: If storage operation fails
"""
...

async def get_latest_trigger(self, runtime_id: str) -> UiPathResumeTrigger | None:
"""Retrieve the most recent resume trigger from storage.
async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None:
"""Retrieve the resume triggers from storage.

Returns:
The latest resume trigger, or None if no triggers exist
The resume triggers, or None if no triggers exist

Raises:
Exception: If retrieval operation fails
"""
...

async def delete_trigger(
self, runtime_id: str, trigger: UiPathResumeTrigger
) -> None:
"""Delete resume trigger from storage.

Args:
runtime_id: The runtime ID
trigger: The resume trigger to delete

Raises:
Exception: If deletion operation fails
"""
...

async def set_value(
self, runtime_id: str, namespace: str, key: str, value: Any
) -> None:
Expand Down
64 changes: 47 additions & 17 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import logging
from typing import Any, AsyncGenerator

from uipath.core.errors import UiPathPendingTriggerError

from uipath.runtime.base import (
UiPathExecuteOptions,
UiPathRuntimeProtocol,
Expand Down Expand Up @@ -111,21 +113,32 @@ async def _restore_resume_input(
input: User-provided input (takes precedence)

Returns:
Input to use for resume, either provided or from storage
Input to use for resume: {interrupt_id: resume_data, ...}
"""
# If user provided explicit input, use it
if input is not None:
return input

# Otherwise, fetch from storage
trigger = await self.storage.get_latest_trigger(self.runtime_id)
if not trigger:
# Fetch all triggers from storage
triggers = await self.storage.get_triggers(self.runtime_id)
if not triggers:
return None

# Read trigger data via trigger_manager
resume_data = await self.trigger_manager.read_trigger(trigger)

return resume_data
# Build resume map: {interrupt_id: resume_data}
resume_map: dict[str, Any] = {}
for trigger in triggers:
try:
data = await self.trigger_manager.read_trigger(trigger)
assert trigger.interrupt_id is not None, (
"Trigger interrupt_id cannot be None"
)
resume_map[trigger.interrupt_id] = data
await self.storage.delete_trigger(self.runtime_id, trigger)
except UiPathPendingTriggerError:
# Trigger still pending, skip it
pass

return resume_map

async def _handle_suspension(
self, result: UiPathRuntimeResult
Expand All @@ -142,22 +155,39 @@ async def _handle_suspension(
if isinstance(result, UiPathBreakpointResult):
return result

# Check if trigger already exists in result
if result.trigger:
await self.storage.save_trigger(self.runtime_id, result.trigger)
return result

suspended_result = UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
output=result.output,
)

if result.output:
suspended_result.trigger = await self.trigger_manager.create_trigger(
result.output
assert result.output is None or isinstance(result.output, dict), (
"Suspended runtime output must be a dict of interrupt IDs to resume data"
)

# Get existing triggers and current interrupts
suspended_result.triggers = (
await self.storage.get_triggers(self.runtime_id) or []
)
current_interrupts = result.output or {}

# Diff: find new interrupts
existing_ids = {t.interrupt_id for t in suspended_result.triggers}
new_ids = current_interrupts.keys() - existing_ids

# Create triggers only for new interrupts
for interrupt_id in new_ids:
trigger = await self.trigger_manager.create_trigger(
current_interrupts[interrupt_id]
)
trigger.interrupt_id = interrupt_id
suspended_result.triggers.append(trigger)

if suspended_result.triggers:
await self.storage.save_triggers(self.runtime_id, suspended_result.triggers)

await self.storage.save_trigger(self.runtime_id, suspended_result.trigger)
# Backward compatibility: set single trigger directly
if len(suspended_result.triggers) == 1:
suspended_result.trigger = suspended_result.triggers[0]

return suspended_result

Expand Down
1 change: 1 addition & 0 deletions src/uipath/runtime/resumable/trigger.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class UiPathApiTrigger(BaseModel):
class UiPathResumeTrigger(BaseModel):
"""Information needed to resume execution."""

interrupt_id: str | None = Field(default=None, alias="interruptId")
trigger_type: UiPathResumeTriggerType = Field(
default=UiPathResumeTriggerType.API, alias="triggerType"
)
Expand Down
1 change: 1 addition & 0 deletions tests/test_chat_runtime.py → tests/test_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async def stream(
yield UiPathRuntimeResult(
status=UiPathRuntimeStatus.SUSPENDED,
trigger=UiPathResumeTrigger(
interrupt_id="interrupt-1",
trigger_type=UiPathResumeTriggerType.API,
payload={"action": "confirm_tool_call"},
),
Expand Down
Loading