Context
Executor selection currently picks a random (weighted) executor from the chain and sends the inference request to it. If that executor is slow, the user's request is slow too. There is no mechanism to prefer fast executors, or to retry with a different one when the selected node's response time is degraded.
Latency tracking cannot be done on-chain, because nodes cannot reach consensus on per-request wall-clock time. It must be handled in the decentralized-api layer.
Goal
Add an in-memory, per-executor latency EMA (exponential moving average) tracker to the decentralized-api. When the selected executor's latency EMA exceeds a penalty factor (default 2×) times the median EMA of all tracked executors, based on this node's local measurement history, retry selection with a different executor. On-chain logic stays unchanged.
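For reference, the update rule and the degradation test can be written as below, where α is the weight given to the newest sample x_n (0.2 by default), k is the penalty factor (2.0 by default), and the median is taken over the EMAs of all tracked executors b:

```math
\mathrm{EMA}_n = \alpha\,x_n + (1-\alpha)\,\mathrm{EMA}_{n-1},
\qquad
\text{degraded}(a) \iff \mathrm{EMA}(a) > k \cdot \operatorname{median}_b \mathrm{EMA}(b)
```

For example, with α = 0.2, an executor sitting at 100 ms that returns one 600 ms response moves to 0.2 × 600 + 0.8 × 100 = 200 ms. If the peer median is 100 ms, that is exactly at, but not above, the 2 × 100 ms threshold, so it is not yet flagged.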
Implementation
1. Latency tracker (new file: decentralized-api/internal/executor/latency_tracker.go)
package executor

import (
	"sort"
	"sync"
	"time"
)

const (
	DefaultEMAAlpha        = 0.2 // weight given to the newest sample
	DefaultLatencyMinCount = 4   // minimum samples before an executor can be flagged
	DefaultPenaltyFactor   = 2.0 // flag as degraded if EMA > factor × median EMA
)

type LatencyTracker struct {
	mu        sync.RWMutex
	ema       map[string]float64   // executor address → EMA latency (ms)
	count     map[string]int       // samples folded into the current EMA
	updatedAt map[string]time.Time // time of the most recent sample
	alpha     float64
	ttl       time.Duration // entries not updated within ttl are treated as absent
}

func NewLatencyTracker(alpha float64, ttl time.Duration) *LatencyTracker {
	return &LatencyTracker{
		ema:       make(map[string]float64),
		count:     make(map[string]int),
		updatedAt: make(map[string]time.Time),
		alpha:     alpha,
		ttl:       ttl,
	}
}

// Record folds a new latency sample into the executor's EMA.
// A missing or expired entry is restarted from this sample.
func (t *LatencyTracker) Record(address string, latencyMs float64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	prev, exists := t.ema[address]
	if !exists || t.expired(address) {
		t.ema[address] = latencyMs
		t.count[address] = 1
	} else {
		t.ema[address] = t.alpha*latencyMs + (1-t.alpha)*prev
		t.count[address]++
	}
	t.updatedAt[address] = time.Now()
}

// IsDegraded reports whether the executor's EMA exceeds penaltyFactor times the
// median EMA of all non-expired executors. Executors with fewer than minCount
// samples, or whose entry has expired, are never flagged.
func (t *LatencyTracker) IsDegraded(address string, penaltyFactor float64, minCount int) bool {
	t.mu.RLock()
	defer t.mu.RUnlock()
	cnt, ok := t.count[address]
	if !ok || cnt < minCount || t.expired(address) {
		return false // not enough fresh data
	}
	median := t.globalMedian()
	if median <= 0 {
		return false
	}
	return t.ema[address] > penaltyFactor*median
}

// expired reports whether an entry is older than the TTL (caller holds the lock).
func (t *LatencyTracker) expired(address string) bool {
	return time.Since(t.updatedAt[address]) > t.ttl
}

// globalMedian returns the median of all non-expired EMA values (caller holds the lock).
func (t *LatencyTracker) globalMedian() float64 {
	vals := make([]float64, 0, len(t.ema))
	for addr, v := range t.ema {
		if !t.expired(addr) {
			vals = append(vals, v)
		}
	}
	if len(vals) == 0 {
		return 0
	}
	sort.Float64s(vals)
	mid := len(vals) / 2
	if len(vals)%2 == 0 {
		return (vals[mid-1] + vals[mid]) / 2
	}
	return vals[mid]
}
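To get a feel for how quickly DefaultEMAAlpha = 0.2 reacts, the standalone sketch below counts how many consecutive samples of a given latency push an executor's EMA above the degradation threshold. It assumes the median stays fixed while this one executor slows down, which is roughly true when many peers are tracked; samplesToCross is a hypothetical helper, not part of the tracker.

```go
package main

// samplesToCross returns how many consecutive samples of latency `sample` it
// takes for an EMA starting at `start` (weight alpha on the newest sample) to
// rise strictly above `threshold`. It returns -1 if the threshold is not
// crossed within maxSteps samples.
func samplesToCross(start, sample, threshold, alpha float64, maxSteps int) int {
	ema := start
	for n := 1; n <= maxSteps; n++ {
		ema = alpha*sample + (1-alpha)*ema
		if ema > threshold {
			return n
		}
	}
	return -1
}
```

Starting from a 100 ms EMA with a 200 ms threshold (2 × a 100 ms median), a single 1000 ms sample is enough (EMA 280 ms), while a run of 300 ms samples takes four (140, 172, 197.6, then 218.08 ms). Recovery tends to be slower in practice: a degraded executor is skipped on most draws, so it receives few new samples to pull its EMA back down.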
2. Integrate into getExecutorForRequest (post_chat_handler.go)
func (s *Server) getExecutorForRequest(ctx context.Context, model string) (*ExecutorDestination, error) {
	const maxRetries = 2 // up to maxRetries+1 draws in total
	queryClient := s.recorder.NewInferenceQueryClient()

	var dest *ExecutorDestination
	for attempt := 0; attempt <= maxRetries; attempt++ {
		response, err := queryClient.GetRandomExecutor(ctx, &types.QueryGetRandomExecutorRequest{
			Model: model,
		})
		if err != nil {
			return nil, err
		}
		selected := response.Executor
		dest = &ExecutorDestination{
			Url:     selected.InferenceUrl,
			Address: selected.Address,
		}

		// Skip a degraded executor, except on the final attempt. A new random draw
		// may return the same executor again; the loop bounds the extra chain queries.
		// 2.0 and 4 match DefaultPenaltyFactor and DefaultLatencyMinCount.
		if attempt < maxRetries && s.latencyTracker.IsDegraded(selected.Address, 2.0, 4) {
			logging.Info("Executor has high latency, retrying selection",
				types.Inferences, "address", selected.Address, "attempt", attempt)
			continue
		}

		logging.Info("Executor selected", types.Inferences,
			"address", selected.Address, "url", selected.InferenceUrl)
		return dest, nil
	}
	// Unreachable: the final attempt always returns above, but Go requires a return here.
	return dest, nil
}
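The retry behaviour can be checked in isolation. The sketch below models the loop with two hypothetical callbacks: pick stands in for GetRandomExecutor and degraded for LatencyTracker.IsDegraded. It returns the chosen address and the number of draws made.

```go
package main

// selectExecutor draws executors via pick, skipping ones reported as degraded,
// and accepts whatever the final draw returns. It returns the chosen address,
// the number of draws made, and any error from pick.
func selectExecutor(pick func() (string, error), degraded func(string) bool, maxRetries int) (string, int, error) {
	var addr string
	for attempt := 0; attempt <= maxRetries; attempt++ {
		a, err := pick()
		if err != nil {
			return "", attempt + 1, err
		}
		addr = a
		// Skip a degraded executor unless this is the last allowed draw.
		if attempt < maxRetries && degraded(addr) {
			continue
		}
		return addr, attempt + 1, nil
	}
	return addr, maxRetries + 1, nil // unreachable: the final draw always returns
}
```

Assuming draws are independent and degraded executors hold a fraction p of the selection weight, a request still lands on a degraded executor with probability p^3 when maxRetries = 2 (one request in a thousand for p = 0.1), at the cost of up to three chain queries per request.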
3. Record latency after each inference call
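In the inference forwarding path, wrap the HTTP call to the executor and measure its wall-clock duration with time.Now() and time.Since(). After a successful response or a timeout, call s.latencyTracker.Record(executorAddress, latencyMs). Other failures (for example, connection refused) should not be recorded, since a fast failure would pull the executor's EMA down and make a broken node look fast.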
4. Wire up LatencyTracker into Server struct
Add a latencyTracker *executor.LatencyTracker field to the Server struct in internal/server/public/server.go and initialize it in the server constructor (30-minute TTL):
latencyTracker: executor.NewLatencyTracker(executor.DefaultEMAAlpha, 30*time.Minute),
Files to Modify
decentralized-api/internal/executor/latency_tracker.go — new file
decentralized-api/internal/server/public/post_chat_handler.go — getExecutorForRequest, latency recording
decentralized-api/internal/server/public/server.go — add LatencyTracker field
Acceptance Criteria
Notes
Related