diff --git a/src/uipath/runtime/__init__.py b/src/uipath/runtime/__init__.py index 895f9a8..7932492 100644 --- a/src/uipath/runtime/__init__.py +++ b/src/uipath/runtime/__init__.py @@ -11,8 +11,8 @@ from uipath.runtime.chat.runtime import UiPathChatRuntime from uipath.runtime.context import UiPathRuntimeContext from uipath.runtime.debug.breakpoint import UiPathBreakpointResult -from uipath.runtime.debug.bridge import UiPathDebugProtocol from uipath.runtime.debug.exception import UiPathDebugQuitError +from uipath.runtime.debug.protocol import UiPathDebugProtocol from uipath.runtime.debug.runtime import ( UiPathDebugRuntime, ) diff --git a/src/uipath/runtime/debug/__init__.py b/src/uipath/runtime/debug/__init__.py index 0a37956..a3daa0d 100644 --- a/src/uipath/runtime/debug/__init__.py +++ b/src/uipath/runtime/debug/__init__.py @@ -1,10 +1,10 @@ """Initialization module for the debug package.""" from uipath.runtime.debug.breakpoint import UiPathBreakpointResult -from uipath.runtime.debug.bridge import UiPathDebugProtocol from uipath.runtime.debug.exception import ( UiPathDebugQuitError, ) +from uipath.runtime.debug.protocol import UiPathDebugProtocol from uipath.runtime.debug.runtime import UiPathDebugRuntime __all__ = [ diff --git a/src/uipath/runtime/debug/bridge.py b/src/uipath/runtime/debug/protocol.py similarity index 100% rename from src/uipath/runtime/debug/bridge.py rename to src/uipath/runtime/debug/protocol.py diff --git a/src/uipath/runtime/resumable/protocols.py b/src/uipath/runtime/resumable/protocols.py index 4b1ecbb..1396bb3 100644 --- a/src/uipath/runtime/resumable/protocols.py +++ b/src/uipath/runtime/resumable/protocols.py @@ -8,28 +8,44 @@ class UiPathResumableStorageProtocol(Protocol): """Protocol for storing and retrieving resume triggers.""" - async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> None: - """Save a resume trigger to storage. + async def save_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + """Save resume triggers to storage. Args: - trigger: The resume trigger to persist + triggers: The resume triggers to persist Raises: Exception: If storage operation fails """ ... - async def get_latest_trigger(self, runtime_id: str) -> UiPathResumeTrigger | None: - """Retrieve the most recent resume trigger from storage. + async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger] | None: + """Retrieve the resume triggers from storage. Returns: - The latest resume trigger, or None if no triggers exist + The resume triggers, or None if no triggers exist Raises: Exception: If retrieval operation fails """ ... + async def delete_trigger( + self, runtime_id: str, trigger: UiPathResumeTrigger + ) -> None: + """Delete resume trigger from storage. + + Args: + runtime_id: The runtime ID + trigger: The resume trigger to delete + + Raises: + Exception: If deletion operation fails + """ + ... + async def set_value( self, runtime_id: str, namespace: str, key: str, value: Any ) -> None: diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index 44bb9d4..d12b93c 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -3,6 +3,8 @@ import logging from typing import Any, AsyncGenerator +from uipath.core.errors import UiPathPendingTriggerError + from uipath.runtime.base import ( UiPathExecuteOptions, UiPathRuntimeProtocol, @@ -111,21 +113,32 @@ async def _restore_resume_input( input: User-provided input (takes precedence) Returns: - Input to use for resume, either provided or from storage + Input to use for resume: {interrupt_id: resume_data, ...} """ # If user provided explicit input, use it if input is not None: return input - # Otherwise, fetch from storage - trigger = await self.storage.get_latest_trigger(self.runtime_id) - if not trigger: + # Fetch all triggers from storage + triggers = await self.storage.get_triggers(self.runtime_id) + if not triggers: return None - # Read trigger data via trigger_manager - resume_data = await self.trigger_manager.read_trigger(trigger) - - return resume_data + # Build resume map: {interrupt_id: resume_data} + resume_map: dict[str, Any] = {} + for trigger in triggers: + try: + data = await self.trigger_manager.read_trigger(trigger) + assert trigger.interrupt_id is not None, ( + "Trigger interrupt_id cannot be None" + ) + resume_map[trigger.interrupt_id] = data + await self.storage.delete_trigger(self.runtime_id, trigger) + except UiPathPendingTriggerError: + # Trigger still pending, skip it + pass + + return resume_map async def _handle_suspension( self, result: UiPathRuntimeResult @@ -142,22 +155,39 @@ async def _handle_suspension( if isinstance(result, UiPathBreakpointResult): return result - # Check if trigger already exists in result - if result.trigger: - await self.storage.save_trigger(self.runtime_id, result.trigger) - return result - suspended_result = UiPathRuntimeResult( status=UiPathRuntimeStatus.SUSPENDED, output=result.output, ) - if result.output: - suspended_result.trigger = await self.trigger_manager.create_trigger( - result.output + assert result.output is None or isinstance(result.output, dict), ( + "Suspended runtime output must be a dict of interrupt IDs to resume data" + ) + + # Get existing triggers and current interrupts + suspended_result.triggers = ( + await self.storage.get_triggers(self.runtime_id) or [] + ) + current_interrupts = result.output or {} + + # Diff: find new interrupts + existing_ids = {t.interrupt_id for t in suspended_result.triggers} + new_ids = current_interrupts.keys() - existing_ids + + # Create triggers only for new interrupts + for interrupt_id in new_ids: + trigger = await self.trigger_manager.create_trigger( + current_interrupts[interrupt_id] ) + trigger.interrupt_id = interrupt_id + suspended_result.triggers.append(trigger) + + if suspended_result.triggers: + await self.storage.save_triggers(self.runtime_id, suspended_result.triggers) - await self.storage.save_trigger(self.runtime_id, suspended_result.trigger) + # Backward compatibility: set single trigger directly + if len(suspended_result.triggers) == 1: + suspended_result.trigger = suspended_result.triggers[0] return suspended_result diff --git a/src/uipath/runtime/resumable/trigger.py b/src/uipath/runtime/resumable/trigger.py index 796fde2..ba7a12c 100644 --- a/src/uipath/runtime/resumable/trigger.py +++ b/src/uipath/runtime/resumable/trigger.py @@ -49,6 +49,7 @@ class UiPathApiTrigger(BaseModel): class UiPathResumeTrigger(BaseModel): """Information needed to resume execution.""" + interrupt_id: str | None = Field(default=None, alias="interruptId") trigger_type: UiPathResumeTriggerType = Field( default=UiPathResumeTriggerType.API, alias="triggerType" ) diff --git a/tests/test_chat_runtime.py b/tests/test_chat.py similarity index 99% rename from tests/test_chat_runtime.py rename to tests/test_chat.py index 2101c6a..e9c1525 100644 --- a/tests/test_chat_runtime.py +++ b/tests/test_chat.py @@ -145,6 +145,7 @@ async def stream( yield UiPathRuntimeResult( status=UiPathRuntimeStatus.SUSPENDED, trigger=UiPathResumeTrigger( + interrupt_id="interrupt-1", trigger_type=UiPathResumeTriggerType.API, payload={"action": "confirm_tool_call"}, ), diff --git a/tests/test_resumable.py b/tests/test_resumable.py new file mode 100644 index 0000000..2233a17 --- /dev/null +++ b/tests/test_resumable.py @@ -0,0 +1,258 @@ +"""Tests for UiPathResumableRuntime with multiple triggers.""" + +from __future__ import annotations + +from typing import Any, AsyncGenerator, cast +from unittest.mock import AsyncMock, Mock + +import pytest +from uipath.core.errors import UiPathPendingTriggerError + +from uipath.runtime import ( + UiPathExecuteOptions, + UiPathResumeTrigger, + UiPathResumeTriggerType, + UiPathRuntimeResult, + UiPathRuntimeStatus, + UiPathStreamOptions, +) +from uipath.runtime.events import UiPathRuntimeEvent +from uipath.runtime.resumable.protocols import ( + UiPathResumeTriggerProtocol, +) +from uipath.runtime.resumable.runtime import UiPathResumableRuntime +from uipath.runtime.schema import UiPathRuntimeSchema + + +class MultiTriggerMockRuntime: + """Mock runtime that simulates parallel branching with multiple interrupts.""" + + def __init__(self) -> None: + self.execution_count = 0 + + async def dispose(self) -> None: + pass + + async def execute( + self, + input: dict[str, Any] | None = None, + options: UiPathExecuteOptions | None = None, + ) -> UiPathRuntimeResult: + """Simulate parallel branches with progressive suspensions.""" + self.execution_count += 1 + is_resume = options and options.resume + + if self.execution_count == 1: + # First execution: suspend with 2 parallel interrupts + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output={ + "int-1": {"action": "approve_branch_1"}, + "int-2": {"action": "approve_branch_2"}, + }, + ) + elif self.execution_count == 2: + # Second execution: int-1 completed, int-2 still pending + new int-3 + # input should contain: {"int-1": {"approved": True}} + assert is_resume + assert input is not None + assert "int-1" in input + + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUSPENDED, + output={ + "int-2": {"action": "approve_branch_2"}, # still pending + "int-3": {"action": "approve_branch_3"}, # new interrupt + }, + ) + else: + # Third execution: all completed + assert is_resume + assert input is not None + assert "int-2" in input + assert "int-3" in input + + return UiPathRuntimeResult( + status=UiPathRuntimeStatus.SUCCESSFUL, + output={"completed": True, "resume_data": input}, + ) + + async def stream( + self, + input: dict[str, Any] | None = None, + options: UiPathStreamOptions | None = None, + ) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Stream version of execute.""" + result = await self.execute(input, options) + yield result + + async def get_schema(self) -> UiPathRuntimeSchema: + raise NotImplementedError() + + +class StatefulStorageMock: + """Stateful storage mock that tracks triggers.""" + + def __init__(self) -> None: + self.triggers: list[UiPathResumeTrigger] = [] + + async def get_triggers(self, runtime_id: str) -> list[UiPathResumeTrigger]: + return list(self.triggers) + + async def save_triggers( + self, runtime_id: str, triggers: list[UiPathResumeTrigger] + ) -> None: + self.triggers = list(triggers) + + async def delete_trigger( + self, runtime_id: str, trigger: UiPathResumeTrigger + ) -> None: + self.triggers = [ + t for t in self.triggers if t.interrupt_id != trigger.interrupt_id + ] + + async def set_value( + self, runtime_id: str, namespace: str, key: str, value: Any + ) -> None: + pass + + async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any: + return None + + +def make_trigger_manager_mock() -> UiPathResumeTriggerProtocol: + """Create trigger manager mock.""" + manager = Mock(spec=UiPathResumeTriggerProtocol) + + def create_trigger_impl(data: dict[str, Any]) -> UiPathResumeTrigger: + return UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=UiPathResumeTriggerType.API, + payload=data, + ) + + manager.create_trigger = AsyncMock(side_effect=create_trigger_impl) + manager.read_trigger = AsyncMock() + + return cast(UiPathResumeTriggerProtocol, manager) + + +@pytest.mark.asyncio +async def test_resumable_creates_multiple_triggers_on_first_suspension(): + """First suspension with parallel branches should create multiple triggers.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result = await resumable.execute({}) + + # Should be suspended with 2 triggers + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert len(result.triggers) == 2 + assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} + + # Check payloads by interrupt_id (order not guaranteed) + payloads_by_id = {t.interrupt_id: t.payload for t in result.triggers} + assert payloads_by_id["int-1"] == {"action": "approve_branch_1"} + assert payloads_by_id["int-2"] == {"action": "approve_branch_2"} + + # Both triggers should be created and saved + assert cast(AsyncMock, trigger_manager.create_trigger).await_count == 2 + assert len(storage.triggers) == 2 + + +@pytest.mark.asyncio +async def test_resumable_adds_only_new_triggers_on_partial_resume(): + """Partial resume should keep pending trigger and add only new ones.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # First execution + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + result1 = await resumable.execute({}) + assert result1.triggers is not None + assert len(result1.triggers) == 2 # int-1, int-2 + + # Create async side effect function for read_trigger + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("still pending") + + # Replace the mock with new side_effect + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + # Second execution (resume) + result2 = await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) + + # Should have 2 triggers: int-2 (existing) + int-3 (new) + assert result2.status == UiPathRuntimeStatus.SUSPENDED + assert result2.triggers is not None + assert len(result2.triggers) == 2 + assert {t.interrupt_id for t in result2.triggers} == {"int-2", "int-3"} + + # Only one new trigger created (int-3) - total 3 calls (2 from first + 1 new) + assert cast(AsyncMock, trigger_manager.create_trigger).await_count == 3 + + +@pytest.mark.asyncio +async def test_resumable_completes_after_all_triggers_resolved(): + """After all triggers resolved, execution should complete successfully.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # First execution - creates int-1, int-2 + await resumable.execute({}) + + # Create async side effect for second resume + async def read_trigger_impl_2(trigger: UiPathResumeTrigger) -> dict[str, Any]: + if trigger.interrupt_id == "int-1": + return {"approved": True} + raise UiPathPendingTriggerError("pending") + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl_2) # type: ignore + + # Second execution - int-1 resolved, creates int-3 + await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) + + # Create async side effect for final resume + async def read_trigger_impl_3(trigger: UiPathResumeTrigger) -> dict[str, Any]: + return {"approved": True} + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl_3) # type: ignore + + # Third execution - int-2 and int-3 both resolved + result = await resumable.execute(None, options=UiPathExecuteOptions(resume=True)) + + # Should be successful now + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert isinstance(result.output, dict) + assert result.output["completed"] is True + assert "int-2" in result.output["resume_data"] + assert "int-3" in result.output["resume_data"]