Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions internal/health/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (
// ErrMethodNotFound indicates that the RPC method is not supported by the endpoint
var ErrMethodNotFound = errors.New("method not found")

// rpcResponse represents a JSON-RPC 2.0 response
type rpcResponse struct {
// RpcResponse represents a JSON-RPC response
type RpcResponse struct {
Result any `json:"result"`
Error *struct {
Code int `json:"code"`
Expand All @@ -35,7 +35,7 @@ type rpcResponse struct {
}

// checkRPCError checks for errors in an RPC response and handles method-not-found errors specially
func checkRPCError(response *rpcResponse, method, protocol, chain, endpointID, url string) error {
func checkRPCError(response *RpcResponse, method, protocol, chain, endpointID, url string) error {
if response.Error == nil {
return nil
}
Expand Down Expand Up @@ -79,12 +79,12 @@ func containsMethodNotFound(message string) bool {

// Checker represents a health checker
type Checker struct {
config *config.Config
concurrency int
config *config.Config
concurrency int
ephemeralChecksEnabled bool
healthCheckSyncStatus bool
interval time.Duration
valkeyClient store.ValkeyClientIface
healthCheckSyncStatus bool
interval time.Duration
valkeyClient store.ValkeyClientIface

ephemeralChecks map[string]*ephemeralState // key: chain|endpointID|protocol
ephemeralChecksInterval time.Duration
Expand Down Expand Up @@ -446,7 +446,7 @@ func (c *Checker) makeRPCCall(ctx context.Context, url, method, chain, endpointI
}

// Define the structure of the response
var response rpcResponse
var response RpcResponse

// Parse the response
if err := json.Unmarshal(body, &response); err != nil {
Expand Down Expand Up @@ -502,7 +502,7 @@ func (c *Checker) makeWSRPCCall(url, method, chain, endpointID string) (any, err
wsConn.SetReadDeadline(time.Now().Add(5 * time.Second))

// Read the response
var response rpcResponse
var response RpcResponse

if err := wsConn.ReadJSON(&response); err != nil {
log.Error().
Expand All @@ -523,8 +523,8 @@ func (c *Checker) makeWSRPCCall(url, method, chain, endpointID string) (any, err
return response.Result, nil
}

// parseBlockNumber parses a hex string block number and validates it's > 0
func parseBlockNumber(blockResult any) (blockNumber int64, isHealthy bool) {
// ParseBlockNumber parses a hex string block number and validates it's > 0
func ParseBlockNumber(blockResult any) (blockNumber int64, isHealthy bool) {
blockStr, ok := blockResult.(string)
if !ok {
return 0, false
Expand Down Expand Up @@ -558,7 +558,7 @@ func parseSyncStatus(syncResult any) bool {
// checkHealthParams checks all health parameters and logs detailed info
func (c *Checker) checkHealthParams(chain, endpointID, url, protocol string, syncResult, blockResult any) (healthy bool, blockNumber int64) {
// Parse results
blockNumber, blockHealthy := parseBlockNumber(blockResult)
blockNumber, blockHealthy := ParseBlockNumber(blockResult)

// Block number check is always required
healthy = blockHealthy
Expand Down
72 changes: 59 additions & 13 deletions internal/server/rate_limit_scheduler.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package server

import (
"bytes"
"context"
"encoding/json"
"io"
"net/http"
"sync"
"time"

"aetherlay/internal/config"
"aetherlay/internal/health"
"aetherlay/internal/helpers"
"aetherlay/internal/store"

Expand All @@ -19,7 +23,7 @@ type RateLimitScheduler struct {
valkeyClient store.ValkeyClientIface

// Active monitoring tracking
activeMonitoring map[string]bool // key: chain:endpoint
activeMonitoring map[string]bool // key: chain:endpoint
cancelFuncs map[string]context.CancelFunc // Cancel functions for active goroutines
mu sync.RWMutex
shutdownCtx context.Context
Expand Down Expand Up @@ -243,7 +247,7 @@ func (rls *RateLimitScheduler) performRecoveryCheck(ctx context.Context, chain,
}

// Perform the health check
success := rls.checkEndpointHealth(endpoint)
success := rls.checkEndpointHealth(ctx, endpoint)

if success {
state.ConsecutiveSuccess++
Expand Down Expand Up @@ -312,7 +316,7 @@ func (rls *RateLimitScheduler) performRecoveryCheck(ctx context.Context, chain,
}

// checkEndpointHealth sends a JSON-RPC eth_blockNumber request to the endpoint's HTTP URL and reports whether it returned a valid block number
func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) bool {
func (rls *RateLimitScheduler) checkEndpointHealth(ctx context.Context, endpoint config.Endpoint) bool {
if endpoint.HTTPURL == "" {
return false
}
Expand All @@ -322,8 +326,9 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo
Timeout: 10 * time.Second,
}

// Create a simple POST request (similar to what the proxy would do)
req, err := http.NewRequest("POST", endpoint.HTTPURL, http.NoBody)
// Create a proper JSON-RPC request (same as regular health checks)
payload := []byte(`{"jsonrpc":"2.0","method":"eth_blockNumber","params":[],"id":1}`)
req, err := http.NewRequestWithContext(ctx, "POST", endpoint.HTTPURL, bytes.NewBuffer(payload))
if err != nil {
log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Failed to create recovery check request")
return false
Expand All @@ -339,19 +344,60 @@ func (rls *RateLimitScheduler) checkEndpointHealth(endpoint config.Endpoint) boo
}
defer resp.Body.Close()

// Consider 2xx responses as success, 429 as still rate limited, others as failure
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Int("status", resp.StatusCode).Msg("Recovery check successful")
return true
// Check HTTP status code first
if resp.StatusCode == 429 {
log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check still rate limited (HTTP 429)")
return false
}

if resp.StatusCode == 429 {
log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check still rate limited")
} else {
// A successful response to the eth_blockNumber call should always have HTTP status 200
if resp.StatusCode != 200 {
log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Int("status", resp.StatusCode).Msg("Recovery check failed with error status")
return false
}

return false
// Parse the JSON-RPC response to check for rate limit errors in the body
body, err := io.ReadAll(resp.Body)
if err != nil {
log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to read response body")
return false
}

var rpcResp health.RpcResponse
if err := json.Unmarshal(body, &rpcResp); err != nil {
log.Debug().Err(err).Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check failed to parse JSON-RPC response")
return false
}

// Check for JSON-RPC errors (rate limits could come as JSON-RPC errors with 200 HTTP status)
if rpcResp.Error != nil {
evt := log.Debug().
Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).
Int("code", rpcResp.Error.Code).
Str("message", rpcResp.Error.Message)
if isJSONRPCRateLimitCode(rpcResp.Error.Code) {
evt.Msg("Recovery check received JSON-RPC rate-limit error")
} else {
evt.Msg("Recovery check received JSON-RPC error")
}
return false
}

// Validate the eth_blockNumber result
_, isHealthy := health.ParseBlockNumber(rpcResp.Result)
if !isHealthy {
log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check received null or unparseable block number")
return false
}

log.Debug().Str("url", helpers.RedactAPIKey(endpoint.HTTPURL)).Msg("Recovery check successful")
return true
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

// isJSONRPCRateLimitCode reports whether code is a JSON-RPC error code that
// signals rate limiting. Only -32005 is recognized, the "Request limit
// exceeded" code returned by providers such as Infura and Alchemy; every
// other code is treated as an ordinary JSON-RPC error.
func isJSONRPCRateLimitCode(code int) bool {
	const requestLimitExceeded = -32005
	return code == requestLimitExceeded
}

// shouldResetBackoff determines if the backoff cycle should be reset
Expand Down
12 changes: 8 additions & 4 deletions internal/server/rate_limit_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,11 @@ func TestStartMonitoringDoesNotDuplicate(t *testing.T) {
}

func TestCheckEndpointHealthSuccess(t *testing.T) {
// Create a test HTTP server that returns 200
// Create a test HTTP server that returns 200 with a valid JSON-RPC response
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234567","id":1}`))
}))
defer server.Close()

Expand All @@ -110,7 +112,7 @@ func TestCheckEndpointHealthSuccess(t *testing.T) {
scheduler := NewRateLimitScheduler(cfg, mockValkey)

endpoint := cfg.Endpoints["ethereum"]["test-endpoint"]
healthy := scheduler.checkEndpointHealth(endpoint)
healthy := scheduler.checkEndpointHealth(context.Background(), endpoint)

if !healthy {
t.Error("Expected endpoint to be healthy")
Expand Down Expand Up @@ -141,7 +143,7 @@ func TestCheckEndpointHealthRateLimited(t *testing.T) {
scheduler := NewRateLimitScheduler(cfg, mockValkey)

endpoint := cfg.Endpoints["ethereum"]["test-endpoint"]
healthy := scheduler.checkEndpointHealth(endpoint)
healthy := scheduler.checkEndpointHealth(context.Background(), endpoint)

if healthy {
t.Error("Expected endpoint to be unhealthy due to rate limiting")
Expand Down Expand Up @@ -240,9 +242,11 @@ func TestPerformRecoveryCheckStopsAfterMaxRetries(t *testing.T) {
}

func TestPerformRecoveryCheckRecovery(t *testing.T) {
// Create a test HTTP server that returns 200
// Create a test HTTP server that returns 200 with a valid JSON-RPC response
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"jsonrpc":"2.0","result":"0x1234567","id":1}`))
}))
defer server.Close()

Expand Down