Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
286 changes: 281 additions & 5 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ import (
"bytes"
"context"
"encoding/base64"
"html"

// "encoding/json"
"errors"
"fmt"
"github.com/goccy/go-json"
"io"
"io/ioutil"
"log"
Expand All @@ -17,6 +18,9 @@ import (
"strconv"
"strings"
"time"
"math"

"github.com/goccy/go-json"

"github.com/Masterminds/semver"
"github.com/frikky/kin-openapi/openapi3"
Expand Down Expand Up @@ -724,7 +728,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {

// Use channel for getting RunOpsWorkflow function results
workflowHealthChannel := make(chan WorkflowHealth)
errorChannel := make(chan error, 6)
errorChannel := make(chan error, 7)
go func() {
if debug {
log.Printf("[DEBUG] Running workflowHealthChannel goroutine")
Expand All @@ -741,6 +745,24 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
errorChannel <- err
}()

// Agent health check
agentHealthChannel := make(chan AgentHealth)
go func() {
if debug {
log.Printf("[DEBUG] Health check Running agentHealthChannel goroutine")
}

agentHealth, err := RunOpsAgent(apiKey, orgId, "")
if err != nil {
if project.Environment == "cloud" {
log.Printf("[ERROR] Health check failed for the agent: %s", err)
}
}

agentHealthChannel <- agentHealth
errorChannel <- err
}()

if project.Environment != "cloud" {
opensearchHealthChannel := make(chan opensearchapi.ClusterHealthResp)
go func() {
Expand Down Expand Up @@ -815,6 +837,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
platformHealth.FileOps = <-fileHealthChannel
platformHealth.Apps = <-openapiAppHealthChannel
platformHealth.Workflows = <-workflowHealthChannel
platformHealth.Agents = <-agentHealthChannel
err = <-errorChannel

if project.Environment != "cloud" {
Expand All @@ -840,7 +863,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
}
}

if platformHealth.Workflows.Create == true && platformHealth.Workflows.Delete == true && platformHealth.Workflows.Run == true && platformHealth.Workflows.RunFinished == true && platformHealth.Workflows.RunStatus == "FINISHED" {
if platformHealth.Workflows.Create == true && platformHealth.Workflows.Delete == true && platformHealth.Workflows.Run == true && platformHealth.Workflows.RunFinished == true && platformHealth.Workflows.RunStatus == "FINISHED" && platformHealth.Agents.Run == true && platformHealth.Agents.RunFinished == true && platformHealth.Agents.RunStatus == "FINISHED" && platformHealth.Agents.LLMCallSuccess == true {
log.Printf("[DEBUG] Platform health check successful! All necessary values are true.")
platformHealth.Success = true
}
Expand All @@ -854,6 +877,7 @@ func RunOpsHealthCheck(resp http.ResponseWriter, request *http.Request) {
HealthCheck.Datastore = platformHealth.Datastore
HealthCheck.FileOps = platformHealth.FileOps
HealthCheck.Apps = platformHealth.Apps
HealthCheck.Agents = platformHealth.Agents
// Add to database
err = SetPlatformHealth(ctx, HealthCheck)
if err != nil {
Expand Down Expand Up @@ -4068,14 +4092,14 @@ func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) {
if err != nil {
log.Printf("[WARNING] Failed to get environment %s for org %s", fileId, user.ActiveOrg.Id)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed to get environment %s"}`, fileId)))
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "Failed to get environment %s"}`, html.EscapeString(fileId))))
return
}

if env.OrgId != user.ActiveOrg.Id {
log.Printf("[WARNING] %s (%s) doesn't have permission to stop all executions for environment %s", user.Username, user.Id, fileId)
resp.WriteHeader(401)
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "You don't have permission to stop environment executions for ID %s"}`, fileId)))
resp.Write([]byte(fmt.Sprintf(`{"success": false, "reason": "You don't have permission to stop environment executions for ID %s"}`, html.EscapeString(fileId))))
return
}

Expand Down Expand Up @@ -4184,3 +4208,255 @@ func HandleStopExecutions(resp http.ResponseWriter, request *http.Request) {
resp.WriteHeader(200)
resp.Write([]byte(fmt.Sprintf(`{"success": true, "reason": "Successfully deleted and stopped %d executions"}`, total)))
}

func resolveAgentBaseUrl(cloudRunUrl string) string {
if project.Environment == "onprem" {
return "http://localhost:5001"
}

baseUrl := os.Getenv("SHUFFLE_CLOUDRUN_URL")
if len(baseUrl) > 0 {
return baseUrl
}

if len(cloudRunUrl) > 0 {
return cloudRunUrl
}

log.Printf("[DEBUG] Base url not set. Setting to default")
return "https://shuffler.io"
}

type agentStartResult struct {
ExecutionId string
Authorization string
}

// startAgentExecution POSTs to /api/v1/agent and returns the execution ID and
// authorization token needed to poll for results.
func startAgentExecution(baseUrl, apiKey, orgId string) (agentStartResult, error) {
url := baseUrl + "/api/v1/agent"

requestBody := map[string]interface{}{
"params": map[string]interface{}{
"input": map[string]string{
"text": "Get the current weather of new york using https://wttr.in/New+York api and just output the current weather temperature without any commentary, just output the number in celcius and dont include the decimals, use action as custom_action, tool as api and category as singul keep the url as it and not needed for any other hallucinated params or headers, just include the url as is and the method name which is GET.",
},
},
}

requestBodyJson, err := json.Marshal(requestBody)
if err != nil {
return agentStartResult{}, fmt.Errorf("failed marshalling agent start request: %w", err)
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(requestBodyJson))
if err != nil {
return agentStartResult{}, fmt.Errorf("failed creating agent start HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)
req.Header.Set("Org-Id", orgId)

client := &http.Client{Timeout: 60 * time.Second}
resp, err := client.Do(req)
if err != nil {
return agentStartResult{}, fmt.Errorf("failed sending agent start request: %w", err)
}
defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return agentStartResult{}, fmt.Errorf("failed reading agent start response body: %w", err)
}

if resp.StatusCode != 200 {
log.Printf("[ERROR] Agent start response body: %s", respBody)
return agentStartResult{}, fmt.Errorf("agent start failed with status %d", resp.StatusCode)
}

parsed := Parsed{}

if err := json.Unmarshal(respBody, &parsed); err != nil {
return agentStartResult{}, fmt.Errorf("failed parsing agent start response: %w", err)
}

if !parsed.Success || len(parsed.ExecutionId) == 0 {
return agentStartResult{}, errors.New("agent start returned success=false or empty execution ID")
}

log.Printf("[DEBUG] Health check for Agent execution started with ID: %s", parsed.ExecutionId)
return agentStartResult{
ExecutionId: parsed.ExecutionId,
Authorization: parsed.Authorization,
}, nil
}

// fetchAgentExecutionResults POSTs to /api/v1/streams/results and returns the
// current WorkflowExecution snapshot for the given execution.
func fetchAgentExecutionResults(baseUrl, apiKey, orgId, executionId, authorization string) (WorkflowExecution, error) {
url := baseUrl + "/api/v1/streams/results"

reqBody := map[string]string{
"execution_id": executionId,
"authorization": authorization,
}
reqBodyJson, err := json.Marshal(reqBody)
if err != nil {
return WorkflowExecution{}, fmt.Errorf("failed marshalling results request: %w", err)
}

req, err := http.NewRequest("POST", url, bytes.NewBuffer(reqBodyJson))
if err != nil {
return WorkflowExecution{}, fmt.Errorf("failed creating results HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+apiKey)
req.Header.Set("Org-Id", orgId)

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
return WorkflowExecution{}, fmt.Errorf("failed sending results request: %w", err)
}
defer resp.Body.Close()

respBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return WorkflowExecution{}, fmt.Errorf("failed reading results response body: %w", err)
}

if resp.StatusCode != 200 {
return WorkflowExecution{}, fmt.Errorf("results endpoint returned status %d", resp.StatusCode)
}

var execution WorkflowExecution
if err := json.Unmarshal(respBody, &execution); err != nil {
return WorkflowExecution{}, fmt.Errorf("failed parsing execution results: %w", err)
}

return execution, nil
}


func extractAgentOutputFromResults(execution WorkflowExecution) (AgentOutput, bool) {
for _, result := range execution.Results {
var agentOutput AgentOutput
if err := json.Unmarshal([]byte(result.Result), &agentOutput); err == nil && len(agentOutput.Decisions) > 0 {
return agentOutput, true
}
}
return AgentOutput{}, false
}

// RunOpsAgent runs a health check for AI agents by directly calling the agent
// execution endpoint, polling for results, and verifying the agent completes
func RunOpsAgent(apiKey string, orgId string, cloudRunUrl string) (AgentHealth, error) {
agentHealth := AgentHealth{
Create: true, // Not creating workflow, but keeping for compatibility
BackendVersion: os.Getenv("SHUFFLE_BACKEND_VERSION"),
Delete: true,
}

baseUrl := resolveAgentBaseUrl(cloudRunUrl)

startResult, err := startAgentExecution(baseUrl, apiKey, orgId)
if err != nil {
log.Printf("[ERROR] Health check failed for startAgentExecution: %s", err)
return agentHealth, err
}

agentHealth.Run = true
agentHealth.ExecutionId = startResult.ExecutionId
startTime := time.Now()
timeout := time.After(5 * time.Minute)

for !agentHealth.RunFinished {
execution, err := fetchAgentExecutionResults(baseUrl, apiKey, orgId, startResult.ExecutionId, startResult.Authorization)
if err != nil {
log.Printf("[ERROR] Health check failed in fetchAgentExecutionResults: %s", err)
return agentHealth, err
}

// Update run status whenever the execution is no longer EXECUTING.
if execution.Status != "EXECUTING" && execution.Status != "WAITING" {
log.Printf("[DEBUG] Health check for Agent execution status: %s (ID: %s)", execution.Status, agentHealth.ExecutionId)
agentHealth.RunFinished = true
agentHealth.RunStatus = execution.Status

// Extract agent-level output (decisions, LLM success) from action results.
if agentOutput, found := extractAgentOutputFromResults(execution); found {
agentHealth.AgentStatus = agentOutput.Status
agentTemp, err := strconv.Atoi(strings.TrimSpace(agentOutput.Output))
if err != nil {
log.Printf("[ERROR] Agent Health check failed due to atoi conversion failure: %s", err)
agentHealth.LLMCallSuccess = false
}

realTemp, apiErr := getRealTempC()
if apiErr != nil {
log.Printf("[ERROR] Agent Health check failed due to weather api call failure: %s", apiErr)
agentHealth.LLMCallSuccess = false
} else {
diff := int(math.Abs(float64(agentTemp - realTemp)))
if diff > 1 {
agentHealth.LLMCallSuccess = false
log.Printf("[ERROR] Agent Health check - LLM Call was not successful. Expected: %d, Got: %d, Diff: %d", realTemp, agentTemp, diff)
} else {
agentHealth.LLMCallSuccess = true
log.Printf("[INFO] Agent Health check - LLM Call was successful. Expected: %d, Got: %d, Diff: %d", realTemp, agentTemp, diff)
}
}

agentHealth.AgentDecisionCount = len(agentOutput.Decisions)
log.Printf("[DEBUG] Health check for Agent made %d decisions, LLM call successful", len(agentOutput.Decisions))
}
}

if execution.Status == "FINISHED" {
log.Printf("[DEBUG] Health check for Agent execution finished successfully")
agentHealth.ExecutionTook = time.Since(startTime).Seconds()
}

// Check whether the overall health-check deadline has been hit.
select {
case <-timeout:
log.Printf("[ERROR] Timeout reached for agent health check")
agentHealth.RunStatus = "ABANDONED_BY_HEALTHCHECK"
return agentHealth, errors.New("timeout reached for agent health check")
default:
}

if !agentHealth.RunFinished {
time.Sleep(2 * time.Second)
}
}

return agentHealth, nil
}

func getRealTempC() (int, error) {
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get("https://wttr.in/New+York?format=j1")
if err != nil {
return 0, err
}
defer resp.Body.Close()

var data WttrResponse

if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
return 0, err
}

if len(data.CurrentCondition) > 0 {
// Parse as float first to handle "9.0" or "9.5" without crashing
val, err := strconv.ParseFloat(data.CurrentCondition[0].TempC, 64)
if err != nil {
return 0, err
}
return int(math.Round(val)), nil
}

return 0, fmt.Errorf("no data")
}
Loading
Loading