diff --git a/ai.go b/ai.go index 8b81971a..333701ae 100644 --- a/ai.go +++ b/ai.go @@ -7054,7 +7054,7 @@ func runSupportRequest(ctx context.Context, input QueryInput) string { // Callers must return immediately after this call. func abortAgentExecution(ctx context.Context, execution WorkflowExecution, startNode Action, base AgentOutput, abortLabel, reason string) (Action, error) { agentOutput := base - agentOutput.Status = "FINISHED" + agentOutput.Status = "ABORTED" agentOutput.Error = reason agentOutput.CompletedAt = time.Now().Unix() @@ -7098,15 +7098,13 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start marshalledOutput, marshalErr := json.Marshal(agentOutput) if marshalErr != nil { log.Printf("[ERROR][%s] abortAgentExecution: failed marshalling AgentOutput: %s", execution.ExecutionId, marshalErr) - marshalledOutput = []byte(`{"status":"FINISHED","error":"marshal error"}`) + marshalledOutput = []byte(`{"status":"ABORTED","error":"marshal error"}`) } - log.Printf("[ERROR][%s] AI_AGENT_ABORT: org=%s label=%s decisions=%d llm_calls=%d total_tokens=%d reason=%q", - execution.ExecutionId, execution.Workflow.OrgId, abortLabel, - len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) + // log.Printf("[ERROR][%s] AI_AGENT_ABORT: org=%s label=%s decisions=%d llm_calls=%d total_tokens=%d reason=%q", execution.ExecutionId, execution.Workflow.OrgId, abortLabel, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens, reason) abortResult := ActionResult{ - Status: "SUCCESS", + Status: "ABORTED", Result: string(marshalledOutput), Action: startNode, StartedAt: time.Now().UnixMicro(), @@ -7127,8 +7125,11 @@ func abortAgentExecution(ctx context.Context, execution WorkflowExecution, start execution.Results = append(execution.Results, abortResult) } - execution.Status = "FINISHED" + execution.Status = "ABORTED" + + go sendAgentActionSelfRequest("ABORTED", execution, abortResult) go SetWorkflowExecution(ctx, execution, true) + return startNode, errors.New(reason) } @@ -7181,7 +7182,7 @@ func sendAITokenLimitAlert(ctx context.Context, execution WorkflowExecution, ful // createNextActions = false => start of agent to find initial decisions // createNextActions = true => mid-agent to decide next steps -func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, createNextActions bool) (Action, error) { +func HandleAiAgentExecutionStart(ctx context.Context, execution WorkflowExecution, startNode Action, createNextActions bool) (Action, error) { aiStarttime := time.Now().Unix() // A handler to ensure we ALWAYS focus on next actions if a node starts late @@ -7220,7 +7221,9 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, execution.Workflow.OrgId = execution.ExecutionOrg } - ctx := context.Background() + if ctx == nil { + ctx = context.Background() + } // Validate On-Prem Configuration immediately if project.Environment != "cloud" { @@ -7228,14 +7231,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, err := errors.New("AI Configuration Error: AI_MODEL or OPENAI_MODEL environment variable must be set for On-Premise AI Agent execution.") log.Printf("[ERROR] %v", err) - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "%s"}`, err.Error()), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "missing_onprem_ai_config", err.Error()) } } @@ -7445,8 +7441,7 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, relevantDecisions = append(relevantDecisions, mappedResult.Decisions[i]) } - log.Printf("[INFO][%s] AI_AGENT: org=%s decisions_total=%d failures=%d successes=%d last_index=%d", - execution.ExecutionId, execution.Workflow.OrgId, len(mappedResult.Decisions), failureCount, successCount, lastFinishedIndex) + log.Printf("[INFO][%s] AI_AGENT: org=%s decisions_total=%d failures=%d successes=%d last_index=%d", execution.ExecutionId, execution.Workflow.OrgId, len(mappedResult.Decisions), failureCount, successCount, lastFinishedIndex) marshalledDecisions, err = json.Marshal(relevantDecisions) if err != nil { @@ -7636,6 +7631,11 @@ func HandleAiAgentExecutionStart(execution WorkflowExecution, startNode Action, decidedApps += lowername + ", " } + + // Let's inject http. + if !strings.Contains(decidedApps, "http") { + decidedApps += "http, " + } } if len(decidedApps) > 0 { @@ -7798,15 +7798,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if len(userMessage) == 0 { log.Printf("[ERROR][%s] AI Agent: No user message/input found for action %s", execution.ExecutionId, startNode.ID) - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (3): No input provided."}`), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - - return startNode, errors.New("No user message/input found for AI Agent start") - + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "no_user_message", "No user message/input found for AI Agent start") } // Track who initiated this agent (for audit trail) @@ -7816,7 +7808,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } if !createNextActions { - log.Printf("[INFO] AI_AGENT_START: execution_id=%s org=%s user=%s input_length=%d", execution.ExecutionId, execution.Workflow.OrgId, initiatedBy, len(userMessage)) + caller, _ := ctx.Value("caller").(string) + traceID, _ := ctx.Value("trace_id").(string) + if strings.TrimSpace(caller) == "" { + log.Printf("ERROR[%s] AI agent: No caller function info provided for AI agent, aborting the request ...", execution.ExecutionId) + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "no_caller_info", "No caller function info provided for AI Agent start") + } + + log.Printf("[INFO][%s] AI_AGENT_START: org=%s workflow=%s user=%s caller=%s trace-id=%s input_length=%d", execution.ExecutionId, execution.Workflow.OrgId, execution.WorkflowId, initiatedBy, caller, traceID, len(userMessage)) } // Set model based on environment @@ -7920,32 +7919,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed marshalling input for action %s: %s", execution.ExecutionId, startNode.ID, err) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (4): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "marshal_request_body_failed", fmt.Sprintf("Failed to start AI Agent (4): %s", err.Error())) } //go executeSpecificCloudApp(ctx, execution.ExecutionId, execution.Authorization, urls, startNode) if !runOpenaiRequest { - + log.Printf("[ERROR] AI Agent: Unhandled Singul BODY for OpenAI agent (first request): %s. AI APPNAME (can't be empty): %#v", string(initialAgentRequestBody), appname) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (5): Failed initial AI request. Contact support@shuffler.io if this persists."}`), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - - return startNode, errors.New("Unhandled Singul BODY for OpenAI agent (first request)") + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "unsupported_app_not_openai", "Failed to start AI Agent (5): Failed initial AI request. Contact support@shuffler.io if this persists.") } if debug { @@ -7990,16 +7971,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( marshalledAction, err := json.Marshal(aiNode) if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed marshalling action for AI Agent (first agent request): %s", execution.ExecutionId, err) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (6): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "marshal_ai_action_failed", fmt.Sprintf("Failed to start AI Agent (6): %s", err.Error())) } // Self-request starts here! @@ -8051,7 +8023,7 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( ) if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed creating request during LLM setup: %s", execution.ExecutionId, err) + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Failed creating request during LLM setup: %s", execution.ExecutionId, err) return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_request_build_failed", fmt.Sprintf("Failed to start AI Agent (7): %s", err.Error())) } @@ -8138,20 +8110,12 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( err = json.Unmarshal([]byte(resultMapping.Result), &outputMap) if err != nil { log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling response from sending request for stream during SKIPPED user input: %s. Body: %s", execution.ExecutionId, err, string(resultMapping.Result)) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (1): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_response_unmarshal_failed", fmt.Sprintf("Failed to start AI Agent (1): %s", err.Error())) } if outputMap.Status != 200 { log.Printf("[ERROR][%s] AI Agent: Failed to run AI agent with status code %d", execution.ExecutionId, outputMap.Status) + // Don't log AI_AGENT_LLM_FAILURE here yet - wait to see if we can parse the error details below //return startNode, errors.New(fmt.Sprintf("Failed to run AI agent with status code %d", outputMap.Status)) } @@ -8163,20 +8127,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( log.Printf("[ERROR][%s] AI Agent: Failed to convert body to MAP in AI Agent response. Raw response: %s", execution.ExecutionId, string(resultMapping.Result)) choicesString = fmt.Sprintf("LLM Response Error: %s", string(resultMapping.Result)) + + // Log LLM failure for body parsing error + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=body_parse_error raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(resultMapping.Result)) } else { bodyString, err = json.Marshal(bodyMap) if err != nil { log.Printf("[ERROR] AI Agent: Failed marshalling body to string in AI Agent response: %s", err) - - execution.Status = "ABORTED" - execution.Results = append(execution.Results, ActionResult{ - Status: "ABORTED", - Result: fmt.Sprintf(`{"success": false, "reason": "Failed to start AI Agent (3): %s"}`, strings.Replace(err.Error(), `"`, `\"`, -1)), - Action: startNode, - }) - go SetWorkflowExecution(ctx, execution, true) - - return startNode, err + return abortAgentExecution(ctx, execution, startNode, AgentOutput{}, "llm_body_marshal_failed", fmt.Sprintf("Failed to start AI Agent (3): %s", err.Error())) } } @@ -8197,12 +8155,18 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( newOutput := openai.ErrorResponse{} err = json.Unmarshal(bodyString, &newOutput) if err == nil && len(newOutput.Error.Message) > 0 { - choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message) + // choicesString = fmt.Sprintf("LLM Error: %s", newOutput.Error.Message) - resultMapping.Status = "FAILURE" + // resultMapping.Status = "FAILURE" + // LLM returned a proper error (401 invalid key, 429 rate limit, 500 server error, etc.) + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=%s error_message=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, newOutput.Error.Type, newOutput.Error.Message) + return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "llm_http_error", fmt.Sprintf("LLM error (HTTP %d %s): %s", outputMap.Status, newOutput.Error.Type, newOutput.Error.Message)) } else { log.Printf("[ERROR][%s] AI Agent: No choices, nor error found in AI agent response. Status: %d. Raw: %s", execution.ExecutionId, outputMap.Status, bodyString) resultMapping.Status = "FAILURE" + + // Log LLM failure for unknown error format + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: org=%s status_code=%d error_type=unknown_format raw_response=%s", execution.ExecutionId, execution.Workflow.OrgId, outputMap.Status, string(bodyString)) } } else { choicesString = openaiOutput.Choices[0].Message.Content @@ -8268,19 +8232,69 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( errorMessage := "" err = json.Unmarshal([]byte(decisionString), &mappedDecisions) if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions in AI Agent response: %s", execution.ExecutionId, err) + log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions: %s", execution.ExecutionId, err) + + recovered := false + if outputMap.Status >= 200 && outputMap.Status < 300 { + fixedString := strings.Replace(decisionString, `\"`, `"`, -1) + if json.Unmarshal([]byte(fixedString), &mappedDecisions) == nil { + log.Printf("[INFO][%s] AI Agent: Recovered by fixing escaped quotes", execution.ExecutionId) + recovered = true + } + + if !recovered { + balanced := balanceJSONLikeString(decisionString) + if json.Unmarshal([]byte(balanced), &mappedDecisions) == nil { + log.Printf("[INFO][%s] AI Agent: Recovered by balancing JSON", execution.ExecutionId) + recovered = true + } + } + } - if len(mappedDecisions) == 0 { - decisionString = strings.Replace(decisionString, `\"`, `"`, -1) + if !recovered { + isErrorResponse := outputMap.Status < 200 || outputMap.Status >= 300 + reason := "LLM returned unexpected format" + if isErrorResponse { + reason = fmt.Sprintf("LLM error (HTTP %d)", outputMap.Status) + } - err = json.Unmarshal([]byte(decisionString), &mappedDecisions) - if err != nil { - log.Printf("[ERROR][%s] AI Agent: Failed unmarshalling decisions in AI Agent response (2): %s. String: %s", execution.ExecutionId, err, decisionString) - resultMapping.Status = "FAILURE" + log.Printf("[INFO][%s] AI Agent: Wrapping response as finish decision - %s", execution.ExecutionId, reason) - // Updating the OUTPUT in some way to help the user a bit. - errorMessage = fmt.Sprintf("The output from the LLM had no decisions. See the raw decisions tring for the response. Contact support@shuffler.io if you think this is wrong.") + // Wrap content as valid JSON output + var outputValue string + var rawContent interface{} + if json.Unmarshal([]byte(decisionString), &rawContent) == nil { + wrapped, _ := json.Marshal(map[string]interface{}{"reply": rawContent}) + outputValue = string(wrapped) + } else { + wrapped, _ := json.Marshal(map[string]interface{}{"reply": decisionString}) + outputValue = string(wrapped) } + + b := make([]byte, 6) + rand.Read(b) + + mappedDecisions = []AgentDecision{{ + I: lastFinishedIndex, + Category: "finish", + Action: "finish", + Tool: "core", + Confidence: 1.0, + Runs: "1", + ApprovalRequired: false, + Reason: reason, + Fields: []Valuereplace{{ + Key: "output", + Value: outputValue, + }}, + RunDetails: AgentDecisionRunDetails{ + Id: base64.RawURLEncoding.EncodeToString(b), + Status: "", + StartedAt: 0, + CompletedAt: 0, + }, + }} + resultMapping.Status = "" } } @@ -8541,11 +8555,14 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( question = mappedDecision.Fields[0].Value } + // Escape single quotes to prevent quote injection + safeQuestion := strings.ReplaceAll(question, "'", "\\'") + err = CreateOrgNotification( ctx, - fmt.Sprintf("Agent - input required: '%s'", question), - fmt.Sprintf("Input required during agent run."), - fmt.Sprintf("/forms/%s?authorization=%s&reference_execution=%s&source_node=%s&decision_id=%s&backend_url=%s", execution.WorkflowId, execution.Authorization, execution.ExecutionId, startNode.ID, mappedDecision.RunDetails.Id, backendUrl), + fmt.Sprintf("Agent - input required: '%s'", safeQuestion), + fmt.Sprintf("Input required during agent run."), + fmt.Sprintf("/forms/%s?authorization=%s&reference_execution=%s&source_node=%s&decision_id=%s&backend_url=%s", execution.WorkflowId, url.QueryEscape(execution.Authorization), execution.ExecutionId, startNode.ID, mappedDecision.RunDetails.Id, url.QueryEscape(backendUrl)), execution.ExecutionOrg, false, "LOW", @@ -8632,6 +8649,44 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( if !decisionActionRan { log.Printf("[ERROR][%s] AI Agent: No decision action was run. Marking the agent as FAILURE.", execution.ExecutionId) + + // Properly mark the agent as failed + agentOutput.Status = "FAILURE" + agentOutput.CompletedAt = time.Now().Unix() + + sentFailure := false + // Update the result status - finds the existing node entry in execution.Results + for resultIndex, result := range execution.Results { + if result.Action.ID != startNode.ID { + continue + } + + execution.Results[resultIndex].Status = "FAILURE" + execution.Results[resultIndex].CompletedAt = agentOutput.CompletedAt + + marshalledAgentOutput, err := json.Marshal(agentOutput) + if err != nil { + log.Printf("[ERROR] AI Agent: Failed marshalling agent output for FAILURE: %s", err) + } else { + execution.Results[resultIndex].Result = string(marshalledAgentOutput) + resultMapping.Result = string(marshalledAgentOutput) + } + + log.Printf("[DEBUG][%s] About to call sendAgentActionSelfRequest for FAILURE on agent action %s", execution.ExecutionId, startNode.ID) + go sendAgentActionSelfRequest("FAILURE", execution, execution.Results[resultIndex]) + sentFailure = true + break + } + + if !sentFailure { + log.Printf("[WARNING][%s] AI Agent: No result entry found in execution.Results, using resultMapping fallback", execution.ExecutionId) + fallbackOutput, merr := json.Marshal(agentOutput) + if merr == nil { + resultMapping.Status = "FAILURE" + resultMapping.Result = string(fallbackOutput) + } + go sendAgentActionSelfRequest("FAILURE", execution, resultMapping) + } } marshalledAgentOutput, err := json.Marshal(agentOutput) @@ -8674,9 +8729,9 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( //log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s status=%s duration=%ds decisions=%d", execution.ExecutionId, agentOutput.Status, time.Now().Unix()-agentOutput.StartedAt, len(agentOutput.Decisions)) - if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status == "EXECUTING" { - duration := agentOutput.CompletedAt - agentOutput.StartedAt - log.Printf("[INFO][%s] AI_AGENT_COMPLETE: org=%s duration=%ds decisions=%d llm_calls=%d total_tokens=%d status=SUCCESS", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) + if agentOutput.Status == "FINISHED" && agentOutput.CompletedAt > 0 && execution.Status != "ABORTED" && execution.Status != "FAILURE" { + + foundResult := false for resultIndex, result := range execution.Results { if result.Action.ID != startNode.ID { continue @@ -8686,12 +8741,26 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( execution.Results[resultIndex].CompletedAt = agentOutput.CompletedAt log.Printf("[DEBUG][%s] About to call sendAgentActionSelfRequest for agent action %s", execution.ExecutionId, startNode.ID) go sendAgentActionSelfRequest("SUCCESS", execution, execution.Results[resultIndex]) + foundResult = true break } + + if !foundResult { + // duration := int64(0) + // if agentOutput.StartedAt > 0 && agentOutput.CompletedAt > 0 { + // duration = agentOutput.CompletedAt - agentOutput.StartedAt + // } else if agentOutput.StartedAt > 0 { + // duration = time.Now().Unix() - agentOutput.StartedAt + // } + + // log.Printf("[INFO] AI_AGENT_FINISH: execution_id=%s org=%s status=SUCCESS duration=%ds decisions=%d llm_calls=%d tokens_used=%d", execution.ExecutionId, execution.Workflow.OrgId, duration, len(agentOutput.Decisions), agentOutput.LLMCallCount, agentOutput.TotalTokens) + } } } else { - log.Printf("[ERROR] AI Agent: No result found in AI agent response. Status: %d. Body: %s", newresp.StatusCode, string(body)) + // LLM returned an empty result body — this is a failure + log.Printf("[ERROR][%s] AI_AGENT_LLM_FAILURE: Empty result body from LLM response (status %d). Aborting agent.", execution.ExecutionId, newresp.StatusCode) + return abortAgentExecution(ctx, execution, startNode, oldAgentOutput, "empty_llm_result", fmt.Sprintf("LLM returned empty response body with HTTP status %d", newresp.StatusCode)) } if memorizationEngine == "shuffle_db" { @@ -8727,6 +8796,10 @@ You are the Action Execution Agent for the Shuffle platform. You receive tools ( } } + if createNextActions { + return startNode, nil + } + // 1. Map the response back newResult, err := json.Marshal(resultMapping) if err != nil { @@ -8904,7 +8977,6 @@ func GenerateSingulWorkflows(resp http.ResponseWriter, request *http.Request) { return } - // Maps everything AROUND the usecase err = HandleSingulWorkflowEnablement(ctx, *workflow, user, categoryAction) if err != nil { diff --git a/codegen.go b/codegen.go index 857d0431..6565d50a 100755 --- a/codegen.go +++ b/codegen.go @@ -10,6 +10,7 @@ import ( "crypto/md5" "crypto/sha256" "encoding/json" + "math/rand" "errors" "fmt" "io" @@ -4790,7 +4791,7 @@ func handleDatastoreAutomationWebhook(ctx context.Context, marshalledBody []byte return nil } -func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAutomation) error { +func handleRunDatastoreAutomation(ctx context.Context, cacheData CacheKeyData, automation DatastoreAutomation) error { if len(cacheData.OrgId) == 0 { return errors.New("CacheKeyData.OrgId is required for handleRunAutomation") } @@ -4798,8 +4799,16 @@ func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAu if len(cacheData.Category) == 0 { return errors.New("CacheKeyData.Category is required for handleRunAutomation") } + + if ctx == nil { + ctx = context.Background() + } + + traceID, _ := ctx.Value("trace_id").(string) + if traceID == "" { + traceID = fmt.Sprintf("ROOT-%d-%d", time.Now().UnixNano(), rand.Intn(1000)) + } - ctx := context.Background() parsedName := strings.ReplaceAll(strings.ToLower(automation.Name), " ", "_") // These are ran pre-execution @@ -4872,7 +4881,7 @@ func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAu // november 2025 after adding graphic system to datastore } else if parsedName == "run_ai_agent" { - log.Printf("[DEBUG] Handling run_ai_agent automation for key %s in category %s", cacheData.Key, cacheData.Category) + log.Printf("[INFO] AI agent: Handling run_ai_agent automation for key %s in category %s with trace-id %s", cacheData.Key, cacheData.Category, traceID) if len(foundApikey) == 0 { log.Printf("[ERROR] No admin user with API key found for org %s", cacheData.OrgId) return errors.New("No admin user with API key found") @@ -4959,8 +4968,10 @@ func handleRunDatastoreAutomation(cacheData CacheKeyData, automation DatastoreAu return err } - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", foundApikey)) - req.Header.Add("Org-Id", cacheData.OrgId) + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", foundApikey)) + req.Header.Set("Org-Id", cacheData.OrgId) + req.Header.Set("X-Internal-Caller", "handleRunDatastoreAutomation") + req.Header.Set("X-Trace-ID", traceID) resp, err := client.Do(req) if err != nil { diff --git a/db-connector.go b/db-connector.go index 6fa106d9..b197e348 100755 --- a/db-connector.go +++ b/db-connector.go @@ -2030,8 +2030,11 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor mappedOutput.Decisions[decisionIndex].RunDetails.Status = "FAILURE" mappedOutput.Decisions[decisionIndex].RunDetails.CompletedAt = time.Now().Unix() mappedOutput.Decisions[decisionIndex].RunDetails.RawResponse += "\n[ERROR] Decision marked as FAILURE due to 5 minute timeout." - } + // Count this as finished + failed so recovery triggers in the same Fixexecution run + // finishedDecisions = append(finishedDecisions, decision.RunDetails.Id) + // failedFound = true + } } else { if decision.RunDetails.CompletedAt > 0 { if debug { @@ -2134,6 +2137,7 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor }() } else { log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found, marking as WAITING.", workflowExecution.ExecutionId, action.ID) + //log.Printf("[INFO][%s] All decisions finished for agent action %s - but no finish action found. Re-invoking agent to finalize (failedFound: %t).", workflowExecution.ExecutionId, action.ID, failedFound) mappedOutput.Status = "RUNNING" mappedOutput.CompletedAt = 0 @@ -2142,11 +2146,19 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor if workflowExecution.Status == "FINISHED" { workflowExecution.Status = "EXECUTING" } - // To ensure the execution is actually updated + // Re-invoke the agent so the LLM can see the failure and produce a proper "finish" decision. + + // capturedExec := workflowExecution + // capturedAction := action go func() { time.Sleep(1 * time.Second) sendAgentActionSelfRequest("WAITING", workflowExecution, workflowExecution.Results[resultIndex]) + // time.Sleep(2 * time.Second) + // _, err := HandleAiAgentExecutionStart(capturedExec, capturedAction, true) + // if err != nil { + // log.Printf("[ERROR][%s] Failed re-invoking agent after decisions completed for action %s: %s", capturedExec.ExecutionId, capturedAction.ID, err) + // } }() } } else if (result.Status == "" || result.Status == "WAITING") && mappedOutput.Status == "FINISHED" { @@ -2354,7 +2366,12 @@ func Fixexecution(ctx context.Context, workflowExecution WorkflowExecution) (Wor for _, result := range workflowExecution.Results { if result.Status == "FAILURE" || result.Status == "ABORTED" { - log.Printf("[DEBUG][%s] Setting execution to aborted because of result %s (%s) with status '%s'. Should update execution parent if it exists (not implemented).", workflowExecution.ExecutionId, result.Action.Name, result.Action.ID, result.Status) + // Only log once per execution to avoid spam + cacheKey := fmt.Sprintf("abort_log_%s", workflowExecution.ExecutionId) + if _, err := GetCache(ctx, cacheKey); err != nil { + log.Printf("[DEBUG][%s] Setting execution to aborted because of result %s (%s) with status '%s'. Should update execution parent if it exists (not implemented).", workflowExecution.ExecutionId, result.Action.Name, result.Action.ID, result.Status) + SetCache(ctx, cacheKey, []byte("logged"), 5) // 5 minute TTL + } workflowExecution.Status = "ABORTED" dbsave = true @@ -14548,7 +14565,7 @@ func SetDatastoreKeyBulk(ctx context.Context, allKeys []CacheKeyData) ([]Datasto // Run the automation // This should make a notification if it fails go func(cacheData CacheKeyData, automation DatastoreAutomation) { - err := handleRunDatastoreAutomation(cacheData, automation) + err := handleRunDatastoreAutomation(ctx, cacheData, automation) if err != nil { log.Printf("[ERROR] Failed running automation %s for cache key %s: %s", automation.Name, cacheData.Key, err) @@ -15057,7 +15074,7 @@ func SetDatastoreKey(ctx context.Context, cacheData CacheKeyData) error { // Run the automation // This should make a notification if it fails go func(cacheData CacheKeyData, automation DatastoreAutomation) { - err := handleRunDatastoreAutomation(cacheData, automation) + err := handleRunDatastoreAutomation(ctx, cacheData, automation) if err != nil { log.Printf("[ERROR] Failed running automation %s for cache key %s: %s", automation.Name, cacheData.Key, err) diff --git a/shared.go b/shared.go index 99753710..5a7e6aeb 100644 --- a/shared.go +++ b/shared.go @@ -190,7 +190,7 @@ func HandleCors(resp http.ResponseWriter, request *http.Request) bool { } //resp.Header().Set("Access-Control-Allow-Origin", "http://localhost:8000") - resp.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept, X-Requested-With, remember-me, Org-Id, Org, Authorization, X-Debug-Url") + resp.Header().Set("Access-Control-Allow-Headers", "Content-Type, Accept, X-Requested-With, remember-me, Org-Id, Org, Authorization, X-Debug-Url, X-Internal-Caller, X-Trace-ID") resp.Header().Set("Access-Control-Allow-Methods", "POST, GET, PUT, DELETE, PATCH") resp.Header().Set("Access-Control-Allow-Credentials", "true") @@ -17338,6 +17338,7 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti // Check if the request has been sent already (just in case) cacheKey := fmt.Sprintf("agent_request_%s_%s_%s", workflowExecution.ExecutionId, actionResult.Action.ID, status) + // cacheKey := fmt.Sprintf("agent_request_%s_%s", workflowExecution.ExecutionId, actionResult.Action.ID) _, err := GetCache(ctx, cacheKey) if err == nil { if debug { @@ -17346,7 +17347,12 @@ func sendAgentActionSelfRequest(status string, workflowExecution WorkflowExecuti return nil } else { - SetCache(ctx, cacheKey, []byte("1"), 1) + var cacheTTL int32 = 1 // 1 minute for non-terminal statuses + if status == "SUCCESS" || status == "FINISHED" || status == "FAILURE" || status == "ABORTED" { + cacheTTL = 1440 // 24 hours — execution outcome is permanent + } + SetCache(ctx, cacheKey, []byte("1"), cacheTTL) + // SetCache(ctx, cacheKey, []byte(status), cacheTTL) } if status == "SUCCESS" || status == "FINISHED" || status == "FAILURE" || status == "ABORTED" { @@ -17527,6 +17533,8 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action log.Printf("[DEBUG][%s] Got decision ID '%s' for agent '%s'. Ref: %s", workflowExecution.ExecutionId, decisionId, actionResult.Action.ID, actionResult.Status) } + ctx := context.Background() + foundActionResultIndex := -1 for actionIndex, result := range workflowExecution.Results { if result.Action.ID == actionResult.Action.ID { @@ -17750,7 +17758,6 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action log.Printf("[DEBUG] Getting agent chat history: %s", requestKey) } - ctx := context.Background() agentRequestMemory, err := GetDatastoreKey(ctx, requestKey, "agent_requests") if err != nil { log.Printf("[ERROR][%s] Failed to find request memory for updates", actionResult.ExecutionId) @@ -17767,7 +17774,15 @@ func handleAgentDecisionStreamResult(workflowExecution WorkflowExecution, action // Handle agent decisionmaking. Use the same log.Printf("[INFO][%s] With the agent being finished, we are asking it whether it would like to do anything else", workflowExecution.ExecutionId) - returnAction, err := HandleAiAgentExecutionStart(workflowExecution, actionResult.Action, true) + var originalAction Action + if foundActionResultIndex >= 0 && foundActionResultIndex < len(workflowExecution.Results) { + originalAction = workflowExecution.Results[foundActionResultIndex].Action + } else { + // Fallback in case of an issue + originalAction = actionResult.Action + } + + returnAction, err := HandleAiAgentExecutionStart(ctx, workflowExecution, originalAction, true) if err != nil { log.Printf("[ERROR][%s] Failed handling agent execution start: %s", workflowExecution.ExecutionId, err) } @@ -21692,6 +21707,9 @@ func CheckHookAuth(request *http.Request, auth string) error { func PrepareSingleAction(ctx context.Context, user User, appId string, body []byte, runValidationAction bool, decision ...string) (WorkflowExecution, error) { workflowExecution := WorkflowExecution{} + if ctx == nil { + ctx = context.Background() + } var action Action err := json.Unmarshal(body, &action) @@ -21756,13 +21774,17 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by } } - action, err := HandleAiAgentExecutionStart(exec, action, false) + caller, _ := ctx.Value("caller").(string) + traceID, _ := ctx.Value("trace_id").(string) + + action, err := HandleAiAgentExecutionStart(ctx, exec, action, false) if err != nil { log.Printf("[ERROR] Failed to handle AI agent execution start: %s", err) } exec.Workflow.Actions[0] = action newExec, err := GetWorkflowExecution(ctx, exec.ExecutionId) + log.Printf("[INFO][%s] AI Agent: %s Started standalone for org %s, execution id %s, workflow %s, with trace-id %s", exec.ExecutionId, caller, user.ActiveOrg.Id, exec.ExecutionId, exec.WorkflowId, traceID) if err != nil { log.Printf("[ERROR] Failed to get workflow execution after starting agent: %s", err) } else { @@ -22086,6 +22108,14 @@ func PrepareSingleAction(ctx context.Context, user User, appId string, body []by workflowExecution.OrgId = user.ActiveOrg.Id } + // formattedAppName := strings.ReplaceAll(strings.ToLower(app.Name), " ", "_") + + // isInternalShuffleApp := false + // switch formattedAppName { + // case "shuffle_datastore", "shuffle_org_management", "shuffle_app_management", "shuffle_workflow_management": + // isInternalShuffleApp = true + // } + if len(app.Name) == 0 && len(action.AppName) > 0 { app.Name = action.AppName }