Skip to content

Commit 614f9a5

Browse files
mefogleclaude
andcommitted
fix(adk): improve event handling and HITL tool processing
- Fix EventTranslator to close active streams on empty final responses - Add HITL/frontend tool scenario handling in message processing loop - Skip historical backend tool interactions to prevent duplicate processing - Change state update author from "system" to "user" to avoid ADK warnings - Add debug logging for message loop and background execution 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 9467ba2 commit 614f9a5

File tree

3 files changed

+61
-22
lines changed

3 files changed

+61
-22
lines changed

integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -375,13 +375,11 @@ async def run(self, input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
375375
skip_tool_message_batch = False
376376

377377
# Check if there are pending tool calls AND tool results in unseen messages
378-
# If so, we should skip to the tool results first
379-
pending_tool_call_ids = await self._get_pending_tool_call_ids(input.thread_id)
380-
has_pending_tools = pending_tool_call_ids is not None and len(pending_tool_call_ids) > 0
378+
has_pending_tools = await self._has_pending_tool_calls(input.thread_id)
381379
has_tool_results_in_unseen = any(getattr(msg, "role", None) == "tool" for msg in unseen_messages)
382380

383381
if has_pending_tools and has_tool_results_in_unseen:
384-
# Find the index of the first tool result and skip to it
382+
# HITL/Frontend tool scenario: skip to the tool results first
385383
for i, msg in enumerate(unseen_messages):
386384
if getattr(msg, "role", None) == "tool":
387385
# Mark all messages before the tool result as processed (they're already in the ADK session)
@@ -395,6 +393,8 @@ async def run(self, input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
395393
index = i
396394
break
397395

396+
logger.debug(f"[RUN_LOOP] Starting message loop for thread={input.thread_id}, total_unseen={total_unseen}, starting_index={index}")
397+
398398
while index < total_unseen:
399399
current = unseen_messages[index]
400400
role = getattr(current, "role", None)
@@ -508,6 +508,38 @@ async def run(self, input: RunAgentInput) -> AsyncGenerator[BaseEvent, None]:
508508
else:
509509
skip_tool_message_batch = False
510510

511+
# Check if there's an upcoming tool batch that will be skipped
512+
# If so, this non-tool batch is part of historical backend tool interaction
513+
# and should also be skipped
514+
upcoming_tool_batch_skipped = False
515+
if index < total_unseen and getattr(unseen_messages[index], "role", None) == "tool":
516+
# Peek at the upcoming tool batch
517+
peek_idx = index
518+
upcoming_tool_call_ids = []
519+
while peek_idx < total_unseen and getattr(unseen_messages[peek_idx], "role", None) == "tool":
520+
tool_call_id = getattr(unseen_messages[peek_idx], "tool_call_id", None)
521+
if tool_call_id:
522+
upcoming_tool_call_ids.append(tool_call_id)
523+
peek_idx += 1
524+
525+
if upcoming_tool_call_ids:
526+
pending_ids = await self._get_pending_tool_call_ids(input.thread_id)
527+
if pending_ids is not None:
528+
pending_set = set(pending_ids)
529+
# If NONE of the upcoming tool results match pending, they're historical
530+
if not any(tc_id in pending_set for tc_id in upcoming_tool_call_ids):
531+
upcoming_tool_batch_skipped = True
532+
533+
if upcoming_tool_batch_skipped:
534+
# Skip this message batch - it's part of historical backend tool interaction
535+
# Mark the messages as processed
536+
logger.debug(f"[RUN_LOOP] Skipping message batch (upcoming tool batch will be skipped)")
537+
batch_ids = self._collect_message_ids(message_batch)
538+
if batch_ids:
539+
self._session_manager.mark_messages_processed(app_name, input.thread_id, batch_ids)
540+
continue
541+
542+
logger.debug(f"[RUN_LOOP] Calling _start_new_execution with message_batch of {len(message_batch)} messages")
511543
async for event in self._start_new_execution(input, message_batch=message_batch):
512544
yield event
513545

@@ -1049,6 +1081,8 @@ async def _run_adk_in_background(
10491081
event_queue: Queue for emitting events
10501082
"""
10511083
runner: Optional[Runner] = None
1084+
logger.debug(f"[BG_EXEC] _run_adk_in_background called for thread={input.thread_id}")
1085+
logger.debug(f"[BG_EXEC] tool_results={len(tool_results) if tool_results else 0}, message_batch={len(message_batch) if message_batch else 0}")
10521086
try:
10531087
# Agent is already prepared with tools and SystemMessage instructions (if any)
10541088
# from _start_background_execution, so no additional agent copying needed here

integrations/adk-middleware/python/src/ag_ui_adk/event_translator.py

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -279,23 +279,10 @@ async def _translate_text_content(
279279
return
280280

281281
combined_text = "".join(text_parts)
282-
if not combined_text:
283-
return
284-
285-
# Use proper ADK streaming detection (handle None values)
286-
is_partial = getattr(adk_event, 'partial', False)
287-
turn_complete = getattr(adk_event, 'turn_complete', False)
288-
289-
# (is_final_response is already calculated above)
290-
291-
# Handle None values: if a turn is complete or a final chunk arrives, end streaming
292-
has_finish_reason = bool(getattr(adk_event, 'finish_reason', None))
293-
should_send_end = (
294-
(turn_complete and not is_partial)
295-
or (is_final_response and not is_partial)
296-
or (has_finish_reason and self._is_streaming)
297-
)
298282

283+
# Handle is_final_response BEFORE the empty text early return.
284+
# An empty final response is a valid stream-closing signal that must close
285+
# any active stream, even if there's no new text content.
299286
if is_final_response:
300287
# This is the final, complete message event.
301288

@@ -365,7 +352,23 @@ async def _translate_text_content(
365352
self._last_streamed_run_id = None
366353
return
367354

368-
355+
# Early return for empty text (non-final responses only).
356+
# Final responses with empty text are handled above to close active streams.
357+
if not combined_text:
358+
return
359+
360+
# Use proper ADK streaming detection (handle None values)
361+
is_partial = getattr(adk_event, 'partial', False)
362+
turn_complete = getattr(adk_event, 'turn_complete', False)
363+
364+
# Handle None values: if a turn is complete or a final chunk arrives, end streaming
365+
has_finish_reason = bool(getattr(adk_event, 'finish_reason', None))
366+
should_send_end = (
367+
(turn_complete and not is_partial)
368+
or (is_final_response and not is_partial)
369+
or (has_finish_reason and self._is_streaming)
370+
)
371+
369372
# Handle streaming logic (if not is_final_response)
370373
if not self._is_streaming:
371374
# Start of new message - emit START event

integrations/adk-middleware/python/src/ag_ui_adk/session_manager.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,10 +196,12 @@ async def update_session_state(
196196
# This depends on ADK's behavior - may need to explicitly clear
197197

198198
# Create event with state changes
199+
# Use "user" as author since state updates come from the frontend
200+
# Note: Using "system" causes ADK runner warnings in _find_agent_to_run
199201
actions = EventActions(state_delta=state_delta)
200202
event = Event(
201203
invocation_id=f"state_update_{int(time.time())}",
202-
author="system",
204+
author="user",
203205
actions=actions,
204206
timestamp=time.time()
205207
)

0 commit comments

Comments
 (0)