Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 69 additions & 10 deletions sdk/go/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"log/slog"
"net/http"
"net/url"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -36,14 +37,31 @@ import (
// then error. Not cached.
//
// NOTE: a Resolver is safe for concurrent use; the in-process memo is
// guarded by a mutex and concurrent Resolve calls for the same address
// are coalesced into a single probe (single-flight).
type Resolver struct {
// fileCache is the optional disk cache checked before the network
// probe; when it is nil the disk lookup is skipped.
fileCache *cache
// httpc is the HTTP client taken from the options passed to New.
// NOTE(review): presumably the client used for the discovery probe;
// confirm against fetchWithRetry.
httpc *http.Client
// logger receives a warning when writing the disk cache fails.
logger *slog.Logger

// mu guards memCache and inflight.
mu sync.Mutex
// memCache memoizes successful resolutions, keyed by the address with
// trailing slashes trimmed. Failed probes are never stored here.
memCache map[string]NodeInfo
// inflight holds the resolveCall currently probing each address; the
// entry is removed as soon as that probe finishes.
inflight map[string]*resolveCall
}

// resolveCall is the shared state of a single in-flight Resolve for one
// address.
// The elected goroutine creates it, runs the disk-cache check and
// network probe, and closes done; every concurrent caller for the same
// address reads info/err once done is closed and skips the probe.
// A waiter whose own context is cancelled first returns that context's
// error instead of waiting for done.
// NOTE: info, err, and waiters must only be read or written with the
// owning Resolver's mu held until done is closed; after close, info and
// err are immutable and safe to read without the lock.
type resolveCall struct {
// done is closed by the elected goroutine once info and err are final.
done chan struct{}
// info is the resolved NodeInfo handed to every waiter.
info NodeInfo
// err is the error from the elected goroutine's resolve, nil on success.
err error
// waiters counts the callers that joined this call instead of probing;
// Resolve increments it under mu.
// NOTE(review): nothing visible here reads it — presumably kept for
// tests or diagnostics; confirm.
waiters int
}

// New constructs a Resolver from functional options.
Expand Down Expand Up @@ -71,11 +89,16 @@ func New(opts ...Option) (*Resolver, error) {
httpc: o.httpClient,
logger: o.logger,
memCache: map[string]NodeInfo{},
inflight: map[string]*resolveCall{},
}, nil
}

// Resolve returns the NodeInfo for addr.
// Trailing slashes in addr are normalized away before lookup.
// Concurrent callers for the same address are coalesced: the first
// caller probes, every other caller waits for that probe's result and
// sees the same NodeInfo (or the same error).
// A failed probe is not memoized; the next caller retries the network.
func (r *Resolver) Resolve(ctx context.Context, addr string) (NodeInfo, error) {
addr = strings.TrimRight(addr, "/")

Expand All @@ -84,11 +107,45 @@ func (r *Resolver) Resolve(ctx context.Context, addr string) (NodeInfo, error) {
r.mu.Unlock()
return info, nil
}
if call, ok := r.inflight[addr]; ok {
call.waiters++
r.mu.Unlock()
select {
case <-call.done:
return call.info, call.err
case <-ctx.Done():
return NodeInfo{}, ctx.Err()
}
}
call := &resolveCall{done: make(chan struct{})}
r.inflight[addr] = call
r.mu.Unlock()

info, err := r.resolveUncached(ctx, addr)

r.mu.Lock()
call.info = info
call.err = err
if err == nil {
r.memCache[addr] = info
}
delete(r.inflight, addr)
// Close before releasing the lock so a new caller arriving on the
// same addr cannot register a fresh probe while existing waiters
// are still blocked on this call's done channel.
close(call.done)
Comment thread
peteski22 marked this conversation as resolved.
r.mu.Unlock()

return info, err
}

// resolveUncached runs the disk-cache check and network probe for addr.
// NOTE: callers must hold the single-flight election for addr;
// this helper does no in-process memoization and is not safe to call
// directly outside the Resolve flow.
func (r *Resolver) resolveUncached(ctx context.Context, addr string) (NodeInfo, error) {
if r.fileCache != nil {
if info, ok := r.fileCache.get(addr); ok {
r.cacheInMemory(addr, info)
return info, nil
}
}
Expand All @@ -107,17 +164,9 @@ func (r *Resolver) Resolve(ctx context.Context, addr string) (NodeInfo, error) {
r.logger.Warn("discovery: cache write failed", "addr", addr, "err", err)
}
}
r.cacheInMemory(addr, info)
return info, nil
}

// cacheInMemory stores info as the memoized result for addr.
// The Resolver's mutex is held for the duration of the map write so the
// update cannot race with other users of the in-process memo.
func (r *Resolver) cacheInMemory(addr string, info NodeInfo) {
	r.mu.Lock()
	defer r.mu.Unlock()

	r.memCache[addr] = info
}

// fetchWithRetry issues a GET to u and retries up to attempts times on
// transport errors and 5xx responses, with a short linear backoff
// between attempts.
Expand Down Expand Up @@ -254,6 +303,16 @@ func validate(info NodeInfo) error {
if parsed.Host == "" {
return fmt.Errorf("api_base_url %q is missing a host", info.APIBaseURL)
}
// url.Parse already rejects non-numeric port segments, but it
// accepts numeric ports outside the uint16 range (e.g.
// "https://host:99999/").
// Reject those here so the failure surfaces as a discovery-domain
// message rather than an opaque transport error.
if port := parsed.Port(); port != "" {
if _, err := strconv.ParseUint(port, 10, 16); err != nil {
return fmt.Errorf("api_base_url %q has an invalid port %q", info.APIBaseURL, port)
Comment thread
peteski22 marked this conversation as resolved.
}
}
if info.APIVersion == "" {
return errors.New("api_version is required")
}
Expand Down
Loading
Loading