From df0ee3fc17cf8f0c4bfd7b5faa82b0be891f34cb Mon Sep 17 00:00:00 2001 From: Domenico Date: Tue, 2 Dec 2025 15:35:20 -0300 Subject: [PATCH 1/3] fix(runtime): resolve deadlock on converging conditional paths Previously, when conditional branches merged into a single node, execution would silently halt. The merging node would wait indefinitely for input from the "skipped" branch. This fix propagates the ignored status to downstream dependencies by injecting `null` inputs. This satisfies the waiting condition (`hasReceivedRequiredInputs`) and allows the flow to proceed using data from the active branch only. --- packages/server/src/utils/buildAgentflow.ts | 54 +++++++++++++++++++++ 1 file changed, 54 insertions(+) diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index 1a960947627..bc0bba2a3f6 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -820,6 +820,60 @@ async function processNodeOutputs({ const ignoreNodeIds = await determineNodesToIgnore(currentNode, result, edges, nodeId) if (ignoreNodeIds.length) { logger.debug(` ⏭️ Skipping nodes: [${ignoreNodeIds.join(', ')}]`) + + // Propagar la señal de "salto" a los nodos que dependen de los nodos ignorados + // para evitar que se queden esperando indefinidamente. + const nodesToIgnoreQueue = [...ignoreNodeIds] + const processedIgnoredNodes = new Set() + + while (nodesToIgnoreQueue.length > 0) { + const ignoredId = nodesToIgnoreQueue.shift()! + if (processedIgnoredNodes.has(ignoredId)) continue + processedIgnoredNodes.add(ignoredId) + + // Buscar bordes que salgan del nodo ignorado hacia otros nodos + const downstreamEdges = edges.filter((edge) => edge.source === ignoredId) + + for (const edge of downstreamEdges) { + const targetId = edge.target + + // Obtener o inicializar el nodo que está esperando + let waitingNode = waitingNodes.get(targetId) + if (!waitingNode) { + waitingNode = setupNodeDependencies(targetId, edges, nodes) + waitingNodes.set(targetId, waitingNode) + } + + // Marcar el input del nodo ignorado como recibido (con valor null) + // Esto satisface la dependencia sin aportar datos corruptos + if (!waitingNode.receivedInputs.has(ignoredId)) { + waitingNode.receivedInputs.set(ignoredId, null) + } + + // Verificar si el nodo objetivo ya está listo para ejecutarse ahora que + // hemos "satisfecho" la dependencia muerta + if (hasReceivedRequiredInputs(waitingNode)) { + logger.debug(` ✅ Node ${targetId} ready (dependency skipped)!`) + waitingNodes.delete(targetId) + + const combinedInputs = combineNodeInputs(waitingNode.receivedInputs) + + // Si todos los inputs son null (o no hay inputs válidos), + // entonces este nodo también debe ser ignorado. + if (combinedInputs === null) { + logger.debug(` ⏭️ Node ${targetId} also ignored (all inputs null)`) + nodesToIgnoreQueue.push(targetId) + } else { + // Si hay al menos un input válido (convergencia), se ejecuta. + nodeExecutionQueue.push({ + nodeId: targetId, + data: combinedInputs, + inputs: Object.fromEntries(waitingNode.receivedInputs) + }) + } + } + } + } } for (const childId of childNodeIds) { From 866bf4ade42a32c6b571b473b5107178f0ec42d5 Mon Sep 17 00:00:00 2001 From: Domenico Date: Tue, 2 Dec 2025 16:20:23 -0300 Subject: [PATCH 2/3] refactor(runtime): improve deadlock resolution performance This refactoring improves efficiency by using the graph object for a direct lookup of downstream nodes, instead of filtering the entire edges array in each loop iteration. This avoids an O(E) operation (where E is the number of edges) inside the loop, making the code cleaner and more performant, especially for larger graphs. Co-authored-by: @gemini-code-assist --- packages/server/src/utils/buildAgentflow.ts | 25 ++++++++++----------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index bc0bba2a3f6..78fa3754075 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -831,40 +831,39 @@ async function processNodeOutputs({ if (processedIgnoredNodes.has(ignoredId)) continue processedIgnoredNodes.add(ignoredId) - // Buscar bordes que salgan del nodo ignorado hacia otros nodos - const downstreamEdges = edges.filter((edge) => edge.source === ignoredId) + // Get downstream nodes + const downstreamNodeIds = graph[ignoredId] || [] - for (const edge of downstreamEdges) { - const targetId = edge.target - - // Obtener o inicializar el nodo que está esperando + for (const targetId of downstreamNodeIds) { + // Get or initialize waiting node let waitingNode = waitingNodes.get(targetId) if (!waitingNode) { waitingNode = setupNodeDependencies(targetId, edges, nodes) waitingNodes.set(targetId, waitingNode) } - // Marcar el input del nodo ignorado como recibido (con valor null) - // Esto satisface la dependencia sin aportar datos corruptos + // Mark the ignored node input as received (with null value) + // This satisfies the dependency without providing corrupted data if (!waitingNode.receivedInputs.has(ignoredId)) { waitingNode.receivedInputs.set(ignoredId, null) } - // Verificar si el nodo objetivo ya está listo para ejecutarse ahora que - // hemos "satisfecho" la dependencia muerta + // Check if the target node is ready to execute now that + // we have satisfied the dead dependency if (hasReceivedRequiredInputs(waitingNode)) { logger.debug(` ✅ Node ${targetId} ready (dependency skipped)!`) waitingNodes.delete(targetId) const combinedInputs = combineNodeInputs(waitingNode.receivedInputs) - // Si todos los inputs son null (o no hay inputs válidos), - // entonces este nodo también debe ser ignorado. + // If all inputs are null, + // then ignore the node if (combinedInputs === null) { logger.debug(` ⏭️ Node ${targetId} also ignored (all inputs null)`) nodesToIgnoreQueue.push(targetId) } else { - // Si hay al menos un input válido (convergencia), se ejecuta. + // Otherwise if there is at least one valid input, + // execute the node. nodeExecutionQueue.push({ nodeId: targetId, data: combinedInputs, From 4a1cc522d1a49a52d9cd4953eb7bff993bac70c1 Mon Sep 17 00:00:00 2001 From: Domenico Date: Tue, 2 Dec 2025 16:24:52 -0300 Subject: [PATCH 3/3] docs: improve comments --- packages/server/src/utils/buildAgentflow.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index 78fa3754075..2b571b24aa9 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -821,8 +821,7 @@ async function processNodeOutputs({ if (ignoreNodeIds.length) { logger.debug(` ⏭️ Skipping nodes: [${ignoreNodeIds.join(', ')}]`) - // Propagar la señal de "salto" a los nodos que dependen de los nodos ignorados - // para evitar que se queden esperando indefinidamente. + // Propagate a skip signal on downstream nodes const nodesToIgnoreQueue = [...ignoreNodeIds] const processedIgnoredNodes = new Set()