diff --git a/health.go b/health.go index d5e414d1..dd863295 100644 --- a/health.go +++ b/health.go @@ -4,10 +4,11 @@ import ( "bytes" "context" "encoding/base64" + "html" + // "encoding/json" "errors" "fmt" - "github.com/goccy/go-json" "io" "io/ioutil" "log" @@ -17,6 +18,9 @@ import ( "strconv" "strings" "time" + "math" + + "github.com/goccy/go-json" "github.com/Masterminds/semver" "github.com/frikky/kin-openapi/openapi3" @@ -724,7 +728,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { // Use channel for getting RunOpsWorkflow function results workflowHealthChannel := make(chan WorkflowHealth) - errorChannel := make(chan error, 6) + errorChannel := make(chan error, 7) go func() { if debug { log.Printf("[DEBUG] Running workflowHealthChannel goroutine") @@ -741,6 +745,24 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { errorChannel <- err }() + // Agent health check + agentHealthChannel := make(chan AgentHealth) + go func() { + if debug { + log.Printf("[DEBUG] Health check Running agentHealthChannel goroutine") + } + + agentHealth, err := RunOpsAgent(apiKey, orgId, "") + if err != nil { + if project.Environment == "cloud" { + log.Printf("[ERROR] Health check failed for the agent: %s", err) + } + } + + agentHealthChannel <- agentHealth + errorChannel <- err + }() + if project.Environment != "cloud" { opensearchHealthChannel := make(chan opensearchapi.ClusterHealthResp) go func() { @@ -815,6 +837,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { platformHealth.FileOps = <-fileHealthChannel platformHealth.Apps = <-openapiAppHealthChannel platformHealth.Workflows = <-workflowHealthChannel + platformHealth.Agents = <-agentHealthChannel err = <-errorChannel if project.Environment != "cloud" { @@ -840,7 +863,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { } } - if platformHealth.Workflows.Create == true && platformHealth.Workflows.Delete == true && platformHealth.Workflows.Run == true && platformHealth.Workflows.RunFinished == true && platformHealth.Workflows.RunStatus == "FINISHED" { + if platformHealth.Workflows.Create == true && platformHealth.Workflows.Delete == true && platformHealth.Workflows.Run == true && platformHealth.Workflows.RunFinished == true && platformHealth.Workflows.RunStatus == "FINISHED" && platformHealth.Agents.Run == true && platformHealth.Agents.RunFinished == true && platformHealth.Agents.RunStatus == "FINISHED" && platformHealth.Agents.LLMCallSuccess == true { log.Printf("[DEBUG] Platform health check successful! All necessary values are true.") platformHealth.Success = true } @@ -854,6 +877,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) { HealthCheck.Datastore = platformHealth.Datastore HealthCheck.FileOps = platformHealth.FileOps HealthCheck.Apps = platformHealth.Apps + HealthCheck.Agents = platformHealth.Agents // Add to database err = SetPlatformHealth(ctx, HealthCheck) if err != nil { @@ -4068,14 +4092,14 @@ func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) { if err != nil { log.Printf("[WARNING] Failed to get environment %s for org %s", fileId, user.ActiveOrg.Id) resp.WriteHeader(401) - resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed to get environment %s"}`, fileId))) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed to get environment %s"}`, html.EscapeString(fileId)))) return } if env.OrgId != user.ActiveOrg.Id { log.Printf("[WARNING] %s (%s) doesn't have permission to stop all executions for environment %s", user.Username, user.Id, fileId) resp.WriteHeader(401) - resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "You don't have permission to stop environment executions for ID %s"}`, fileId))) + resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "You don't have permission to stop environment executions for ID %s"}`, html.EscapeString(fileId)))) return } @@ -4184,3 +4208,255 @@ func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) { resp.WriteHeader(200) resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Successfully deleted and stopped %d executions"}`, total))) } + +func resolveAgentBaseUrl(cloudRunUrl string) string { + if project.Environment == "onprem" { + return "http://localhost:5001" + } + + baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL") + if len(baseUrl) > 0 { + return baseUrl + } + + if len(cloudRunUrl) > 0 { + return cloudRunUrl + } + + log.Printf("[DEBUG] Base url not set. Setting to default") + return "https://shuffler.io" +} + +type agentStartResult struct { + ExecutionId string + Authorization string +} + +// startAgentExecution POSTs to /api/v1/agent and returns the execution ID and +// authorization token needed to poll for results. +func startAgentExecution(baseUrl, apiKey, orgId string) (agentStartResult, error) { + url := baseUrl + "/api/v1/agent" + + requestBody := map[string]interface{}{ + "params": map[string]interface{}{ + "input": map[string]string{ + "text": "Get the current weather of new york using https://wttr.in/New+York api and just output the current weather temperature without any commentary, just output the number in celcius and dont include the decimals, use action as custom_action, tool as api and category as singul keep the url as it and not needed for any other hallucinated params or headers, just include the url as is and the method name which is GET.", + }, + }, + } + + requestBodyJson, err := json.Marshal(requestBody) + if err != nil { + return agentStartResult{}, fmt.Errorf("failed marshalling agent start request: %w", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBodyJson)) + if err != nil { + return agentStartResult{}, fmt.Errorf("failed creating agent start HTTP request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKey) + req.Header.Set("Org-Id", orgId) + + client := &http.Client{Timeout: 60 * time.Second} + resp, err := client.Do(req) + if err != nil { + return agentStartResult{}, fmt.Errorf("failed sending agent start request: %w", err) + } + defer resp.Body.Close() + + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return agentStartResult{}, fmt.Errorf("failed reading agent start response body: %w", err) + } + + if resp.StatusCode != 200 { + log.Printf("[ERROR] Agent start response body: %s", respBody) + return agentStartResult{}, fmt.Errorf("agent start failed with status %d", resp.StatusCode) + } + + parsed := Parsed{} + + if err := json.Unmarshal(respBody, &parsed); err != nil { + return agentStartResult{}, fmt.Errorf("failed parsing agent start response: %w", err) + } + + if !parsed.Success || len(parsed.ExecutionId) == 0 { + return agentStartResult{}, errors.New("agent start returned success=false or empty execution ID") + } + + log.Printf("[DEBUG] Health check for Agent execution started with ID: %s", parsed.ExecutionId) + return agentStartResult{ + ExecutionId: parsed.ExecutionId, + Authorization: parsed.Authorization, + }, nil +} + +// fetchAgentExecutionResults POSTs to /api/v1/streams/results and returns the +// current WorkflowExecution snapshot for the given execution. +func fetchAgentExecutionResults(baseUrl, apiKey, orgId, executionId, authorization string) (WorkflowExecution, error) { + url := baseUrl + "/api/v1/streams/results" + + reqBody := map[string]string{ + "execution_id": executionId, + "authorization": authorization, + } + reqBodyJson, err := json.Marshal(reqBody) + if err != nil { + return WorkflowExecution{}, fmt.Errorf("failed marshalling results request: %w", err) + } + + req, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBodyJson)) + if err != nil { + return WorkflowExecution{}, fmt.Errorf("failed creating results HTTP request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Authorization", "Bearer "+apiKey) + req.Header.Set("Org-Id", orgId) + + client := &http.Client{} + resp, err := client.Do(req) + if err != nil { + return WorkflowExecution{}, fmt.Errorf("failed sending results request: %w", err) + } + defer resp.Body.Close() + + respBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return WorkflowExecution{}, fmt.Errorf("failed reading results response body: %w", err) + } + + if resp.StatusCode != 200 { + return WorkflowExecution{}, fmt.Errorf("results endpoint returned status %d", resp.StatusCode) + } + + var execution WorkflowExecution + if err := json.Unmarshal(respBody, &execution); err != nil { + return WorkflowExecution{}, fmt.Errorf("failed parsing execution results: %w", err) + } + + return execution, nil +} + + +func extractAgentOutputFromResults(execution WorkflowExecution) (AgentOutput, bool) { + for _, result := range execution.Results { + var agentOutput AgentOutput + if err := json.Unmarshal([]byte(result.Result), &agentOutput); err == nil && len(agentOutput.Decisions) > 0 { + return agentOutput, true + } + } + return AgentOutput{}, false +} + +// RunOpsAgent runs a health check for AI agents by directly calling the agent +// execution endpoint, polling for results, and verifying the agent completes +func RunOpsAgent(apiKey string, orgId string, cloudRunUrl string) (AgentHealth, error) { + agentHealth := AgentHealth{ + Create: true, // Not creating workflow, but keeping for compatibility + BackendVersion: os.Getenv("SHUFFLE_BACKEND_VERSION"), + Delete: true, + } + + baseUrl := resolveAgentBaseUrl(cloudRunUrl) + + startResult, err := startAgentExecution(baseUrl, apiKey, orgId) + if err != nil { + log.Printf("[ERROR] Health check failed for startAgentExecution: %s", err) + return agentHealth, err + } + + agentHealth.Run = true + agentHealth.ExecutionId = startResult.ExecutionId + startTime := time.Now() + timeout := time.After(5 * time.Minute) + + for !agentHealth.RunFinished { + execution, err := fetchAgentExecutionResults(baseUrl, apiKey, orgId, startResult.ExecutionId, startResult.Authorization) + if err != nil { + log.Printf("[ERROR] Health check failed in fetchAgentExecutionResults: %s", err) + return agentHealth, err + } + + // Update run status whenever the execution is no longer EXECUTING. + if execution.Status != "EXECUTING" && execution.Status != "WAITING" { + log.Printf("[DEBUG] Health check for Agent execution status: %s (ID: %s)", execution.Status, agentHealth.ExecutionId) + agentHealth.RunFinished = true + agentHealth.RunStatus = execution.Status + + // Extract agent-level output (decisions, LLM success) from action results. + if agentOutput, found := extractAgentOutputFromResults(execution); found { + agentHealth.AgentStatus = agentOutput.Status + agentTemp, err := strconv.Atoi(strings.TrimSpace(agentOutput.Output)) + if err != nil { + log.Printf("[ERROR] Agent Health check failed due to atoi conversion failure: %s", err) + agentHealth.LLMCallSuccess = false + } + + realTemp, apiErr := getRealTempC() + if apiErr != nil { + log.Printf("[ERROR] Agent Health check failed due to weather api call failure: %s", apiErr) + agentHealth.LLMCallSuccess = false + } else { + diff := int(math.Abs(float64(agentTemp - realTemp))) + if diff > 1 { + agentHealth.LLMCallSuccess = false + log.Printf("[ERROR] Agent Health check - LLM Call was not successful. Expected: %d, Got: %d, Diff: %d", realTemp, agentTemp, diff) + } else { + agentHealth.LLMCallSuccess = true + log.Printf("[INFO] Agent Health check - LLM Call was successful. Expected: %d, Got: %d, Diff: %d", realTemp, agentTemp, diff) + } + } + + agentHealth.AgentDecisionCount = len(agentOutput.Decisions) + log.Printf("[DEBUG] Health check for Agent made %d decisions, LLM call successful", len(agentOutput.Decisions)) + } + } + + if execution.Status == "FINISHED" { + log.Printf("[DEBUG] Health check for Agent execution finished successfully") + agentHealth.ExecutionTook = time.Since(startTime).Seconds() + } + + // Check whether the overall health-check deadline has been hit. + select { + case <-timeout: + log.Printf("[ERROR] Timeout reached for agent health check") + agentHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK" + return agentHealth, errors.New("timeout reached for agent health check") + default: + } + + if !agentHealth.RunFinished { + time.Sleep(2 * time.Second) + } + } + + return agentHealth, nil +} + +func getRealTempC() (int, error) { + client := &http.Client{Timeout: 5 * time.Second} + resp, err := client.Get("https://wttr.in/New+York?format=j1") + if err != nil { + return 0, err + } + defer resp.Body.Close() + + var data WttrResponse + + if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { + return 0, err + } + + if len(data.CurrentCondition) > 0 { + // Parse as float first to handle "9.0" or "9.5" without crashing + val, err := strconv.ParseFloat(data.CurrentCondition[0].TempC, 64) + if err != nil { + return 0, err + } + return int(math.Round(val)), nil + } + + return 0, fmt.Errorf("no data") +} \ No newline at end of file diff --git a/structs.go b/structs.go index 5068dd95..2a25623d 100755 --- a/structs.go +++ b/structs.go @@ -4392,6 +4392,7 @@ type HealthCheck struct { Updated int64 `json:"updated"` Apps AppHealth `json:"apps"` Workflows WorkflowHealth `json:"workflows"` + Agents AgentHealth `json:"agents"` //PythonApps AppHealth `json:"python_apps"` Datastore DatastoreHealth `json:"datastore"` FileOps FileHealth `json:"fileops"` @@ -4402,6 +4403,7 @@ type HealthCheckDB struct { Success bool `json:"success"` Updated int64 `json:"updated"` Workflows WorkflowHealth `json:"workflows"` + Agents AgentHealth `json:"agents"` Opensearch opensearchapi.ClusterHealthResp `json:"opnsearch"` Datastore DatastoreHealth `json:"datastore"` FileOps FileHealth `json:"fileops"` @@ -5127,3 +5129,31 @@ type AppBuildRequest struct { Id string `datastore:"id"` Image string `datastore:"image"` } + +type AgentHealth struct { + Create bool `json:"create"` + Run bool `json:"run"` + BackendVersion string `json:"backend_version"` + RunFinished bool `json:"run_finished"` + ExecutionTook float64 `json:"execution_took"` + RunStatus string `json:"run_status"` + Delete bool `json:"delete"` + ExecutionId string `json:"execution_id"` + WorkflowId string `json:"workflow_id"` + AgentNodeId string `json:"agent_node_id"` + AgentStatus string `json:"agent_status"` // Status of the agent itself (RUNNING, FINISHED, ABORTED) + AgentDecisionCount int `json:"agent_decision_count"` // Number of decisions made by the agent + LLMCallSuccess bool `json:"llm_call_success"` // Whether the LLM call succeeded +} + +type WttrResponse struct { + CurrentCondition []struct { + TempC string `json:"temp_C"` + } `json:"current_condition"` +} + +type Parsed struct { + Success bool `json:"success"` + ExecutionId string `json:"execution_id"` + Authorization string `json:"authorization"` +} \ No newline at end of file