Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.2.9"
version = "0.3.0"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
4 changes: 2 additions & 2 deletions src/uipath/runtime/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from uipath.runtime.chat.runtime import UiPathChatRuntime
from uipath.runtime.context import UiPathRuntimeContext
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.debug.bridge import UiPathDebugBridgeProtocol
from uipath.runtime.debug.bridge import UiPathDebugProtocol
from uipath.runtime.debug.exception import UiPathDebugQuitError
from uipath.runtime.debug.runtime import (
UiPathDebugRuntime,
Expand Down Expand Up @@ -63,7 +63,7 @@
"UiPathResumeTriggerType",
"UiPathResumableRuntime",
"UiPathDebugQuitError",
"UiPathDebugBridgeProtocol",
"UiPathDebugProtocol",
"UiPathDebugRuntime",
"UiPathBreakpointResult",
"UiPathStreamNotSupportedError",
Expand Down
27 changes: 25 additions & 2 deletions src/uipath/runtime/chat/protocol.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
"""Abstract conversation bridge interface."""

from typing import Protocol
from typing import Any, Protocol

from uipath.core.chat import UiPathConversationMessageEvent
from uipath.core.chat import (
UiPathConversationMessageEvent,
)

from uipath.runtime.result import UiPathRuntimeResult


class UiPathChatProtocol(Protocol):
Expand All @@ -28,3 +32,22 @@ async def emit_message_event(
message_event: UiPathConversationMessageEvent to wrap and send
"""
...

async def emit_interrupt_event(
self,
interrupt_event: UiPathRuntimeResult,
) -> None:
"""Wrap and send an interrupt event.

Args:
interrupt_event: UiPathConversationInterruptEvent to wrap and send
"""
...

async def emit_exchange_end_event(self) -> None:
"""Send an exchange end event."""
...

async def wait_for_resume(self) -> dict[str, Any]:
"""Wait for the interrupt_end event to be received."""
...
43 changes: 38 additions & 5 deletions src/uipath/runtime/chat/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable.trigger import UiPathResumeTriggerType
from uipath.runtime.schema import UiPathRuntimeSchema

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -65,12 +66,44 @@ async def stream(
"""Stream execution events with chat support."""
await self.chat_bridge.connect()

async for event in self.delegate.stream(input, options=options):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)
execution_completed = False
current_input = input
current_options = UiPathStreamOptions(
resume=options.resume if options else False,
breakpoints=options.breakpoints if options else None,
)

while not execution_completed:
async for event in self.delegate.stream(
current_input, options=current_options
):
if isinstance(event, UiPathRuntimeMessageEvent):
if event.payload:
await self.chat_bridge.emit_message_event(event.payload)

if isinstance(event, UiPathRuntimeResult):
runtime_result = event

if (
runtime_result.status == UiPathRuntimeStatus.SUSPENDED
and runtime_result.trigger
and runtime_result.trigger.trigger_type
== UiPathResumeTriggerType.API
):
await self.chat_bridge.emit_interrupt_event(runtime_result)
resume_data = await self.chat_bridge.wait_for_resume()

# Continue with resumed execution
current_input = resume_data
current_options.resume = True
break
else:
yield event
execution_completed = True
else:
yield event

yield event
await self.chat_bridge.emit_exchange_end_event()

async def get_schema(self) -> UiPathRuntimeSchema:
"""Get schema from the delegate runtime."""
Expand Down
4 changes: 2 additions & 2 deletions src/uipath/runtime/debug/__init__.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
"""Initialization module for the debug package."""

from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.debug.bridge import UiPathDebugBridgeProtocol
from uipath.runtime.debug.bridge import UiPathDebugProtocol
from uipath.runtime.debug.exception import (
UiPathDebugQuitError,
)
from uipath.runtime.debug.runtime import UiPathDebugRuntime

__all__ = [
"UiPathDebugQuitError",
"UiPathDebugBridgeProtocol",
"UiPathDebugProtocol",
"UiPathDebugRuntime",
"UiPathBreakpointResult",
]
2 changes: 1 addition & 1 deletion src/uipath/runtime/debug/bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
)


class UiPathDebugBridgeProtocol(Protocol):
class UiPathDebugProtocol(Protocol):
"""Abstract interface for debug communication.

Implementations: SignalR, Console, WebSocket, etc.
Expand Down
6 changes: 3 additions & 3 deletions src/uipath/runtime/debug/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from uipath.runtime.debug import (
UiPathBreakpointResult,
UiPathDebugBridgeProtocol,
UiPathDebugProtocol,
UiPathDebugQuitError,
)
from uipath.runtime.events import (
Expand Down Expand Up @@ -42,7 +42,7 @@ class UiPathDebugRuntime:
def __init__(
self,
delegate: UiPathRuntimeProtocol,
debug_bridge: UiPathDebugBridgeProtocol,
debug_bridge: UiPathDebugProtocol,
trigger_poll_interval: float = 5.0,
):
"""Initialize the UiPathDebugRuntime.
Expand All @@ -54,7 +54,7 @@ def __init__(
"""
super().__init__()
self.delegate = delegate
self.debug_bridge: UiPathDebugBridgeProtocol = debug_bridge
self.debug_bridge: UiPathDebugProtocol = debug_bridge
if trigger_poll_interval < 0:
raise ValueError("trigger_poll_interval must be >= 0")
self.trigger_poll_interval = trigger_poll_interval
Expand Down
9 changes: 8 additions & 1 deletion src/uipath/runtime/result.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class UiPathRuntimeResult(UiPathRuntimeEvent):
output: dict[str, Any] | BaseModel | str | None = None
status: UiPathRuntimeStatus = UiPathRuntimeStatus.SUCCESSFUL
trigger: UiPathResumeTrigger | None = None
triggers: list[UiPathResumeTrigger] | None = None
error: UiPathErrorContract | None = None

event_type: UiPathRuntimeEventType = Field(
Expand All @@ -42,14 +43,20 @@ def to_dict(self) -> dict[str, Any]:
else:
output_data = self.output

result = {
result: dict[str, Any] = {
"output": output_data,
"status": self.status,
}

if self.trigger:
result["resume"] = self.trigger.model_dump(by_alias=True)

if self.triggers:
result["resumeTriggers"] = [
resume_trigger.model_dump(by_alias=True)
for resume_trigger in self.triggers
]

if self.error:
result["error"] = self.error.model_dump()

Expand Down
36 changes: 34 additions & 2 deletions src/uipath/runtime/resumable/protocols.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
class UiPathResumableStorageProtocol(Protocol):
"""Protocol for storing and retrieving resume triggers."""

async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
async def save_trigger(self, runtime_id: str, trigger: UiPathResumeTrigger) -> None:
"""Save a resume trigger to storage.

Args:
Expand All @@ -19,7 +19,7 @@ async def save_trigger(self, trigger: UiPathResumeTrigger) -> None:
"""
...

async def get_latest_trigger(self) -> UiPathResumeTrigger | None:
async def get_latest_trigger(self, runtime_id: str) -> UiPathResumeTrigger | None:
"""Retrieve the most recent resume trigger from storage.

Returns:
Expand All @@ -30,6 +30,38 @@ async def get_latest_trigger(self) -> UiPathResumeTrigger | None:
"""
...

async def set_value(
self, runtime_id: str, namespace: str, key: str, value: Any
) -> None:
"""Store values for a specific runtime.

Args:
runtime_id: The runtime ID
namespace: The namespace of the persisted value
key: The key associated with the persisted value
value: The value to persist

Raises:
Exception: If storage operation fails
"""
...

async def get_value(self, runtime_id: str, namespace: str, key: str) -> Any:
"""Retrieve values for a specific runtime from storage.

Args:
runtime_id: The runtime ID
namespace: The namespace of the persisted value
key: The key associated with the persisted value

Returns:
The value matching the method's parameters, or None if it does not exist

Raises:
Exception: If retrieval operation fails
"""
...


class UiPathResumeTriggerCreatorProtocol(Protocol):
"""Protocol for creating resume triggers from suspend values."""
Expand Down
11 changes: 7 additions & 4 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
UiPathRuntimeProtocol,
UiPathStreamOptions,
)
from uipath.runtime.debug import UiPathBreakpointResult
from uipath.runtime.debug.breakpoint import UiPathBreakpointResult
from uipath.runtime.events import UiPathRuntimeEvent
from uipath.runtime.result import UiPathRuntimeResult, UiPathRuntimeStatus
from uipath.runtime.resumable.protocols import (
Expand All @@ -35,17 +35,20 @@ def __init__(
delegate: UiPathRuntimeProtocol,
storage: UiPathResumableStorageProtocol,
trigger_manager: UiPathResumeTriggerProtocol,
runtime_id: str,
):
"""Initialize the resumable runtime wrapper.

Args:
delegate: The underlying runtime to wrap
storage: Storage for persisting/retrieving resume triggers
trigger_manager: Manager for creating and reading resume triggers
runtime_id: Id used for runtime orchestration
"""
self.delegate = delegate
self.storage = storage
self.trigger_manager = trigger_manager
self.runtime_id = runtime_id

async def execute(
self,
Expand Down Expand Up @@ -115,7 +118,7 @@ async def _restore_resume_input(
return input

# Otherwise, fetch from storage
trigger = await self.storage.get_latest_trigger()
trigger = await self.storage.get_latest_trigger(self.runtime_id)
if not trigger:
return None

Expand All @@ -141,7 +144,7 @@ async def _handle_suspension(

# Check if trigger already exists in result
if result.trigger:
await self.storage.save_trigger(result.trigger)
await self.storage.save_trigger(self.runtime_id, result.trigger)
return result

suspended_result = UiPathRuntimeResult(
Expand All @@ -154,7 +157,7 @@ async def _handle_suspension(
result.output
)

await self.storage.save_trigger(suspended_result.trigger)
await self.storage.save_trigger(self.runtime_id, suspended_result.trigger)

return suspended_result

Expand Down
Loading