Off-chain latency-aware executor retry in decentralized-api #6

@mingles-agent

Context

Executor selection currently picks a weighted-random executor from the chain and forwards the inference request to it. If that executor is slow, the user's request is slow too. There is no mechanism to prefer fast executors, or to retry with a different one when the selected node's response time has degraded.

Latency tracking cannot be done on-chain (no consensus on wall-clock time per request). This must be handled in the decentralized-api layer.

Goal

Add an in-memory per-executor latency EMA (exponential moving average) tracker in the decentralized-api. When an executor is selected but has significantly higher latency than peers (based on local measurement history), retry with a different executor — without changing on-chain logic.

Implementation

1. Latency tracker (new file: decentralized-api/internal/executor/latency_tracker.go)

package executor

import (
    "sort"
    "sync"
    "time"
)

const (
    DefaultEMAAlpha        = 0.2  // smoothing factor: weight given to the newest sample
    DefaultLatencyMinCount = 4    // minimum samples before applying penalty
    DefaultPenaltyFactor   = 2.0  // treat as degraded if EMA > factor × global median
)

type LatencyTracker struct {
    mu        sync.RWMutex
    ema       map[string]float64 // address → EMA latency (ms)
    count     map[string]int
    updatedAt map[string]time.Time
    alpha     float64
    ttl       time.Duration      // entries older than TTL are reset
}

func NewLatencyTracker(alpha float64, ttl time.Duration) *LatencyTracker {
    return &LatencyTracker{
        ema: make(map[string]float64),
        count: make(map[string]int),
        updatedAt: make(map[string]time.Time),
        alpha: alpha,
        ttl: ttl,
    }
}

// Record updates the EMA latency for an executor.
func (t *LatencyTracker) Record(address string, latencyMs float64) {
    t.mu.Lock()
    defer t.mu.Unlock()
    prev, exists := t.ema[address]
    if !exists || time.Since(t.updatedAt[address]) > t.ttl {
        t.ema[address] = latencyMs
        t.count[address] = 1
    } else {
        t.ema[address] = t.alpha*latencyMs + (1-t.alpha)*prev
        t.count[address]++
    }
    t.updatedAt[address] = time.Now()
}

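// IsDegraded reports whether the executor's EMA latency exceeds
// penaltyFactor × the global median. Executors with fewer than minCount
// samples, or whose last sample is older than the TTL, are never penalized;
// otherwise an avoided executor would never get a fresh sample and could
// stay flagged indefinitely.
func (t *LatencyTracker) IsDegraded(address string, penaltyFactor float64, minCount int) bool {
    t.mu.RLock()
    defer t.mu.RUnlock()
    cnt, ok := t.count[address]
    if !ok || cnt < minCount || time.Since(t.updatedAt[address]) > t.ttl {
        return false // not enough fresh data
    }
    median := t.globalMedian()
    if median <= 0 {
        return false
    }
    return t.ema[address] > penaltyFactor*median
}

// globalMedian returns the median of all non-expired EMA values, or 0 if
// there are none. The caller must hold at least the read lock.
func (t *LatencyTracker) globalMedian() float64 {
    vals := make([]float64, 0, len(t.ema))
    for addr, v := range t.ema {
        if time.Since(t.updatedAt[addr]) <= t.ttl {
            vals = append(vals, v)
        }
    }
    if len(vals) == 0 {
        return 0
    }
    // simple sort-based median
    sort.Float64s(vals)
    mid := len(vals) / 2
    if len(vals)%2 == 0 {
        return (vals[mid-1] + vals[mid]) / 2
    }
    return vals[mid]
}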
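To make the thresholds concrete, here is a small standalone sketch of the same EMA update and median check. It does not use LatencyTracker itself; emaUpdate, median, and the sample latencies are hypothetical names and numbers chosen for illustration only.

```go
package main

import (
	"fmt"
	"sort"
)

// emaUpdate applies one EMA step; alpha is the weight of the newest sample.
func emaUpdate(prev, sample, alpha float64) float64 {
	return alpha*sample + (1-alpha)*prev
}

// median returns the median of vals without modifying the input slice.
func median(vals []float64) float64 {
	if len(vals) == 0 {
		return 0
	}
	sorted := append([]float64(nil), vals...)
	sort.Float64s(sorted)
	mid := len(sorted) / 2
	if len(sorted)%2 == 0 {
		return (sorted[mid-1] + sorted[mid]) / 2
	}
	return sorted[mid]
}

func main() {
	const alpha = 0.2
	// Hypothetical executor: EMA starts at 100 ms, then four 1000 ms samples arrive.
	ema := 100.0
	for i := 0; i < 4; i++ {
		ema = emaUpdate(ema, 1000, alpha)
	}
	// Three healthy peers (assumed values) plus the slow executor.
	all := []float64{100, 120, 110, ema}
	m := median(all)
	fmt.Printf("ema=%.1f median=%.1f degraded=%v\n", ema, m, ema > 2.0*m)
}
```

With these assumed numbers the slow executor's EMA climbs 280, 424, 539, 631 ms over four samples while the median stays at 115 ms, so it crosses the 2× threshold after the first slow sample and stays above it. The median includes the slow executor's own EMA, which is why a single outlier barely moves it.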

2. Integrate into getExecutorForRequest (post_chat_handler.go)

func (s *Server) getExecutorForRequest(ctx context.Context, model string) (*ExecutorDestination, error) {
    const maxRetries = 2
    var selected *ExecutorDestination
    for attempt := 0; attempt <= maxRetries; attempt++ {
        queryClient := s.recorder.NewInferenceQueryClient()
        response, err := queryClient.GetRandomExecutor(ctx, &types.QueryGetRandomExecutorRequest{
            Model: model,
        })
        if err != nil {
            return nil, err
        }
        // Named "candidate" so the local does not shadow the executor package.
        candidate := response.Executor
        selected = &ExecutorDestination{
            Url:     candidate.InferenceUrl,
            Address: candidate.Address,
        }
        if attempt < maxRetries && s.latencyTracker.IsDegraded(
            candidate.Address, executor.DefaultPenaltyFactor, executor.DefaultLatencyMinCount) {
            logging.Info("Executor has high latency, retrying selection",
                types.Inferences, "address", candidate.Address, "attempt", attempt)
            continue
        }
        break
    }
    // On the final attempt the candidate is accepted even if degraded.
    logging.Info("Executor selected", types.Inferences,
        "address", selected.Address, "url", selected.Url)
    return selected, nil
}
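The bounded-retry behavior can be checked in isolation from the chain client. The following is a minimal sketch, assuming the selection and the degradation check are passed in as plain functions; pickWithRetry and the executor names are hypothetical and not part of the codebase.

```go
package main

import "fmt"

// pickWithRetry calls pick at most maxRetries+1 times. It returns the first
// candidate that isDegraded does not flag, or the final candidate regardless
// of its state, together with the number of picks made.
func pickWithRetry(pick func() string, isDegraded func(string) bool, maxRetries int) (string, int) {
	var last string
	for attempt := 0; attempt <= maxRetries; attempt++ {
		last = pick()
		if attempt == maxRetries || !isDegraded(last) {
			return last, attempt + 1
		}
	}
	return last, 0 // only reached when maxRetries is negative
}

func main() {
	// Hypothetical pick sequence: two slow executors, then a healthy one.
	seq := []string{"slow-a", "slow-b", "fast-c"}
	i := 0
	pick := func() string { v := seq[i%len(seq)]; i++; return v }
	slow := func(addr string) bool { return addr != "fast-c" }

	chosen, picks := pickWithRetry(pick, slow, 2)
	fmt.Println(chosen, picks) // fast-c 3
}
```

Because the on-chain selection is random, a retry can return the same degraded executor again. The loop still terminates after maxRetries+1 picks, but it does not guarantee that a different executor is tried.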

3. Record latency after each inference call

In the inference forwarding path, wrap the HTTP call to the executor and measure round-trip time. After success or timeout, call s.latencyTracker.Record(executorAddress, latencyMs).

4. Wire up LatencyTracker into Server struct

Add latencyTracker *executor.LatencyTracker to the Server struct in internal/server/public/. Initialize in server constructor:

latencyTracker: executor.NewLatencyTracker(executor.DefaultEMAAlpha, 30*time.Minute),

Files to Modify

  • decentralized-api/internal/executor/latency_tracker.go: new file
  • decentralized-api/internal/server/public/post_chat_handler.go: getExecutorForRequest, latency recording
  • decentralized-api/internal/server/public/server.go: add LatencyTracker field

Acceptance Criteria

  • Executor with EMA latency > 2× global median (with ≥4 samples) triggers retry
  • Maximum 2 retries before accepting the last-selected executor (no infinite loop)
  • Latency is recorded after each inference attempt (success and timeout)
  • Thread-safe: concurrent requests update tracker correctly
  • EMA entries expire after TTL (30 min default) to recover from stale data
  • Unit tests: degraded detection, EMA calculation, thread safety
  • No on-chain changes; zero consensus impact
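For the thread-safety criterion, a concurrency check might look like the sketch below. It uses a reduced stand-in (miniTracker, hypothetical) instead of the real LatencyTracker so that it is self-contained; in the repository the same pattern would go into a _test.go file and be run with the race detector (go test -race).

```go
package main

import (
	"fmt"
	"sync"
)

// miniTracker is a stand-in for LatencyTracker with only the state this
// concurrency check needs (no TTL handling).
type miniTracker struct {
	mu    sync.RWMutex
	ema   map[string]float64
	count map[string]int
}

func newMiniTracker() *miniTracker {
	return &miniTracker{ema: map[string]float64{}, count: map[string]int{}}
}

// Record applies one EMA step under the write lock.
func (t *miniTracker) Record(addr string, ms, alpha float64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if prev, ok := t.ema[addr]; ok {
		t.ema[addr] = alpha*ms + (1-alpha)*prev
	} else {
		t.ema[addr] = ms
	}
	t.count[addr]++
}

// Snapshot reads the EMA and sample count under the read lock.
func (t *miniTracker) Snapshot(addr string) (float64, int) {
	t.mu.RLock()
	defer t.mu.RUnlock()
	return t.ema[addr], t.count[addr]
}

// hammer records the same 100 ms sample from many goroutines at once.
func hammer(t *miniTracker, addr string, goroutines, perGoroutine int) {
	var wg sync.WaitGroup
	for g := 0; g < goroutines; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < perGoroutine; i++ {
				t.Record(addr, 100, 0.2)
			}
		}()
	}
	wg.Wait()
}

func main() {
	tr := newMiniTracker()
	hammer(tr, "executor-a", 8, 1000)
	ema, n := tr.Snapshot("executor-a")
	// No update may be lost, and a constant input keeps the EMA at that value.
	fmt.Printf("ema=%.1f count=%d\n", ema, n)
}
```

A lost update would show up as a count below goroutines × perGoroutine. Without the mutex, concurrent map writes would typically crash the program or be flagged by the race detector.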
