diff --git a/internal/health/checker.go b/internal/health/checker.go index b608e25..95709af 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -25,8 +25,8 @@ import ( // ErrMethodNotFound indicates that the RPC method is not supported by the endpoint var ErrMethodNotFound = errors.New("method not found") -// rpcResponse represents a JSON-RPC 2.0 response -type rpcResponse struct { +// RpcResponse represents a JSON-RPC response +type RpcResponse struct { Result any `json:"result"` Error *struct { Code int `json:"code"` @@ -35,7 +35,7 @@ type rpcResponse struct { } // checkRPCError checks for errors in an RPC response and handles method-not-found errors specially -func checkRPCError(response *rpcResponse, method, protocol, chain, endpointID, url string) error { +func checkRPCError(response *RpcResponse, method, protocol, chain, endpointID, url string) error { if response.Error == nil { return nil } @@ -79,12 +79,12 @@ func containsMethodNotFound(message string) bool { // Checker represents a health checker type Checker struct { - config *config.Config - concurrency int + config *config.Config + concurrency int ephemeralChecksEnabled bool - healthCheckSyncStatus bool - interval time.Duration - valkeyClient store.ValkeyClientIface + healthCheckSyncStatus bool + interval time.Duration + valkeyClient store.ValkeyClientIface ephemeralChecks map[string]*ephemeralState // key: chain|endpointID|protocol ephemeralChecksInterval time.Duration @@ -446,7 +446,7 @@ func (c *Checker) makeRPCCall(ctx context.Context, url, method, chain, endpointI } // Define the structure of the response - var response rpcResponse + var response RpcResponse // Parse the response if err := json.Unmarshal(body, &response); err != nil { @@ -502,7 +502,7 @@ func (c *Checker) makeWSRPCCall(url, method, chain, endpointID string) (any, err wsConn.SetReadDeadline(time.Now().Add(5 * time.Second)) // Read the response - var response rpcResponse + var response RpcResponse if err := wsConn.ReadJSON(&response); err != nil { log.Error(). @@ -523,8 +523,8 @@ func (c *Checker) makeWSRPCCall(url, method, chain, endpointID string) (any, err return response.Result, nil } -// parseBlockNumber parses a hex string block number and validates it's > 0 -func parseBlockNumber(blockResult any) (blockNumber int64, isHealthy bool) { +// ParseBlockNumber parses a hex string block number and validates it's > 0 +func ParseBlockNumber(blockResult any) (blockNumber int64, isHealthy bool) { blockStr, ok := blockResult.(string) if !ok { return 0, false @@ -558,7 +558,7 @@ func parseSyncStatus(syncResult any) bool { // checkHealthParams checks all health parameters and logs detailed info func (c *Checker) checkHealthParams(chain, endpointID, url, protocol string, syncResult, blockResult any) (healthy bool, blockNumber int64) { // Parse results - blockNumber, blockHealthy := parseBlockNumber(blockResult) + blockNumber, blockHealthy := ParseBlockNumber(blockResult) // Block number check is always required healthy = blockHealthy diff --git a/internal/server/rate_limit_scheduler.go b/internal/server/rate_limit_scheduler.go index 6a0b9a1..2a51532 100644 --- a/internal/server/rate_limit_scheduler.go +++ b/internal/server/rate_limit_scheduler.go @@ -1,12 +1,16 @@ package server import ( + "bytes" "context" + "encoding/json" + "io" "net/http" "sync" "time" "aetherlay/internal/config" + "aetherlay/internal/health" "aetherlay/internal/helpers" "aetherlay/internal/store" @@ -19,7 +23,7 @@ type RateLimitScheduler struct { valkeyClient store.ValkeyClientIface // Active monitoring tracking - activeMonitoring map[string]bool // key: chain:endpoint + activeMonitoring map[string]bool // key: chain:endpoint cancelFuncs map[string]context.CancelFunc // Cancel functions for active goroutines mu sync.RWMutex shutdownCtx context.Context @@ -243,7 +247,7 @@ func (rls *RateLimitScheduler) performRecoveryCheck(ctx context.Context, chain, } // Perform the health check - success := rls.checkEndpointHealth(endpoint) + success := rls.checkEndpointHealth(ctx, endpoint) if success { state.ConsecutiveSuccess++ @@ -312,7 +316,7 @@ func (rls *RateLimitScheduler) performRecoveryCheck(ctx context.Context, chain, } // checkEndpointHealth performs a simple HTTP health check on the endpoint -func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) bool { +func (rls *RateLimitScheduler) checkEndpointHealth(ctx context.Context, endpoint config.Endpoint) bool { if endpoint.HTTPURL == "" { return false } @@ -322,8 +326,9 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo Timeout: 10 * time.Second, } - // Create a simple POST request (similar to what the proxy would do) - req, err := http.NewRequest("POST", endpoint.HTTPURL, http.NoBody) + // Create a proper JSON-RPC request (same as regular health checks) + payload := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}`) + req, err := http.NewRequestWithContext(ctx, "POST", endpoint.HTTPURL, bytes.NewBuffer(payload)) if err != nil { log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Failed to create recovery check request") return false @@ -339,19 +344,60 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo } defer resp.Body.Close() - // Consider 2xx responses as success, 429 as still rate limited, others as failure - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Int("status", resp.StatusCode).Msg("Recovery check successful") - return true + // Check HTTP status code first + if resp.StatusCode == 429 { + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check still rate limited (HTTP 429)") + return false } - if resp.StatusCode == 429 { - log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check still rate limited") - } else { + // Successful response to the eth_blockNumber call should always be 200 + if resp.StatusCode != 200 { log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Int("status", resp.StatusCode).Msg("Recovery check failed with error status") + return false } - return false + // Parse the JSON-RPC response to check for rate limit errors in the body + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to read response body") + return false + } + + var rpcResp health.RpcResponse + if err := json.Unmarshal(body, &rpcResp); err != nil { + log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to parse JSON-RPC response") + return false + } + + // Check for JSON-RPC errors (rate limits could come as JSON-RPC errors with 200 HTTP status) + if rpcResp.Error != nil { + evt := log.Debug(). + Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)). + Int("code", rpcResp.Error.Code). + Str("message", rpcResp.Error.Message) + if isJSONRPCRateLimitCode(rpcResp.Error.Code) { + evt.Msg("Recovery check received JSON-RPC rate-limit error") + } else { + evt.Msg("Recovery check received JSON-RPC error") + } + return false + } + + // Validate the eth_blockNumber result + _, isHealthy := health.ParseBlockNumber(rpcResp.Result) + if !isHealthy { + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check received null or unparseable block number") + return false + } + + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check successful") + return true +} + +// isJSONRPCRateLimitCode reports whether a JSON-RPC error code indicates rate limiting. +// -32005 is the standard "Request limit exceeded" code used by Infura, Alchemy, and others. +func isJSONRPCRateLimitCode(code int) bool { + return code == -32005 } // shouldResetBackoff determines if the backoff cycle should be reset diff --git a/internal/server/rate_limit_scheduler_test.go b/internal/server/rate_limit_scheduler_test.go index 74a3fdc..4a0f6db 100644 --- a/internal/server/rate_limit_scheduler_test.go +++ b/internal/server/rate_limit_scheduler_test.go @@ -87,9 +87,11 @@ func TestStartMonitoringDoesNotDuplicate(t *testing.T) { } func TestCheckEndpointHealthSuccess(t *testing.T) { - // Create a test HTTP server that returns 200 + // Create a test HTTP server that returns 200 with a valid JSON-RPC response server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234567","id":1}`)) })) defer server.Close() @@ -110,7 +112,7 @@ func TestCheckEndpointHealthSuccess(t *testing.T) { scheduler := NewRateLimitScheduler(cfg, mockValkey) endpoint := cfg.Endpoints["ethereum"]["test-endpoint"] - healthy := scheduler.checkEndpointHealth(endpoint) + healthy := scheduler.checkEndpointHealth(context.Background(), endpoint) if !healthy { t.Error("Expected endpoint to be healthy") @@ -141,7 +143,7 @@ func TestCheckEndpointHealthRateLimited(t *testing.T) { scheduler := NewRateLimitScheduler(cfg, mockValkey) endpoint := cfg.Endpoints["ethereum"]["test-endpoint"] - healthy := scheduler.checkEndpointHealth(endpoint) + healthy := scheduler.checkEndpointHealth(context.Background(), endpoint) if healthy { t.Error("Expected endpoint to be unhealthy due to rate limiting") @@ -240,9 +242,11 @@ func TestPerformRecoveryCheckStopsAfterMaxRetries(t *testing.T) { } func TestPerformRecoveryCheckRecovery(t *testing.T) { - // Create a test HTTP server that returns 200 + // Create a test HTTP server that returns 200 with a valid JSON-RPC response server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234567","id":1}`)) })) defer server.Close()