From 1c7ec580570f195264d2fc7ddd6a343c1370d34c Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Thu, 12 Mar 2026 19:45:20 +0530 Subject: [PATCH 01/18] add http app --- ai.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/ai.go b/ai.go index 93b665d4..ee45a847 100644 --- a/ai.go +++ b/ai.go @@ -7636,6 +7636,11 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, decidedApps += lowername + ", " } + + // Let's inject http. + if !strings.Contains(decidedApps, "http") { + decidedApps += "http, " + } } if len(decidedApps) > 0 { @@ -8676,6 +8681,10 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } } } + + if createNextActions { + return startNode, nil + } // 1. Map the response back newResult, err := json.Marshal(resultMapping) From 985eda466c8b2e6b8c69cae06b8fd5a6e17d3167 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Thu, 12 Mar 2026 19:45:45 +0530 Subject: [PATCH 02/18] fix agent stuck loading due to timeout --- db-connector.go | 21 +++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) diff --git a/db-connector.go b/db-connector.go index 2cad9091..35bff75f 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2017,7 +2017,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) continue } else if decision.RunDetails.Status == "FAILURE" { - //finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) failedFound = true continue } else if decision.RunDetails.Status == "RUNNING" && decision.Action != "ask" { @@ -2030,8 +2030,11 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor mappedOutput.Decisions[decisionIndex].RunDetails.Status = "FAILURE" mappedOutput.Decisions[decisionIndex].RunDetails.CompletedAt = time.Now().Unix() mappedOutput.Decisions[decisionIndex].RunDetails.RawResponse += "\n[ERROR] Decision marked as FAILURE due to 5 minute timeout." - } + // Count this as finished + failed so recovery triggers in the same Fixexecution run + finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + failedFound = true + } } else { if decision.RunDetails.CompletedAt > 0 { if debug { @@ -2133,7 +2136,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex]) }() } else { - log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found, marking as WAITING.", workflowExecution.ExecutionId, action.ID) + log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found. Re-invoking agent to finalize (failedFound: %t).", workflowExecution.ExecutionId, action.ID, failedFound) mappedOutput.Status = "RUNNING" mappedOutput.CompletedAt = 0 @@ -2143,10 +2146,16 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor workflowExecution.Status = "EXECUTING" } - // To ensure the execution is actually updated + // Re-invoke the agent so the LLM can see the failure and produce a proper "finish" decision. + + capturedExec := workflowExecution + capturedAction := action go func() { - time.Sleep(1 * time.Second) - sendAgentActionSelfRequest("WAITING", workflowExecution, workflowExecution.Results[resultIndex]) + time.Sleep(2 * time.Second) + _, err := HandleAiAgentExecutionStart(capturedExec, capturedAction, true) + if err != nil { + log.Printf("[ERROR][%s] Failed re-invoking agent after decisions completed for action %s: %s", capturedExec.ExecutionId, capturedAction.ID, err) + } }() } } else if (result.Status == "" || result.Status == "WAITING") && mappedOutput.Status == "FINISHED" { From 392ca5670effc377cd6a9a8da21a6015b81e6f87 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Thu, 12 Mar 2026 19:46:08 +0530 Subject: [PATCH 03/18] added auth injection for other internal apps as well --- shared.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/shared.go b/shared.go index 0164c500..301829f2 100644 --- a/shared.go +++ b/shared.go @@ -17760,7 +17760,15 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action // Handle agent decisionmaking. Use the same log.Printf("[INFO][%s] With the agent being finished, we are asking it whether it would like to do anything else", workflowExecution.ExecutionId) - returnAction, err := HandleAiAgentExecutionStart(workflowExecution, actionResult.Action, true) + var originalAction Action + if foundActionResultIndex >= 0 && foundActionResultIndex < len(workflowExecution.Results) { + originalAction = workflowExecution.Results[foundActionResultIndex].Action + } else { + // Fallback in case of an issue + originalAction = actionResult.Action + } + + returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true) if err != nil { log.Printf("[ERROR][%s] Failed handling agent execution start: %s", workflowExecution.ExecutionId, err) } @@ -22079,6 +22087,14 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by workflowExecution.OrgId = user.ActiveOrg.Id } + formattedAppName := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") + + isInternalShuffleApp := false + switch formattedAppName { + case "shuffle_datastore", "shuffle_org_management", "shuffle_app_management", "shuffle_workflow_management": + isInternalShuffleApp = true + } + if len(app.Name) == 0 && len(action.AppName) > 0 { app.Name = action.AppName } From cff8a26d590c2ac5b8e0fee46e8be941728f0cc3 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Wed, 1 Apr 2026 19:58:58 +0530 Subject: [PATCH 04/18] include more conditions --- ai.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ai.go b/ai.go index ee45a847..d6406f00 100644 --- a/ai.go +++ b/ai.go @@ -8629,7 +8629,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( //log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s status=%s duration=%ds decisions=%d", execution.ExecutionId, agentOutput.Status, time.Now().Unix()-agentOutput.StartedAt, len(agentOutput.Decisions)) - if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status == "EXECUTING" { + if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status != "ABORTED" && execution.Status != "FAILURE" { duration := agentOutput.CompletedAt - agentOutput.StartedAt log.Printf("[INFO][%s] AI_AGENT_COMPLETE: org=%s duration=%ds decisions=%d llm_calls=%d total_tokens=%d status=SUCCESS", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) for resultIndex, result := range execution.Results { From 1b0788c81c7265c647ba610b3e055fd4224ef1a1 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Thu, 2 Apr 2026 13:46:01 +0530 Subject: [PATCH 05/18] commented out not used code --- shared.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/shared.go b/shared.go index d71eb4f1..053e78a1 100644 --- a/shared.go +++ b/shared.go @@ -22094,13 +22094,13 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by workflowExecution.OrgId = user.ActiveOrg.Id } - formattedAppName := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") + // formattedAppName := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") - isInternalShuffleApp := false - switch formattedAppName { - case "shuffle_datastore", "shuffle_org_management", "shuffle_app_management", "shuffle_workflow_management": - isInternalShuffleApp = true - } + // isInternalShuffleApp := false + // switch formattedAppName { + // case "shuffle_datastore", "shuffle_org_management", "shuffle_app_management", "shuffle_workflow_management": + // isInternalShuffleApp = true + // } if len(app.Name) == 0 && len(action.AppName) > 0 { app.Name = action.AppName From 993cb63ecd03700f69663021e595cb3dbb4a280f Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Fri, 3 Apr 2026 22:10:41 +0530 Subject: [PATCH 06/18] a ton of changes again --- ai.go | 146 ++++++++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 131 insertions(+), 15 deletions(-) diff --git a/ai.go b/ai.go index f6620398..c16d893a 100644 --- a/ai.go +++ b/ai.go @@ -7101,9 +7101,13 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start marshalledOutput = []byte(`{"status":"FINISHED","error":"marshal error"}`) } - log.Printf("[ERROR][%s] AI_AGENT_ABORT: org=%s label=%s decisions=%d llm_calls=%d total_tokens=%d reason=%q", - execution.ExecutionId, execution.Workflow.OrgId, abortLabel, - len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) + log.Printf("[ERROR][%s] AI_AGENT_ABORT: org=%s label=%s decisions=%d llm_calls=%d total_tokens=%d reason=%q", execution.ExecutionId, execution.Workflow.OrgId, abortLabel, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) + + abortDuration := int64(0) + if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > agentOutput.StartedAt { + abortDuration = agentOutput.CompletedAt - agentOutput.StartedAt + } + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=%ds decisions=%d llm_calls=%d tokens_used=%d reason=%q", execution.ExecutionId, execution.Workflow.OrgId, abortDuration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) abortResult := ActionResult{ Status: "SUCCESS", @@ -7235,6 +7239,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "missing_onprem_ai_config") return startNode, err } } @@ -7809,7 +7814,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) - + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "no_user_message") return startNode, errors.New("No user message/input found for AI Agent start") } @@ -7933,7 +7938,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) - + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "marshal_request_body_failed") return startNode, err } @@ -7949,7 +7954,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) - + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "unsupported_app_not_openai") return startNode, errors.New("Unhandled Singul BODY for OpenAI agent (first request)") } @@ -8003,7 +8008,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) - + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "marshal_ai_action_failed") return startNode, err } @@ -8151,12 +8156,13 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) - + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "llm_response_unmarshal_failed") return startNode, err } if outputMap.Status != 200 { log.Printf("[ERROR][%s] AI Agent: Failed to run AI agent with status code %d", execution.ExecutionId, outputMap.Status) + // Don't log AI_AGENT_LLM_FAILURE here yet - wait to see if we can parse the error details below //return startNode, errors.New(fmt.Sprintf("Failed to run AI agent with status code %d", outputMap.Status)) } @@ -8168,6 +8174,9 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result)) choicesString = fmt.Sprintf("LLM Response Error: %s", string(resultMapping.Result)) + + // Log LLM failure for body parsing error + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=body_parse_error raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(resultMapping.Result)) } else { bodyString, err = json.Marshal(bodyMap) if err != nil { @@ -8180,7 +8189,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( Action: startNode, }) go SetWorkflowExecution(ctx, execution, true) - + log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "llm_body_marshal_failed") return startNode, err } } @@ -8205,9 +8214,15 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message) resultMapping.Status = "FAILURE" + + // Log LLM failure with details + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, newOutput.Error.Type, newOutput.Error.Message) } else { log.Printf("[ERROR][%s] AI Agent: No choices, nor error found in AI agent response. Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString) resultMapping.Status = "FAILURE" + + // Log LLM failure for unknown error format + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=unknown_format raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(bodyString)) } } else { choicesString = openaiOutput.Choices[0].Message.Content @@ -8281,10 +8296,63 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( err = json.Unmarshal([]byte(decisionString), &mappedDecisions) if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions in AI Agent response (2): %s. String: %s", execution.ExecutionId, err, decisionString) - resultMapping.Status = "FAILURE" - // Updating the OUTPUT in some way to help the user a bit. - errorMessage = fmt.Sprintf("The output from the LLM had no decisions. See the raw decisions tring for the response. Contact support@shuffler.io if you think this is wrong.") + // FALLBACK: LLM returned raw content instead of agent decisions + // Wrap it in a proper finish decision structure + log.Printf("[INFO][%s] AI Agent: Applying fallback - wrapping raw LLM content in finish decision", execution.ExecutionId) + + var rawContent interface{} + jsonErr := json.Unmarshal([]byte(decisionString), &rawContent) + outputValue := "" + if jsonErr == nil { + // It's valid JSON, wrap it in {"reply": ...} + wrappedContent := map[string]interface{}{ + "reply": rawContent, + } + wrappedBytes, _ := json.Marshal(wrappedContent) + outputValue = string(wrappedBytes) + } else { + // Not valid JSON, wrap the raw string + wrappedContent := map[string]interface{}{ + "reply": decisionString, + } + wrappedBytes, _ := json.Marshal(wrappedContent) + outputValue = string(wrappedBytes) + } + + // Create a proper finish decision + b := make([]byte, 6) + _, randErr := rand.Read(b) + if randErr != nil { + log.Printf("[ERROR][%s] AI Agent: Failed generating random string for fallback decision", execution.ExecutionId) + } + + fallbackDecision := AgentDecision{ + I: lastFinishedIndex, + Category: "finish", + Action: "finish", + Tool: "core", + Confidence: 1.0, + Runs: "1", + ApprovalRequired: false, + Reason: "LLM returned unexpected format - applied fallback wrapping", + Fields: []Valuereplace{ + { + Key: "output", + Value: outputValue, + }, + }, + RunDetails: AgentDecisionRunDetails{ + Id: base64.RawURLEncoding.EncodeToString(b), + Status: "", + StartedAt: 0, + CompletedAt: 0, + }, + } + + mappedDecisions = []AgentDecision{fallbackDecision} + resultMapping.Status = "" + log.Printf("[INFO][%s] AI Agent: Fallback applied successfully - created finish decision with wrapped content", execution.ExecutionId) } } } @@ -8637,6 +8705,44 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if !decisionActionRan { log.Printf("[ERROR][%s] AI Agent: No decision action was run. Marking the agent as FAILURE.", execution.ExecutionId) + + // Properly mark the agent as failed + agentOutput.Status = "FAILURE" + agentOutput.CompletedAt = time.Now().Unix() + + sentFailure := false + // Update the result status - finds the existing node entry in execution.Results + for resultIndex, result := range execution.Results { + if result.Action.ID != startNode.ID { + continue + } + + execution.Results[resultIndex].Status = "FAILURE" + execution.Results[resultIndex].CompletedAt = agentOutput.CompletedAt + + marshalledAgentOutput, err := json.Marshal(agentOutput) + if err != nil { + log.Printf("[ERROR] AI Agent: Failed marshalling agent output for FAILURE: %s", err) + } else { + execution.Results[resultIndex].Result = string(marshalledAgentOutput) + resultMapping.Result = string(marshalledAgentOutput) + } + + log.Printf("[DEBUG][%s] About to call sendAgentActionSelfRequest for FAILURE on agent action %s", execution.ExecutionId, startNode.ID) + go sendAgentActionSelfRequest("FAILURE", execution, execution.Results[resultIndex]) + sentFailure = true + break + } + + if !sentFailure { + log.Printf("[WARNING][%s] AI Agent: No result entry found in execution.Results, using resultMapping fallback", execution.ExecutionId) + fallbackOutput, merr := json.Marshal(agentOutput) + if merr == nil { + resultMapping.Status = "FAILURE" + resultMapping.Result = string(fallbackOutput) + } + go sendAgentActionSelfRequest("FAILURE", execution, resultMapping) + } } marshalledAgentOutput, err := json.Marshal(agentOutput) @@ -8680,8 +8786,8 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( //log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s status=%s duration=%ds decisions=%d", execution.ExecutionId, agentOutput.Status, time.Now().Unix()-agentOutput.StartedAt, len(agentOutput.Decisions)) if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status != "ABORTED" && execution.Status != "FAILURE" { - duration := agentOutput.CompletedAt - agentOutput.StartedAt - log.Printf("[INFO][%s] AI_AGENT_COMPLETE: org=%s duration=%ds decisions=%d llm_calls=%d total_tokens=%d status=SUCCESS", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) + + foundResult := false for resultIndex, result := range execution.Results { if result.Action.ID != startNode.ID { continue @@ -8691,12 +8797,22 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( execution.Results[resultIndex].CompletedAt = agentOutput.CompletedAt log.Printf("[DEBUG][%s] About to call sendAgentActionSelfRequest for agent action %s", execution.ExecutionId, startNode.ID) go sendAgentActionSelfRequest("SUCCESS", execution, execution.Results[resultIndex]) + foundResult = true break } + + if !foundResult { + log.Printf("[DEBUG][%s] Result not found in execution.Results, using resultMapping for sendAgentActionSelfRequest", execution.ExecutionId) + resultMapping.Status = "SUCCESS" + resultMapping.CompletedAt = agentOutput.CompletedAt + go sendAgentActionSelfRequest("SUCCESS", execution, resultMapping) + } } } else { - log.Printf("[ERROR] AI Agent: No result found in AI agent response. Status: %d. Body: %s", newresp.StatusCode, string(body)) + // LLM returned an empty result body — this is a failure + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Empty result body from LLM response (status %d). Aborting agent.", execution.ExecutionId, newresp.StatusCode) + return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "empty_llm_result", fmt.Sprintf("LLM returned empty response body with HTTP status %d", newresp.StatusCode)) } if memorizationEngine == "shuffle_db" { From f87f52ef7aeac3c8918a36152f6ae450cd524ec5 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Fri, 3 Apr 2026 22:11:33 +0530 Subject: [PATCH 07/18] undo some changes for now --- db-connector.go | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/db-connector.go b/db-connector.go index e7e54a4e..58395573 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2017,7 +2017,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) continue } else if decision.RunDetails.Status == "FAILURE" { - finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + //finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) failedFound = true continue } else if decision.RunDetails.Status == "RUNNING" && decision.Action != "ask" { @@ -2032,8 +2032,8 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor mappedOutput.Decisions[decisionIndex].RunDetails.RawResponse += "\n[ERROR] Decision marked as FAILURE due to 5 minute timeout." // Count this as finished + failed so recovery triggers in the same Fixexecution run - finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) - failedFound = true + // finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + // failedFound = true } } else { if decision.RunDetails.CompletedAt > 0 { @@ -2136,7 +2136,8 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor go sendAgentActionSelfRequest("SUCCESS", workflowExecution, workflowExecution.Results[resultIndex]) }() } else { - log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found. Re-invoking agent to finalize (failedFound: %t).", workflowExecution.ExecutionId, action.ID, failedFound) + log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found, marking as WAITING.", workflowExecution.ExecutionId, action.ID) + //log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found. Re-invoking agent to finalize (failedFound: %t).", workflowExecution.ExecutionId, action.ID, failedFound) mappedOutput.Status = "RUNNING" mappedOutput.CompletedAt = 0 @@ -2145,17 +2146,19 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor if workflowExecution.Status == "FINISHED" { workflowExecution.Status = "EXECUTING" } - + // To ensure the execution is actually updated // Re-invoke the agent so the LLM can see the failure and produce a proper "finish" decision. - capturedExec := workflowExecution - capturedAction := action + // capturedExec := workflowExecution + // capturedAction := action go func() { - time.Sleep(2 * time.Second) - _, err := HandleAiAgentExecutionStart(capturedExec, capturedAction, true) - if err != nil { - log.Printf("[ERROR][%s] Failed re-invoking agent after decisions completed for action %s: %s", capturedExec.ExecutionId, capturedAction.ID, err) - } + time.Sleep(1 * time.Second) + sendAgentActionSelfRequest("WAITING", workflowExecution, workflowExecution.Results[resultIndex]) + // time.Sleep(2 * time.Second) + // _, err := HandleAiAgentExecutionStart(capturedExec, capturedAction, true) + // if err != nil { + // log.Printf("[ERROR][%s] Failed re-invoking agent after decisions completed for action %s: %s", capturedExec.ExecutionId, capturedAction.ID, err) + // } }() } } else if (result.Status == "" || result.Status == "WAITING") && mappedOutput.Status == "FINISHED" { From 08581ebb8aa9e4a628b407cb93b62d7450aee1c9 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Fri, 3 Apr 2026 22:13:34 +0530 Subject: [PATCH 08/18] increase the TTL terminal statuses --- shared.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/shared.go b/shared.go index 053e78a1..a2041853 100644 --- a/shared.go +++ b/shared.go @@ -17346,7 +17346,11 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti return nil } else { - SetCache(ctx, cacheKey, []byte("1"), 1) + var cacheTTL int32 = 1 // 1 minute for non-terminal statuses + if status == "SUCCESS" || status == "FINISHED" || status == "FAILURE" || status == "ABORTED" { + cacheTTL = 1440 // 24 hours — execution outcome is permanent + } + SetCache(ctx, cacheKey, []byte("1"), cacheTTL) } if status == "SUCCESS" || status == "FINISHED" || status == "FAILURE" || status == "ABORTED" { From db23299fd371a50728d4ead6368b71640fabe76a Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sat, 4 Apr 2026 21:03:46 +0530 Subject: [PATCH 09/18] fixed some abort bugs --- ai.go | 129 +++++++++++++----------------------------------- db-connector.go | 7 ++- 2 files changed, 40 insertions(+), 96 deletions(-) diff --git a/ai.go b/ai.go index c16d893a..5ed256bb 100644 --- a/ai.go +++ b/ai.go @@ -7054,7 +7054,7 @@ func runSupportRequest(ctx context.Context, input QueryInput) string { // Callers must return immediately after this call. func abortAgentExecution(ctx context.Context, execution WorkflowExecution, startNode Action, base AgentOutput, abortLabel, reason string) (Action, error) { agentOutput := base - agentOutput.Status = "FINISHED" + agentOutput.Status = "ABORTED" agentOutput.Error = reason agentOutput.CompletedAt = time.Now().Unix() @@ -7098,19 +7098,13 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start marshalledOutput, marshalErr := json.Marshal(agentOutput) if marshalErr != nil { log.Printf("[ERROR][%s] abortAgentExecution: failed marshalling AgentOutput: %s", execution.ExecutionId, marshalErr) - marshalledOutput = []byte(`{"status":"FINISHED","error":"marshal error"}`) + marshalledOutput = []byte(`{"status":"ABORTED","error":"marshal error"}`) } - log.Printf("[ERROR][%s] AI_AGENT_ABORT: org=%s label=%s decisions=%d llm_calls=%d total_tokens=%d reason=%q", execution.ExecutionId, execution.Workflow.OrgId, abortLabel, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) - - abortDuration := int64(0) - if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > agentOutput.StartedAt { - abortDuration = agentOutput.CompletedAt - agentOutput.StartedAt - } - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=%ds decisions=%d llm_calls=%d tokens_used=%d reason=%q", execution.ExecutionId, execution.Workflow.OrgId, abortDuration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) + // log.Printf("[ERROR][%s] AI_AGENT_ABORT: org=%s label=%s decisions=%d llm_calls=%d total_tokens=%d reason=%q", execution.ExecutionId, execution.Workflow.OrgId, abortLabel, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) abortResult := ActionResult{ - Status: "SUCCESS", + Status: "ABORTED", Result: string(marshalledOutput), Action: startNode, StartedAt: time.Now().UnixMicro(), @@ -7131,8 +7125,11 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start execution.Results = append(execution.Results, abortResult) } - execution.Status = "FINISHED" + execution.Status = "ABORTED" + + go sendAgentActionSelfRequest("ABORTED", execution, abortResult) go SetWorkflowExecution(ctx, execution, true) + return startNode, errors.New(reason) } @@ -7232,15 +7229,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, err := errors.New("AI Configuration Error: AI_MODEL or OPENAI_MODEL environment variable must be set for On-Premise AI Agent execution.") log.Printf("[ERROR] %v", err) - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "%s"}`, err.Error()), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "missing_onprem_ai_config") - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "missing_onprem_ai_config", err.Error()) } } @@ -7450,8 +7439,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, relevantDecisions = append(relevantDecisions, mappedResult.Decisions[i]) } - log.Printf("[INFO][%s] AI_AGENT: org=%s decisions_total=%d failures=%d successes=%d last_index=%d", - execution.ExecutionId, execution.Workflow.OrgId, len(mappedResult.Decisions), failureCount, successCount, lastFinishedIndex) + log.Printf("[INFO][%s] AI_AGENT: org=%s decisions_total=%d failures=%d successes=%d last_index=%d", execution.ExecutionId, execution.Workflow.OrgId, len(mappedResult.Decisions), failureCount, successCount, lastFinishedIndex) marshalledDecisions, err = json.Marshal(relevantDecisions) if err != nil { @@ -7808,15 +7796,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if len(userMessage) == 0 { log.Printf("[ERROR][%s] AI Agent: No user message/input found for action %s", execution.ExecutionId, startNode.ID) - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (3): No input provided."}`), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "no_user_message") - return startNode, errors.New("No user message/input found for AI Agent start") - + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "no_user_message", "No user message/input found for AI Agent start") } // Track who initiated this agent (for audit trail) @@ -7930,32 +7910,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed marshalling input for action %s: %s", execution.ExecutionId, startNode.ID, err) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (4): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "marshal_request_body_failed") - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "marshal_request_body_failed", fmt.Sprintf("Failed to start AI Agent (4): %s", err.Error())) } //go executeSpecificCloudApp(ctx, execution.ExecutionId, execution.Authorization, urls, startNode) if !runOpenaiRequest { - + log.Printf("[ERROR] AI Agent: Unhandled Singul BODY for OpenAI agent (first request): %s. AI APPNAME (can't be empty): %#v", string(initialAgentRequestBody), appname) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (5): Failed initial AI request. Contact support@shuffler.io if this persists."}`), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "unsupported_app_not_openai") - return startNode, errors.New("Unhandled Singul BODY for OpenAI agent (first request)") + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "unsupported_app_not_openai", "Failed to start AI Agent (5): Failed initial AI request. Contact support@shuffler.io if this persists.") } if debug { @@ -8000,16 +7962,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( marshalledAction, err := json.Marshal(aiNode) if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed marshalling action for AI Agent (first agent request): %s", execution.ExecutionId, err) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (6): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "marshal_ai_action_failed") - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "marshal_ai_action_failed", fmt.Sprintf("Failed to start AI Agent (6): %s", err.Error())) } // Self-request starts here! @@ -8061,7 +8014,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( ) if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed creating request during LLM setup: %s", execution.ExecutionId, err) + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed creating request during LLM setup: %s", execution.ExecutionId, err) return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_request_build_failed", fmt.Sprintf("Failed to start AI Agent (7): %s", err.Error())) } @@ -8148,16 +8101,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( err = json.Unmarshal([]byte(resultMapping.Result), &outputMap) if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling response from sending request for stream during SKIPPED user input: %s. Body: %s", execution.ExecutionId, err, string(resultMapping.Result)) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (1): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "llm_response_unmarshal_failed") - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_response_unmarshal_failed", fmt.Sprintf("Failed to start AI Agent (1): %s", err.Error())) } if outputMap.Status != 200 { @@ -8181,16 +8125,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( bodyString, err = json.Marshal(bodyMap) if err != nil { log.Printf("[ERROR] AI Agent: Failed marshalling body to string in AI Agent response: %s", err) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (3): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=ABORTED duration=0s decisions=0 llm_calls=0 tokens_used=0 reason=%q", execution.ExecutionId, execution.Workflow.OrgId, "llm_body_marshal_failed") - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_body_marshal_failed", fmt.Sprintf("Failed to start AI Agent (3): %s", err.Error())) } } @@ -8211,12 +8146,13 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( newOutput := openai.ErrorResponse{} err = json.Unmarshal(bodyString, &newOutput) if err == nil && len(newOutput.Error.Message) > 0 { - choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message) - - resultMapping.Status = "FAILURE" + // choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message) - // Log LLM failure with details + // resultMapping.Status = "FAILURE" + // LLM returned a proper error (401 invalid key, 429 rate limit, 500 server error, etc.) + // Abort log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, newOutput.Error.Type, newOutput.Error.Message) + return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_error", fmt.Sprintf("LLM error (HTTP %d %s): %s", outputMap.Status, newOutput.Error.Type, newOutput.Error.Message)) } else { log.Printf("[ERROR][%s] AI Agent: No choices, nor error found in AI agent response. Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString) resultMapping.Status = "FAILURE" @@ -8297,8 +8233,8 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions in AI Agent response (2): %s. String: %s", execution.ExecutionId, err, decisionString) - // FALLBACK: LLM returned raw content instead of agent decisions - // Wrap it in a proper finish decision structure + // FALLBACK: LLM returned raw content instead of agent decisions (HTTP 200 but non-JSON output). + // Wrap it as a finish decision so the workflow completes cleanly. log.Printf("[INFO][%s] AI Agent: Applying fallback - wrapping raw LLM content in finish decision", execution.ExecutionId) var rawContent interface{} @@ -8802,10 +8738,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } if !foundResult { - log.Printf("[DEBUG][%s] Result not found in execution.Results, using resultMapping for sendAgentActionSelfRequest", execution.ExecutionId) - resultMapping.Status = "SUCCESS" - resultMapping.CompletedAt = agentOutput.CompletedAt - go sendAgentActionSelfRequest("SUCCESS", execution, resultMapping) + // duration := int64(0) + // if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > 0 { + // duration = agentOutput.CompletedAt - agentOutput.StartedAt + // } else if agentOutput.StartedAt > 0 { + // duration = time.Now().Unix() - agentOutput.StartedAt + // } + + // log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=SUCCESS duration=%ds decisions=%d llm_calls=%d tokens_used=%d", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) } } @@ -8847,7 +8787,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } } } - + if createNextActions { return startNode, nil } @@ -9029,7 +8969,6 @@ func GenerateSingulWorkflows(resp http.ResponseWriter, request *http.Request) { return } - // Maps everything AROUND the usecase err = HandleSingulWorkflowEnablement(ctx, *workflow, user, categoryAction) if err != nil { diff --git a/db-connector.go b/db-connector.go index 58395573..feab9f6c 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2366,7 +2366,12 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor for _, result := range workflowExecution.Results { if result.Status == "FAILURE" || result.Status == "ABORTED" { - log.Printf("[DEBUG][%s] Setting execution to aborted because of result %s (%s) with status '%s'. Should update execution parent if it exists (not implemented).", workflowExecution.ExecutionId, result.Action.Name, result.Action.ID, result.Status) + // Only log once per execution to avoid spam + cacheKey := fmt.Sprintf("abort_log_%s", workflowExecution.ExecutionId) + if _, err := GetCache(ctx, cacheKey); err != nil { + log.Printf("[DEBUG][%s] Setting execution to aborted because of result %s (%s) with status '%s'. Should update execution parent if it exists (not implemented).", workflowExecution.ExecutionId, result.Action.Name, result.Action.ID, result.Status) + SetCache(ctx, cacheKey, []byte("logged"), 5) // 5 minute TTL + } workflowExecution.Status = "ABORTED" dbsave = true From 1c4589042f3e50e03350db616c58595321da1330 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 00:14:32 +0530 Subject: [PATCH 10/18] general output format fix --- ai.go | 120 ++++++++++++++++++++++++++++------------------------------ 1 file changed, 58 insertions(+), 62 deletions(-) diff --git a/ai.go b/ai.go index 5ed256bb..7ee00cbb 100644 --- a/ai.go +++ b/ai.go @@ -8150,7 +8150,6 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( // resultMapping.Status = "FAILURE" // LLM returned a proper error (401 invalid key, 429 rate limit, 500 server error, etc.) - // Abort log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, newOutput.Error.Type, newOutput.Error.Message) return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_error", fmt.Sprintf("LLM error (HTTP %d %s): %s", outputMap.Status, newOutput.Error.Type, newOutput.Error.Message)) } else { @@ -8224,72 +8223,69 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( errorMessage := "" err = json.Unmarshal([]byte(decisionString), &mappedDecisions) if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions in AI Agent response: %s", execution.ExecutionId, err) - - if len(mappedDecisions) == 0 { - decisionString = strings.Replace(decisionString, `\"`, `"`, -1) - - err = json.Unmarshal([]byte(decisionString), &mappedDecisions) - if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions in AI Agent response (2): %s. String: %s", execution.ExecutionId, err, decisionString) - - // FALLBACK: LLM returned raw content instead of agent decisions (HTTP 200 but non-JSON output). - // Wrap it as a finish decision so the workflow completes cleanly. - log.Printf("[INFO][%s] AI Agent: Applying fallback - wrapping raw LLM content in finish decision", execution.ExecutionId) - - var rawContent interface{} - jsonErr := json.Unmarshal([]byte(decisionString), &rawContent) - outputValue := "" - if jsonErr == nil { - // It's valid JSON, wrap it in {"reply": ...} - wrappedContent := map[string]interface{}{ - "reply": rawContent, - } - wrappedBytes, _ := json.Marshal(wrappedContent) - outputValue = string(wrappedBytes) - } else { - // Not valid JSON, wrap the raw string - wrappedContent := map[string]interface{}{ - "reply": decisionString, - } - wrappedBytes, _ := json.Marshal(wrappedContent) - outputValue = string(wrappedBytes) + log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions: %s", execution.ExecutionId, err) + + recovered := false + if outputMap.Status >= 200 && outputMap.Status < 300 { + fixedString := strings.Replace(decisionString, `\"`, `"`, -1) + if json.Unmarshal([]byte(fixedString), &mappedDecisions) == nil { + log.Printf("[INFO][%s] AI Agent: Recovered by fixing escaped quotes", execution.ExecutionId) + recovered = true + } + + if !recovered { + balanced := balanceJSONLikeString(decisionString) + if json.Unmarshal([]byte(balanced), &mappedDecisions) == nil { + log.Printf("[INFO][%s] AI Agent: Recovered by balancing JSON", execution.ExecutionId) + recovered = true } + } + } - // Create a proper finish decision - b := make([]byte, 6) - _, randErr := rand.Read(b) - if randErr != nil { - log.Printf("[ERROR][%s] AI Agent: Failed generating random string for fallback decision", execution.ExecutionId) - } + if !recovered { + isErrorResponse := outputMap.Status < 200 || outputMap.Status >= 300 + reason := "LLM returned unexpected format" + if isErrorResponse { + reason = fmt.Sprintf("LLM error (HTTP %d)", outputMap.Status) + } - fallbackDecision := AgentDecision{ - I: lastFinishedIndex, - Category: "finish", - Action: "finish", - Tool: "core", - Confidence: 1.0, - Runs: "1", - ApprovalRequired: false, - Reason: "LLM returned unexpected format - applied fallback wrapping", - Fields: []Valuereplace{ - { - Key: "output", - Value: outputValue, - }, - }, - RunDetails: AgentDecisionRunDetails{ - Id: base64.RawURLEncoding.EncodeToString(b), - Status: "", - StartedAt: 0, - CompletedAt: 0, - }, - } + log.Printf("[INFO][%s] AI Agent: Wrapping response as finish decision - %s", execution.ExecutionId, reason) - mappedDecisions = []AgentDecision{fallbackDecision} - resultMapping.Status = "" - log.Printf("[INFO][%s] AI Agent: Fallback applied successfully - created finish decision with wrapped content", execution.ExecutionId) + // Wrap content as valid JSON output + var outputValue string + var rawContent interface{} + if json.Unmarshal([]byte(decisionString), &rawContent) == nil { + wrapped, _ := json.Marshal(map[string]interface{}{"reply": rawContent}) + outputValue = string(wrapped) + } else { + wrapped, _ := json.Marshal(map[string]interface{}{"reply": decisionString}) + outputValue = string(wrapped) } + + b := make([]byte, 6) + rand.Read(b) + + mappedDecisions = []AgentDecision{{ + I: lastFinishedIndex, + Category: "finish", + Action: "finish", + Tool: "core", + Confidence: 1.0, + Runs: "1", + ApprovalRequired: false, + Reason: reason, + Fields: []Valuereplace{{ + Key: "output", + Value: outputValue, + }}, + RunDetails: AgentDecisionRunDetails{ + Id: base64.RawURLEncoding.EncodeToString(b), + Status: "", + StartedAt: 0, + CompletedAt: 0, + }, + }} + resultMapping.Status = "" } } From 8c7610032fc911f6b26341e725ba09a75308b1fe Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 16:54:49 +0530 Subject: [PATCH 11/18] added more visibility in logs --- ai.go | 11 ++++++++--- shared.go | 7 ++++--- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/ai.go b/ai.go index 7ee00cbb..5c1f70fe 100644 --- a/ai.go +++ b/ai.go @@ -7182,7 +7182,7 @@ func sendAITokenLimitAlert(ctx context.Context, execution WorkflowExecution, ful // createNextActions = false => start of agent to find initial decisions // createNextActions = true => mid-agent to decide next steps -func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, createNextActions bool) (Action, error) { +func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, createNextActions bool, caller string, traceID string) (Action, error) { aiStarttime := time.Now().Unix() // A handler to ensure we ALWAYS focus on next actions if a node starts late @@ -7806,7 +7806,12 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } if !createNextActions { - log.Printf("[INFO] AI_AGENT_START: execution_id=%s org=%s user=%s input_length=%d", execution.ExecutionId, execution.Workflow.OrgId, initiatedBy, len(userMessage)) + if strings.TrimSpace(caller) == "" { + log.Printf("ERROR[%s] AI agent: No caller function info provided for AI agent, aborting the request ...", execution.ExecutionId) + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "no_caller_info", "No caller function info provided for AI Agent start") + } + + log.Printf("[INFO][%s] AI_AGENT_START: org=%s workflow=%s user=%s caller=%s trace-id=%s input_length=%d", execution.ExecutionId, execution.Workflow.OrgId, execution.WorkflowId, initiatedBy, caller, traceID, len(userMessage)) } // Set model based on environment @@ -13137,7 +13142,7 @@ func RunMCPAction(resp http.ResponseWriter, request *http.Request) { return } - workflowExecution, err := PrepareSingleAction(ctx, user, "agent", marshalledAction, false, "") + workflowExecution, err := PrepareSingleAction(ctx, user, "agent", marshalledAction, false, "RunMCPAction", "", "") if fileId == "agent_starter" { log.Printf("[INFO] Returning early for agent_starter single action execution: %s", workflowExecution.ExecutionId) resp.WriteHeader(200) diff --git a/shared.go b/shared.go index a2041853..ef1b578c 100644 --- a/shared.go +++ b/shared.go @@ -17779,7 +17779,7 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action originalAction = actionResult.Action } - returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true) + returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true, "handleAgentDecisionStreamResult", "") if err != nil { log.Printf("[ERROR][%s] Failed handling agent execution start: %s", workflowExecution.ExecutionId, err) } @@ -21701,7 +21701,7 @@ func CheckHookAuth(request *http.Request, auth string) error { } // Body = The action body received from the user to test. -func PrepareSingleAction(ctx context.Context, user User, appId string, body []byte, runValidationAction bool, decision ...string) (WorkflowExecution, error) { +func PrepareSingleAction(ctx context.Context, user User, appId string, body []byte, runValidationAction bool, caller string, traceID string, decision ...string) (WorkflowExecution, error) { workflowExecution := WorkflowExecution{} @@ -21768,13 +21768,14 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by } } - action, err := HandleAiAgentExecutionStart(exec, action, false) + action, err := HandleAiAgentExecutionStart(exec, action, false, caller, traceID) if err != nil { log.Printf("[ERROR] Failed to handle AI agent execution start: %s", err) } exec.Workflow.Actions[0] = action newExec, err := GetWorkflowExecution(ctx, exec.ExecutionId) + log.Printf("[INFO][%s] AI Agent: %s Started standalone for org %s, execution id %s, workflow %s, with trace-id %s", exec.ExecutionId, caller, user.ActiveOrg.Id, exec.ExecutionId, exec.WorkflowId, traceID) if err != nil { log.Printf("[ERROR] Failed to get workflow execution after starting agent: %s", err) } else { From 23d7dd8ece89255c7c51c59dace0672777b2335e Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 17:20:31 +0530 Subject: [PATCH 12/18] added trace logs to handleRunDatastoreAutomation --- codegen.go | 21 ++++++++++++++++----- db-connector.go | 4 ++-- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/codegen.go b/codegen.go index 857d0431..5bd92b00 100755 --- a/codegen.go +++ b/codegen.go @@ -10,6 +10,7 @@ import ( "crypto/md5" "crypto/sha256" "encoding/json" + "math/rand" "errors" "fmt" "io" @@ -4790,7 +4791,7 @@ func handleDatastoreAutomationWebhook(ctx context.Context, marshalledBody []byte return nil } -func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAutomation) error { +func handleRunDatastoreAutomation(ctx context.Context, cacheData CacheKeyData, automation DatastoreAutomation) error { if len(cacheData.OrgId) == 0 { return errors.New("CacheKeyData.OrgId is required for handleRunAutomation") } @@ -4798,8 +4799,16 @@ func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAu if len(cacheData.Category) == 0 { return errors.New("CacheKeyData.Category is required for handleRunAutomation") } + + if ctx == nil { + ctx = context.Background() + } + + traceID, _ := ctx.Value("trace_id").(string) + if traceID == "" { + traceID = fmt.Sprintf("ROOT-%d-%d", time.Now().UnixNano(), rand.Intn(1000)) + } - ctx := context.Background() parsedName := strings.ReplaceAll(strings.ToLower(automation.Name), " ", "_") // These are ran pre-execution @@ -4872,7 +4881,7 @@ func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAu // november 2025 after adding graphic system to datastore } else if parsedName == "run_ai_agent" { - log.Printf("[DEBUG] Handling run_ai_agent automation for key %s in category %s", cacheData.Key, cacheData.Category) + log.Printf("[INFO] Handling run_ai_agent automation for key %s in category %s with trace-id %s", cacheData.Key, cacheData.Category, traceID) if len(foundApikey) == 0 { log.Printf("[ERROR] No admin user with API key found for org %s", cacheData.OrgId) return errors.New("No admin user with API key found") @@ -4959,8 +4968,10 @@ func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAu return err } - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", foundApikey)) - req.Header.Add("Org-Id", cacheData.OrgId) + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", foundApikey)) + req.Header.Set("Org-Id", cacheData.OrgId) + req.Header.Set("X-Internal-Caller", "handleRunDatastoreAutomation") + req.Header.Set("X-Trace-ID", traceID) resp, err := client.Do(req) if err != nil { diff --git a/db-connector.go b/db-connector.go index feab9f6c..b197e348 100755 --- a/db-connector.go +++ b/db-connector.go @@ -14565,7 +14565,7 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto // Run the automation // This should make a notification if it fails go func(cacheData CacheKeyData, automation DatastoreAutomation) { - err := handleRunDatastoreAutomation(cacheData, automation) + err := handleRunDatastoreAutomation(ctx, cacheData, automation) if err != nil { log.Printf("[ERROR] Failed running automation %s for cache key %s: %s", automation.Name, cacheData.Key, err) @@ -15074,7 +15074,7 @@ func SetDatastoreKey(ctx context.Context, cacheData CacheKeyData) error { // Run the automation // This should make a notification if it fails go func(cacheData CacheKeyData, automation DatastoreAutomation) { - err := handleRunDatastoreAutomation(cacheData, automation) + err := handleRunDatastoreAutomation(ctx, cacheData, automation) if err != nil { log.Printf("[ERROR] Failed running automation %s for cache key %s: %s", automation.Name, cacheData.Key, err) From 32966027049b9b79304d694a686e3bcf75e5b2b5 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 18:36:04 +0530 Subject: [PATCH 13/18] using the context instead of modifying function signature --- shared.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/shared.go b/shared.go index ef1b578c..ec3c4ea1 100644 --- a/shared.go +++ b/shared.go @@ -17531,6 +17531,8 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action log.Printf("[DEBUG][%s] Got decision ID '%s' for agent '%s'. Ref: %s", workflowExecution.ExecutionId, decisionId, actionResult.Action.ID, actionResult.Status) } + ctx := context.Background() + foundActionResultIndex := -1 for actionIndex, result := range workflowExecution.Results { if result.Action.ID == actionResult.Action.ID { @@ -17754,7 +17756,6 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action log.Printf("[DEBUG] Getting agent chat history: %s", requestKey) } - ctx := context.Background() agentRequestMemory, err := GetDatastoreKey(ctx, requestKey, "agent_requests") if err != nil { log.Printf("[ERROR][%s] Failed to find request memory for updates", actionResult.ExecutionId) @@ -17779,7 +17780,7 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action originalAction = actionResult.Action } - returnAction, err := HandleAiAgentExecutionStart(workflowExecution, originalAction, true, "handleAgentDecisionStreamResult", "") + returnAction, err := HandleAiAgentExecutionStart(ctx, workflowExecution, originalAction, true) if err != nil { log.Printf("[ERROR][%s] Failed handling agent execution start: %s", workflowExecution.ExecutionId, err) } @@ -21701,9 +21702,12 @@ func CheckHookAuth(request *http.Request, auth string) error { } // Body = The action body received from the user to test. -func PrepareSingleAction(ctx context.Context, user User, appId string, body []byte, runValidationAction bool, caller string, traceID string, decision ...string) (WorkflowExecution, error) { +func PrepareSingleAction(ctx context.Context, user User, appId string, body []byte, runValidationAction bool, decision ...string) (WorkflowExecution, error) { workflowExecution := WorkflowExecution{} + if ctx == nil { + ctx = context.Background() + } var action Action err := json.Unmarshal(body, &action) @@ -21768,7 +21772,10 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by } } - action, err := HandleAiAgentExecutionStart(exec, action, false, caller, traceID) + caller, _ := ctx.Value("caller").(string) + traceID, _ := ctx.Value("trace_id").(string) + + action, err := HandleAiAgentExecutionStart(ctx, exec, action, false) if err != nil { log.Printf("[ERROR] Failed to handle AI agent execution start: %s", err) } From 622e567ea9b3b9b996e5549338c251c3e7b987d4 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 20:58:45 +0530 Subject: [PATCH 14/18] use the context for passing caller and trace info --- ai.go | 8 ++++++-- codegen.go | 2 +- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/ai.go b/ai.go index 5c1f70fe..27ce7a45 100644 --- a/ai.go +++ b/ai.go @@ -7182,7 +7182,7 @@ func sendAITokenLimitAlert(ctx context.Context, execution WorkflowExecution, ful // createNextActions = false => start of agent to find initial decisions // createNextActions = true => mid-agent to decide next steps -func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, createNextActions bool, caller string, traceID string) (Action, error) { +func HandleAiAgentExecutionStart(ctx context.Context, execution WorkflowExecution, startNode Action, createNextActions bool) (Action, error) { aiStarttime := time.Now().Unix() // A handler to ensure we ALWAYS focus on next actions if a node starts late @@ -7221,7 +7221,9 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, execution.Workflow.OrgId = execution.ExecutionOrg } - ctx := context.Background() + if ctx == nil { + ctx = context.Background() + } // Validate On-Prem Configuration immediately if project.Environment != "cloud" { @@ -7806,6 +7808,8 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } if !createNextActions { + caller, _ := ctx.Value("caller").(string) + traceID, _ := ctx.Value("trace_id").(string) if strings.TrimSpace(caller) == "" { log.Printf("ERROR[%s] AI agent: No caller function info provided for AI agent, aborting the request ...", execution.ExecutionId) return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "no_caller_info", "No caller function info provided for AI Agent start") diff --git a/codegen.go b/codegen.go index 5bd92b00..6565d50a 100755 --- a/codegen.go +++ b/codegen.go @@ -4881,7 +4881,7 @@ func handleRunDatastoreAutomation(ctx context.Context, cacheData CacheKeyData, a // november 2025 after adding graphic system to datastore } else if parsedName == "run_ai_agent" { - log.Printf("[INFO] Handling run_ai_agent automation for key %s in category %s with trace-id %s", cacheData.Key, cacheData.Category, traceID) + log.Printf("[INFO] AI agent: Handling run_ai_agent automation for key %s in category %s with trace-id %s", cacheData.Key, cacheData.Category, traceID) if len(foundApikey) == 0 { log.Printf("[ERROR] No admin user with API key found for org %s", cacheData.OrgId) return errors.New("No admin user with API key found") From da6c753aaee1b432277aace4adf80f9527634ac7 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 21:03:02 +0530 Subject: [PATCH 15/18] update the call with right params --- ai.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ai.go b/ai.go index 27ce7a45..b736334b 100644 --- a/ai.go +++ b/ai.go @@ -13146,7 +13146,7 @@ func RunMCPAction(resp http.ResponseWriter, request *http.Request) { return } - workflowExecution, err := PrepareSingleAction(ctx, user, "agent", marshalledAction, false, "RunMCPAction", "", "") + workflowExecution, err := PrepareSingleAction(ctx, user, "agent", marshalledAction, false, "") if fileId == "agent_starter" { log.Printf("[INFO] Returning early for agent_starter single action execution: %s", workflowExecution.ExecutionId) resp.WriteHeader(200) From aa60279e0c59228c354bc9083da9f8adce03e1e5 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Sun, 5 Apr 2026 21:30:31 +0530 Subject: [PATCH 16/18] commented out for now --- shared.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/shared.go b/shared.go index ec3c4ea1..9a43e550 100644 --- a/shared.go +++ b/shared.go @@ -17338,6 +17338,7 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti // Check if the request has been sent already (just in case) cacheKey := fmt.Sprintf("agent_request_%s_%s_%s", workflowExecution.ExecutionId, actionResult.Action.ID, status) + // cacheKey := fmt.Sprintf("agent_request_%s_%s", workflowExecution.ExecutionId, actionResult.Action.ID) _, err := GetCache(ctx, cacheKey) if err == nil { if debug { @@ -17351,6 +17352,7 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti cacheTTL = 1440 // 24 hours — execution outcome is permanent } SetCache(ctx, cacheKey, []byte("1"), cacheTTL) + // SetCache(ctx, cacheKey, []byte(status), cacheTTL) } if status == "SUCCESS" || status == "FINISHED" || status == "FAILURE" || status == "ABORTED" { From 4b6a26dfb18a658fe845b39ef9ba1d5b38a75e1b Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 6 Apr 2026 11:54:39 +0530 Subject: [PATCH 17/18] added new alowed headers to cors check --- shared.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared.go b/shared.go index 9a43e550..5a7e6aeb 100644 --- a/shared.go +++ b/shared.go @@ -190,7 +190,7 @@ func HandleCors(resp http.ResponseWriter, request *http.Request) bool { } //resp.Header().Set("Access-Control-Allow-Origin", "http://localhost:8000") - resp.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept, X-Requested-With, remember-me, Org-Id, Org, Authorization, X-Debug-Url") + resp.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept, X-Requested-With, remember-me, Org-Id, Org, Authorization, X-Debug-Url, X-Internal-Caller, X-Trace-ID") resp.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, PATCH") resp.Header().Set("Access-Control-Allow-Credentials", "true") From 3822912d31dc99101469ce849d411d74952587d4 Mon Sep 17 00:00:00 2001 From: Hari Krishna Date: Mon, 6 Apr 2026 11:58:21 +0530 Subject: [PATCH 18/18] code ql fix: added Escape single quotes --- ai.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/ai.go b/ai.go index b736334b..333701ae 100644 --- a/ai.go +++ b/ai.go @@ -8555,11 +8555,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( question = mappedDecision.Fields[0].Value } + // Escape single quotes to prevent quote injection + safeQuestion := strings.ReplaceAll(question, "'", "\\'") + err = CreateOrgNotification( ctx, - fmt.Sprintf("Agent - input required: '%s'", question), - fmt.Sprintf("Input required during agent run."), - fmt.Sprintf("/forms/%s?authorization=%s&reference_execution=%s&source_node=%s&decision_id=%s&backend_url=%s", execution.WorkflowId, execution.Authorization, execution.ExecutionId, startNode.ID, mappedDecision.RunDetails.Id, backendUrl), + fmt.Sprintf("Agent - input required: '%s'", safeQuestion), + fmt.Sprintf("Input required during agent run."), + fmt.Sprintf("/forms/%s?authorization=%s&reference_execution=%s&source_node=%s&decision_id=%s&backend_url=%s", execution.WorkflowId, url.QueryEscape(execution.Authorization), execution.ExecutionId, startNode.ID, mappedDecision.RunDetails.Id, url.QueryEscape(backendUrl)), execution.ExecutionOrg, false, "LOW",