From 3d638627242eb5fd767ec349ce2cbd1b972f8c4c Mon Sep 17 00:00:00 2001 From: Santiago Botto Date: Fri, 6 Mar 2026 11:27:45 -0300 Subject: [PATCH 1/6] fix: recovery check for rate-limited endpoints --- internal/server/rate_limit_scheduler.go | 59 ++++++++++++++++++++----- 1 file changed, 48 insertions(+), 11 deletions(-) diff --git a/internal/server/rate_limit_scheduler.go b/internal/server/rate_limit_scheduler.go index 6a0b9a1..30c2bc6 100644 --- a/internal/server/rate_limit_scheduler.go +++ b/internal/server/rate_limit_scheduler.go @@ -1,7 +1,10 @@ package server import ( + "bytes" "context" + "encoding/json" + "io" "net/http" "sync" "time" @@ -13,13 +16,22 @@ import ( "github.com/rs/zerolog/log" ) +// rpcResponse represents a JSON-RPC response for recovery checks +type rpcResponse struct { + Result any `json:"result"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + } `json:"error"` +} + // RateLimitScheduler manages recovery checks for rate-limited endpoints type RateLimitScheduler struct { config *config.Config valkeyClient store.ValkeyClientIface // Active monitoring tracking - activeMonitoring map[string]bool // key: chain:endpoint + activeMonitoring map[string]bool // key: chain:endpoint cancelFuncs map[string]context.CancelFunc // Cancel functions for active goroutines mu sync.RWMutex shutdownCtx context.Context @@ -322,8 +334,9 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo Timeout: 10 * time.Second, } - // Create a simple POST request (similar to what the proxy would do) - req, err := http.NewRequest("POST", endpoint.HTTPURL, http.NoBody) + // Create a proper JSON-RPC request (same as regular health checks) + payload := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}`) + req, err := http.NewRequest("POST", endpoint.HTTPURL, bytes.NewBuffer(payload)) if err != nil { log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Failed to create recovery check request") return false @@ -339,19 +352,43 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo } defer resp.Body.Close() - // Consider 2xx responses as success, 429 as still rate limited, others as failure - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Int("status", resp.StatusCode).Msg("Recovery check successful") - return true + // Check HTTP status code first + if resp.StatusCode == 429 { + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check still rate limited (HTTP 429)") + return false } - if resp.StatusCode == 429 { - log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check still rate limited") - } else { + // Successful response to the eth_blockNumber call should always be 200 + if resp.StatusCode != 200 { log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Int("status", resp.StatusCode).Msg("Recovery check failed with error status") + return false + } + + // Parse the JSON-RPC response to check for rate limit errors in the body + body, err := io.ReadAll(resp.Body) + if err != nil { + log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to read response body") + return false + } + + var rpcResp rpcResponse + if err := json.Unmarshal(body, &rpcResp); err != nil { + log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to parse JSON-RPC response") + return false + } + + // Check for JSON-RPC errors (rate limits could come as JSON-RPC errors with 200 HTTP status) + if rpcResp.Error != nil { + log.Debug(). + Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)). + Int("code", rpcResp.Error.Code). + Str("message", rpcResp.Error.Message). + Msg("Recovery check received JSON-RPC error, still rate limited") + return false } - return false + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check successful") + return true } // shouldResetBackoff determines if the backoff cycle should be reset From db46138f567bdb4e3a898101a90b79c3fa0aed78 Mon Sep 17 00:00:00 2001 From: Santiago Botto Date: Fri, 6 Mar 2026 12:34:33 -0300 Subject: [PATCH 2/6] Fix tests --- internal/server/rate_limit_scheduler_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/internal/server/rate_limit_scheduler_test.go b/internal/server/rate_limit_scheduler_test.go index 74a3fdc..f8d7d00 100644 --- a/internal/server/rate_limit_scheduler_test.go +++ b/internal/server/rate_limit_scheduler_test.go @@ -87,9 +87,11 @@ func TestStartMonitoringDoesNotDuplicate(t *testing.T) { } func TestCheckEndpointHealthSuccess(t *testing.T) { - // Create a test HTTP server that returns 200 + // Create a test HTTP server that returns 200 with a valid JSON-RPC response server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234567","id":1}`)) })) defer server.Close() @@ -240,9 +242,11 @@ func TestPerformRecoveryCheckStopsAfterMaxRetries(t *testing.T) { } func TestPerformRecoveryCheckRecovery(t *testing.T) { - // Create a test HTTP server that returns 200 + // Create a test HTTP server that returns 200 with a valid JSON-RPC response server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234567","id":1}`)) })) defer server.Close() From dfa166f05c05039c11ab7da35cb10934723b4c24 Mon Sep 17 00:00:00 2001 From: Santiago Botto Date: Fri, 6 Mar 2026 14:41:07 -0300 Subject: [PATCH 3/6] Consolidate duplicated rpcResponse type --- internal/health/checker.go | 20 ++++++++++---------- internal/server/rate_limit_scheduler.go | 11 ++--------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/internal/health/checker.go b/internal/health/checker.go index b608e25..8c24a6a 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -25,8 +25,8 @@ import ( // ErrMethodNotFound indicates that the RPC method is not supported by the endpoint var ErrMethodNotFound = errors.New("method not found") -// rpcResponse represents a JSON-RPC 2.0 response -type rpcResponse struct { +// RpcResponse represents a JSON-RPC response +type RpcResponse struct { Result any `json:"result"` Error *struct { Code int `json:"code"` @@ -35,7 +35,7 @@ type rpcResponse struct { } // checkRPCError checks for errors in an RPC response and handles method-not-found errors specially -func checkRPCError(response *rpcResponse, method, protocol, chain, endpointID, url string) error { +func checkRPCError(response *RpcResponse, method, protocol, chain, endpointID, url string) error { if response.Error == nil { return nil } @@ -79,12 +79,12 @@ func containsMethodNotFound(message string) bool { // Checker represents a health checker type Checker struct { - config *config.Config - concurrency int + config *config.Config + concurrency int ephemeralChecksEnabled bool - healthCheckSyncStatus bool - interval time.Duration - valkeyClient store.ValkeyClientIface + healthCheckSyncStatus bool + interval time.Duration + valkeyClient store.ValkeyClientIface ephemeralChecks map[string]*ephemeralState // key: chain|endpointID|protocol ephemeralChecksInterval time.Duration @@ -446,7 +446,7 @@ func (c *Checker) makeRPCCall(ctx context.Context, url, method, chain, endpointI } // Define the structure of the response - var response rpcResponse + var response RpcResponse // Parse the response if err := json.Unmarshal(body, &response); err != nil { @@ -502,7 +502,7 @@ func (c *Checker) makeWSRPCCall(url, method, chain, endpointID string) (any, err wsConn.SetReadDeadline(time.Now().Add(5 * time.Second)) // Read the response - var response rpcResponse + var response RpcResponse if err := wsConn.ReadJSON(&response); err != nil { log.Error(). diff --git a/internal/server/rate_limit_scheduler.go b/internal/server/rate_limit_scheduler.go index 30c2bc6..281a7bd 100644 --- a/internal/server/rate_limit_scheduler.go +++ b/internal/server/rate_limit_scheduler.go @@ -10,20 +10,13 @@ import ( "time" "aetherlay/internal/config" + "aetherlay/internal/health" "aetherlay/internal/helpers" "aetherlay/internal/store" "github.com/rs/zerolog/log" ) -// rpcResponse represents a JSON-RPC response for recovery checks -type rpcResponse struct { - Result any `json:"result"` - Error *struct { - Code int `json:"code"` - Message string `json:"message"` - } `json:"error"` -} // RateLimitScheduler manages recovery checks for rate-limited endpoints type RateLimitScheduler struct { @@ -371,7 +364,7 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo return false } - var rpcResp rpcResponse + var rpcResp health.RpcResponse if err := json.Unmarshal(body, &rpcResp); err != nil { log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to parse JSON-RPC response") return false From 06ee7785de7e6379c3af812c8dfbce730402a003 Mon Sep 17 00:00:00 2001 From: Santiago Botto Date: Fri, 6 Mar 2026 14:42:51 -0300 Subject: [PATCH 4/6] Improve log message --- internal/server/rate_limit_scheduler.go | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/internal/server/rate_limit_scheduler.go b/internal/server/rate_limit_scheduler.go index 281a7bd..f28c70d 100644 --- a/internal/server/rate_limit_scheduler.go +++ b/internal/server/rate_limit_scheduler.go @@ -17,7 +17,6 @@ import ( "github.com/rs/zerolog/log" ) - // RateLimitScheduler manages recovery checks for rate-limited endpoints type RateLimitScheduler struct { config *config.Config @@ -372,11 +371,15 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo // Check for JSON-RPC errors (rate limits could come as JSON-RPC errors with 200 HTTP status) if rpcResp.Error != nil { - log.Debug(). + evt := log.Debug(). Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)). Int("code", rpcResp.Error.Code). - Str("message", rpcResp.Error.Message). - Msg("Recovery check received JSON-RPC error, still rate limited") + Str("message", rpcResp.Error.Message) + if isJSONRPCRateLimitCode(rpcResp.Error.Code) { + evt.Msg("Recovery check received JSON-RPC rate-limit error") + } else { + evt.Msg("Recovery check received JSON-RPC error") + } return false } @@ -384,6 +387,12 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo return true } +// isJSONRPCRateLimitCode reports whether a JSON-RPC error code indicates rate limiting. +// -32005 is the standard "Request limit exceeded" code used by Infura, Alchemy, and others. +func isJSONRPCRateLimitCode(code int) bool { + return code == -32005 +} + // shouldResetBackoff determines if the backoff cycle should be reset func (rls *RateLimitScheduler) shouldResetBackoff(state *store.RateLimitState, config config.RateLimitRecovery) bool { if state.FirstRateLimited.IsZero() { From 42971e0bb473ba444e108b471cf899c59a37d2c1 Mon Sep 17 00:00:00 2001 From: Santiago Botto Date: Mon, 9 Mar 2026 15:22:06 -0300 Subject: [PATCH 5/6] Pass context to checkEndpointHealth for recovery checks --- internal/server/rate_limit_scheduler.go | 6 +++--- internal/server/rate_limit_scheduler_test.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/server/rate_limit_scheduler.go b/internal/server/rate_limit_scheduler.go index f28c70d..7fef49d 100644 --- a/internal/server/rate_limit_scheduler.go +++ b/internal/server/rate_limit_scheduler.go @@ -247,7 +247,7 @@ func (rls *RateLimitScheduler) performRecoveryCheck(ctx context.Context, chain, } // Perform the health check - success := rls.checkEndpointHealth(endpoint) + success := rls.checkEndpointHealth(ctx, endpoint) if success { state.ConsecutiveSuccess++ @@ -316,7 +316,7 @@ func (rls *RateLimitScheduler) performRecoveryCheck(ctx context.Context, chain, } // checkEndpointHealth performs a simple HTTP health check on the endpoint -func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) bool { +func (rls *RateLimitScheduler) checkEndpointHealth(ctx context.Context, endpoint config.Endpoint) bool { if endpoint.HTTPURL == "" { return false } @@ -328,7 +328,7 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo // Create a proper JSON-RPC request (same as regular health checks) payload := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}`) - req, err := http.NewRequest("POST", endpoint.HTTPURL, bytes.NewBuffer(payload)) + req, err := http.NewRequestWithContext(ctx, "POST", endpoint.HTTPURL, bytes.NewBuffer(payload)) if err != nil { log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Failed to create recovery check request") return false diff --git a/internal/server/rate_limit_scheduler_test.go b/internal/server/rate_limit_scheduler_test.go index f8d7d00..4a0f6db 100644 --- a/internal/server/rate_limit_scheduler_test.go +++ b/internal/server/rate_limit_scheduler_test.go @@ -112,7 +112,7 @@ func TestCheckEndpointHealthSuccess(t *testing.T) { scheduler := NewRateLimitScheduler(cfg, mockValkey) endpoint := cfg.Endpoints["ethereum"]["test-endpoint"] - healthy := scheduler.checkEndpointHealth(endpoint) + healthy := scheduler.checkEndpointHealth(context.Background(), endpoint) if !healthy { t.Error("Expected endpoint to be healthy") @@ -143,7 +143,7 @@ func TestCheckEndpointHealthRateLimited(t *testing.T) { scheduler := NewRateLimitScheduler(cfg, mockValkey) endpoint := cfg.Endpoints["ethereum"]["test-endpoint"] - healthy := scheduler.checkEndpointHealth(endpoint) + healthy := scheduler.checkEndpointHealth(context.Background(), endpoint) if healthy { t.Error("Expected endpoint to be unhealthy due to rate limiting") From 7eff0578e41bc428fdb58dadb707062c08931cd7 Mon Sep 17 00:00:00 2001 From: Santiago Botto Date: Mon, 9 Mar 2026 15:23:06 -0300 Subject: [PATCH 6/6] Validate the eth_blockNumber result on checkEndpointHealth() --- internal/health/checker.go | 6 +++--- internal/server/rate_limit_scheduler.go | 7 +++++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/internal/health/checker.go b/internal/health/checker.go index 8c24a6a..95709af 100644 --- a/internal/health/checker.go +++ b/internal/health/checker.go @@ -523,8 +523,8 @@ func (c *Checker) makeWSRPCCall(url, method, chain, endpointID string) (any, err return response.Result, nil } -// parseBlockNumber parses a hex string block number and validates it's > 0 -func parseBlockNumber(blockResult any) (blockNumber int64, isHealthy bool) { +// ParseBlockNumber parses a hex string block number and validates it's > 0 +func ParseBlockNumber(blockResult any) (blockNumber int64, isHealthy bool) { blockStr, ok := blockResult.(string) if !ok { return 0, false @@ -558,7 +558,7 @@ func parseSyncStatus(syncResult any) bool { // checkHealthParams checks all health parameters and logs detailed info func (c *Checker) checkHealthParams(chain, endpointID, url, protocol string, syncResult, blockResult any) (healthy bool, blockNumber int64) { // Parse results - blockNumber, blockHealthy := parseBlockNumber(blockResult) + blockNumber, blockHealthy := ParseBlockNumber(blockResult) // Block number check is always required healthy = blockHealthy diff --git a/internal/server/rate_limit_scheduler.go b/internal/server/rate_limit_scheduler.go index 7fef49d..2a51532 100644 --- a/internal/server/rate_limit_scheduler.go +++ b/internal/server/rate_limit_scheduler.go @@ -383,6 +383,13 @@ func (rls *RateLimitScheduler) checkEndpointHealth(ctx context.Context, endpoint return false } + // Validate the eth_blockNumber result + _, isHealthy := health.ParseBlockNumber(rpcResp.Result) + if !isHealthy { + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check received null or unparseable block number") + return false + } + log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check successful") return true }