26 changes: 21 additions & 5 deletions README.md
@@ -9,7 +9,7 @@ Configure an `http.Client` to use rcpx as its transport. For each request, rcpx
* Tries upstreams sequentially, in priority order.
* Tries each eligible upstream at most once per request.
* By default, continues to the next upstream on any transport error, or on HTTP status `429`, `502`, `503`, `504`.
-* Treats HTTP statuses other than `429`, `502`, `503`, `504` as success and returns them unchanged, even if non-2xx.
+* By default, treats HTTP statuses other than `429`, `502`, `503`, `504` as success and returns them unchanged, even if non-2xx.
* Does not inspect JSON-RPC response bodies; HTTP `200` with a JSON-RPC error body is returned unchanged.
* If the request context is canceled or deadline exceeded, returns immediately and does not consult the retry policy.
* Buffers the request body once per request so it can resend it across upstreams, capped by `BodyBufferBytes`.
@@ -211,6 +211,7 @@ Default behavior summary:
| Setting | Default |
|---|---|
| Retryable HTTP statuses | `429`, `502`, `503`, `504` |
| Additional retryable HTTP statuses | None |
| Body buffer cap | `rcpx.DefaultBodyBufferBytes` (`1 MiB`) |
| Cooldown | Enabled |
| Cooldown threshold | `rcpx.DefaultCooldownFailAfterConsecutive` (`3`) consecutive failover-causing failures |
@@ -223,7 +224,7 @@ Default behavior summary:
* Upstreams are tried sequentially, in priority order.
* Each eligible upstream is tried at most once per request.
* By default, rcpx continues to the next upstream on any transport error, or on HTTP status `429`, `502`, `503`, `504`.
-* An attempt succeeds when `err == nil` and the status code is not `429`, `502`, `503`, or `504`.
+* An attempt succeeds when `err == nil` and the status code is not retryable.
* Other HTTP status codes are treated as success from rcpx's perspective and returned unchanged.
* JSON-RPC response bodies are not inspected. A JSON-RPC error returned with HTTP `200` is returned unchanged.
* If the request context is canceled or deadline exceeded, rcpx returns immediately and does not consult the retry policy.
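
The classification rules above can be sketched in a few lines of Go. This is an illustrative, self-contained model rather than rcpx's actual implementation: `attempt`, `retryable`, and `firstSuccess` are hypothetical names, and the sketch deliberately leaves out cooldown, context cancellation, and the retry policy hook.

```go
package main

import (
	"errors"
	"fmt"
)

// attempt is a hypothetical stand-in for the result of calling one upstream.
type attempt struct {
	status int   // HTTP status code; 0 when no response was obtained
	err    error // transport error, if any
}

// retryable holds the default retryable statuses listed above.
var retryable = map[int]bool{429: true, 502: true, 503: true, 504: true}

// firstSuccess walks attempts in priority order, visiting each at most once,
// and returns the index of the first attempt that counts as a success
// (no transport error and a non-retryable status), or -1 if none does.
func firstSuccess(attempts []attempt) int {
	for i, a := range attempts {
		if a.err == nil && !retryable[a.status] {
			return i
		}
	}
	return -1
}

func main() {
	attempts := []attempt{
		{status: 503},                           // retryable status: move on
		{err: errors.New("connection refused")}, // transport error: move on
		{status: 500},                           // not retryable by default: counts as success
	}
	fmt.Println(firstSuccess(attempts))
}
```

In this model the third attempt is selected even though its status is `500`, which matches the rule that non-retryable statuses are returned unchanged.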
@@ -274,6 +275,21 @@ rcpx handles misbehaving base transports defensively:
* It will not return `(nil, nil)` from `RoundTrip`; if that happens, rcpx returns an error.
* If a base transport returns both `resp != nil` and `err != nil`, rcpx closes `resp.Body` and treats it as an error to avoid leaks.

### Additional retryable HTTP statuses

```go
type Config struct {
AdditionalRetryableStatusCodes []int
// ...
}
```

`AdditionalRetryableStatusCodes` adds status codes to the built-in retryable set: `429`, `502`, `503`, and `504`.

For example, `[]int{500}` makes HTTP `500` retryable in addition to the defaults. Duplicates are ignored, and values must be three-digit HTTP status codes.

Retryable status classification happens before `RetryPolicy` is called.
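
As a rough illustration of how the additional codes combine with the defaults, the following stand-alone sketch builds the effective set. `buildRetryableSet` is a hypothetical helper written for this example, not part of rcpx's public API; it assumes only the rules stated above (defaults always included, duplicates collapsed, values outside the three-digit range rejected).

```go
package main

import "fmt"

// buildRetryableSet is a hypothetical helper that merges additional status
// codes into the default retryable set (429, 502, 503, 504).
// It rejects values that are not three-digit codes (100-999).
func buildRetryableSet(additional []int) (map[int]struct{}, error) {
	set := map[int]struct{}{429: {}, 502: {}, 503: {}, 504: {}}
	for i, code := range additional {
		if code < 100 || code > 999 {
			return nil, fmt.Errorf("invalid status code at index %d: %d", i, code)
		}
		// Adding an existing key is a no-op, so duplicates are ignored.
		set[code] = struct{}{}
	}
	return set, nil
}

func main() {
	set, err := buildRetryableSet([]int{500, 500})
	fmt.Println(len(set), err)

	_, err = buildRetryableSet([]int{99})
	fmt.Println(err != nil)
}
```

Passing `[]int{500, 500}` yields a set of five codes, since the duplicate collapses into a single entry.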

### Retry policy

```go
@@ -289,9 +305,9 @@ You can override the default retry or failover behavior with `Config.RetryPolicy
* JSON-RPC info (best-effort): `Method`, `Batch`
* `StatusCode` (0 if no HTTP response was obtained)
* `Err` (the error from the base transport, if any)
-* `RetryableByDefault` (rcpx's default classification for this outcome)
+* `RetryableByDefault` (whether rcpx classifies the outcome as retryable)

-`RetryableByDefault` is true for transport errors (`Err != nil`) and for HTTP status `429`, `502`, `503`, `504`.
+`RetryableByDefault` is true for transport errors (`Err != nil`), for HTTP status `429`, `502`, `503`, `504`, and for statuses configured with `AdditionalRetryableStatusCodes`.

The retry policy is only consulted after rcpx has classified an attempt as non-success and there is another eligible upstream to try.

@@ -301,7 +317,7 @@ The retry policy is not called:
* for the last eligible upstream;
* when the request context is canceled or its deadline is exceeded.

-Because HTTP statuses other than `429`, `502`, `503`, and `504` are treated as success from rcpx's perspective, `RetryPolicy` cannot make additional HTTP status codes, such as `500`, retryable under the current design.
+`RetryPolicy` cannot make a non-retryable HTTP status retryable by itself, because the policy is only called after rcpx has already classified an attempt as non-success. To make an additional HTTP status such as `500` retryable, configure `AdditionalRetryableStatusCodes`.

`RetryPolicy` receives `AttemptOutcome`; it cannot inspect response bodies or response headers. It can decide whether rcpx should continue after an already-classified non-success attempt, but it is not a general response validation hook.
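
A policy can narrow failover but not widen it. The sketch below illustrates this with a hypothetical policy that stops on HTTP `429` and otherwise defers to rcpx's classification. It assumes `RetryPolicy` has the shape `func(AttemptOutcome) bool`, with `true` meaning "continue to the next upstream", as the default policy suggests. `AttemptOutcome` is redeclared locally so the example compiles on its own, and `stayOn429` is an invented name.

```go
package main

import (
	"errors"
	"fmt"
)

// AttemptOutcome is a local copy of the fields described above, declared here
// only so the sketch is self-contained; real code would use rcpx's type.
type AttemptOutcome struct {
	Attempt            int
	Upstream           string
	Method             string
	Batch              bool
	StatusCode         int
	Err                error
	RetryableByDefault bool
}

// stayOn429 is a hypothetical policy: do not fail over when an upstream
// answers 429, and follow rcpx's own classification in every other case.
func stayOn429(out AttemptOutcome) bool {
	if out.Err == nil && out.StatusCode == 429 {
		return false
	}
	return out.RetryableByDefault
}

func main() {
	outcomes := []AttemptOutcome{
		{Attempt: 1, StatusCode: 429, RetryableByDefault: true},
		{Attempt: 2, StatusCode: 503, RetryableByDefault: true},
		{Attempt: 3, Err: errors.New("dial timeout"), RetryableByDefault: true},
	}
	for _, out := range outcomes {
		fmt.Printf("attempt %d: continue=%v\n", out.Attempt, stayOn429(out))
	}
}
```

Because the policy only ever sees outcomes that rcpx has already classified as non-success, a status such as `501` never reaches it unless that status is listed in `AdditionalRetryableStatusCodes`.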

35 changes: 35 additions & 0 deletions config.go
@@ -33,6 +33,10 @@ type Config struct {

// Retry/failover policy hook. If nil, the default policy is used.
RetryPolicy RetryPolicy

// AdditionalRetryableStatusCodes are HTTP status codes retried in addition to
// the defaults: 429, 502, 503, and 504.
AdditionalRetryableStatusCodes []int
}

// CooldownConfig configures cooldown behavior.
@@ -68,6 +72,8 @@ type resolvedConfig struct {
allowNI bool
bodyCap int
policy RetryPolicy

retryableStatuses map[int]struct{}
}

func resolveConfig(cfg Config) (resolvedConfig, error) {
@@ -120,13 +126,20 @@ func resolveConfig(cfg Config) (resolvedConfig, error) {
policy = defaultRetryPolicy
}

statuses, err := resolveRetryableStatusCodes(cfg.AdditionalRetryableStatusCodes)
if err != nil {
return resolvedConfig{}, err
}

return resolvedConfig{
upstreams: upstreams,
base: base,
cooldown: cooldown,
allowNI: cfg.AllowNonIdempotent,
bodyCap: bodyCap,
policy: policy,

retryableStatuses: statuses,
}, nil
}

@@ -166,3 +179,25 @@ func resolveCooldown(cc *CooldownConfig) (effectiveCooldown, error) {
duration: dur,
}, nil
}

func resolveRetryableStatusCodes(additional []int) (map[int]struct{}, error) {
statuses := map[int]struct{}{
429: {},
502: {},
503: {},
504: {},
}

for i, code := range additional {
if !validHTTPStatusCode(code) {
return nil, fmt.Errorf("rcpx: invalid AdditionalRetryableStatusCodes[%d] %d", i, code)
}
statuses[code] = struct{}{}
}

return statuses, nil
}

func validHTTPStatusCode(code int) bool {
return code >= 100 && code <= 999
}
44 changes: 44 additions & 0 deletions config_test.go
@@ -84,6 +84,50 @@ func TestNewRoundTripper_validation(t *testing.T) {
}
})
})

t.Run("invalid additional retryable status codes", func(t *testing.T) {
tests := []struct {
name string
code int
}{
{name: "negative", code: -1},
{name: "zero", code: 0},
{name: "below three digits", code: 99},
{name: "above three digits", code: 1000},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewRoundTripper(Config{
Upstreams: []string{upstream},
AdditionalRetryableStatusCodes: []int{tt.code},
})
if err == nil {
t.Fatalf("expected error for status code %d, got nil", tt.code)
}
})
}
})

t.Run("valid additional retryable status code is accepted", func(t *testing.T) {
_, err := NewRoundTripper(Config{
Upstreams: []string{upstream},
AdditionalRetryableStatusCodes: []int{500},
})
if err != nil {
t.Fatalf("expected valid status code to be accepted, got %v", err)
}
})

t.Run("duplicate additional retryable status codes are accepted", func(t *testing.T) {
_, err := NewRoundTripper(Config{
Upstreams: []string{upstream},
AdditionalRetryableStatusCodes: []int{500, 500},
})
if err != nil {
t.Fatalf("expected duplicate status codes to be accepted, got %v", err)
}
})
}

func TestNewRoundTripper_defaults(t *testing.T) {
25 changes: 17 additions & 8 deletions policy.go
@@ -25,14 +25,14 @@ type AttemptOutcome struct {
StatusCode int
Err error

-RetryableByDefault bool // rcpx's default classification for this outcome
+RetryableByDefault bool // whether rcpx classifies this outcome as retryable
}

func defaultRetryPolicy(out AttemptOutcome) bool {
return out.RetryableByDefault
}

-func isRetryableStatus(code int) bool {
+func isBuiltInRetryableStatus(code int) bool {
switch code {
case 429, 502, 503, 504:
return true
@@ -41,27 +41,36 @@ func isRetryableStatus(code int) bool {
}
}

-func isAttemptSuccess(statusCode int, err error) bool {
-return err == nil && !isRetryableStatus(statusCode)
+func (cfg resolvedConfig) isRetryableStatus(code int) bool {
+if cfg.retryableStatuses == nil {
+return isBuiltInRetryableStatus(code)
+}
+
+_, ok := cfg.retryableStatuses[code]
+return ok
+}
+
+func (cfg resolvedConfig) isAttemptSuccess(statusCode int, err error) bool {
+return err == nil && !cfg.isRetryableStatus(statusCode)
}

-func defaultRetryableByOutcome(statusCode int, err error) bool {
+func (cfg resolvedConfig) retryableByOutcome(statusCode int, err error) bool {
if err != nil {
return true
}
-return isRetryableStatus(statusCode)
+return cfg.isRetryableStatus(statusCode)
}

// statusCode should be 0 when no HTTP response was obtained.
-func buildAttemptOutcome(attempt int, upstream, method string, batch bool, statusCode int, err error) AttemptOutcome {
+func (cfg resolvedConfig) buildAttemptOutcome(attempt int, upstream, method string, batch bool, statusCode int, err error) AttemptOutcome {
return AttemptOutcome{
Attempt: attempt,
Upstream: upstream,
Method: method,
Batch: batch,
StatusCode: statusCode,
Err: err,
-RetryableByDefault: defaultRetryableByOutcome(statusCode, err),
+RetryableByDefault: cfg.retryableByOutcome(statusCode, err),
}
}

6 changes: 3 additions & 3 deletions transport.go
@@ -146,10 +146,10 @@ func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
status = resp.StatusCode
}

-// Success = err==nil and status is not one of retryable statuses.
+// Success = err==nil and status is not retryable.
// Non-retryable HTTP statuses are treated as "success" from rcpx's
// perspective and returned unchanged.
-if isAttemptSuccess(status, rerr) {
+if t.cfg.isAttemptSuccess(status, rerr) {
if t.cooldown != nil {
t.cooldown.recordSuccess(idx)
}
@@ -162,7 +162,7 @@ func (t *transport) RoundTrip(req *http.Request) (*http.Response, error) {
cause = &httpStatusError{code: status, upstream: up.raw}
}

-out := buildAttemptOutcome(attemptNo, up.raw, method, batch, status, rerr)
+out := t.cfg.buildAttemptOutcome(attemptNo, up.raw, method, batch, status, rerr)

hasNext := pos < len(eligible)-1
continueToNext := false
97 changes: 97 additions & 0 deletions transport_test.go
@@ -47,6 +47,103 @@ func TestRoundTrip_HTTP500_IsTreatedAsSuccess_NoFailover(t *testing.T) {
assertCalls(t, base, u1)
}

func TestRoundTrip_AdditionalRetryableStatus_FailsOver(t *testing.T) {
u1 := "https://u1.test/rpc"
u2 := "https://u2.test/rpc"

respBody := newTrackingBody("internal error")
base := &scriptRT{
results: map[string][]rtResult{
u1: {
{resp: &http.Response{StatusCode: 500, Body: respBody}, err: nil},
},
u2: {
{resp: httpResp(200, "ok"), err: nil},
},
},
}

rt := mustNewTransport(t, Config{
Upstreams: []string{u1, u2},
Base: base,
AdditionalRetryableStatusCodes: []int{500},
})

req := newRPCRequest(t, u1, "eth_blockNumber")
resp := mustRoundTrip(t, rt, req)
assertStatus(t, resp, 200)
if !respBody.Closed() {
t.Fatalf("expected 500 response body to be closed before failover")
}
assertCalls(t, base, u1, u2)
}

func TestRoundTrip_AdditionalRetryableStatus_DoesNotReplaceDefaults(t *testing.T) {
u1 := "https://u1.test/rpc"
u2 := "https://u2.test/rpc"

respBody := newTrackingBody("service unavailable")
base := &scriptRT{
results: map[string][]rtResult{
u1: {
{resp: &http.Response{StatusCode: 503, Body: respBody}, err: nil},
},
u2: {
{resp: httpResp(200, "ok"), err: nil},
},
},
}

rt := mustNewTransport(t, Config{
Upstreams: []string{u1, u2},
Base: base,
AdditionalRetryableStatusCodes: []int{500},
})

req := newRPCRequest(t, u1, "eth_blockNumber")
resp := mustRoundTrip(t, rt, req)
assertStatus(t, resp, 200)
if !respBody.Closed() {
t.Fatalf("expected 503 response body to be closed before failover")
}
assertCalls(t, base, u1, u2)
}

func TestRoundTrip_NonConfiguredStatus_IsTreatedAsSuccess_NoFailover(t *testing.T) {
u1 := "https://u1.test/rpc"
u2 := "https://u2.test/rpc"

respBody := newTrackingBody("not implemented")
base := &scriptRT{
results: map[string][]rtResult{
u1: {
{resp: &http.Response{StatusCode: 501, Body: respBody}, err: nil},
},
},
}

rt := mustNewTransport(t, Config{
Upstreams: []string{u1, u2},
Base: base,
AdditionalRetryableStatusCodes: []int{500},
})

req := newRPCRequest(t, u1, "eth_blockNumber")
resp, err := rt.RoundTrip(req)
if err != nil {
t.Fatalf("RoundTrip returned error: %v", err)
}
t.Cleanup(func() { resp.Body.Close() })

assertStatus(t, resp, 501)

if respBody.Closed() {
t.Fatalf("expected returned response body to remain open")
}

assertCalls(t, base, u1)
}

func TestRoundTrip_HTTP200_WithJSONRPCErrorPayload_IsTreatedAsSuccess_NoFailover(t *testing.T) {
u1 := "https://u1.test/rpc"
u2 := "https://u2.test/rpc"