Review notes: line breaks and indentation were reconstructed from a
whitespace-collapsed copy of this patch, so whitespace in context lines may
differ from the tree (apply with `git apply --ignore-whitespace` if needed).
The last handler.go hunk was amended during review, so the post-image blob id
on the handler.go index line is stale.

diff --git a/internal/app/app_cacheprog.go b/internal/app/app_cacheprog.go
index 95cbd64..9dccbe7 100644
--- a/internal/app/app_cacheprog.go
+++ b/internal/app/app_cacheprog.go
@@ -24,6 +24,7 @@ type CacheprogAppArgs struct {
 	MaxConcurrentRemotePuts int           `arg:"--max-concurrent-remote-puts,env:MAX_CONCURRENT_REMOTE_PUTS" placeholder:"NUM" help:"Max number of concurrent remote puts, unlimited if not provided"`
 	MaxBackgroundWait       time.Duration `arg:"--max-background-wait,env:MAX_BACKGROUND_WAIT" placeholder:"DURATION" default:"10s" help:"Max time to wait for waiting of background operations to complete"`
 	MinRemotePutSize        int64         `arg:"--min-remote-put-size,env:MIN_REMOTE_PUT_SIZE" placeholder:"SIZE" help:"Min size of object to push to remote storage, no size limit if not provided"`
+	RemoteGetTimeout        time.Duration `arg:"--remote-get-timeout,env:REMOTE_GET_TIMEOUT" placeholder:"DURATION" help:"Max time for a remote GET including fetch, decompress, and local write. Prevents hangs on dead connections. 0 means no timeout."`
 	DisableGet              bool          `arg:"--disable-get,env:DISABLE_GET" help:"Disable getting objects from any storage, useful to force rebuild of the project and rewrite cache"`
 	DisablePut              bool          `arg:"--disable-put,env:DISABLE_PUT" help:"Disable writing to remote storage"`
 }
@@ -125,6 +126,7 @@ func (a *CacheprogAppArgs) Run(ctx context.Context) error {
 		MaxConcurrentRemoteGets: a.MaxConcurrentRemoteGets,
 		MaxConcurrentRemotePuts: a.MaxConcurrentRemotePuts,
 		LocalStorage:            cacheprog.ObservingLocalStorage{LocalStorage: diskStorage},
+		RemoteGetTimeout:        a.RemoteGetTimeout,
 		CloseTimeout:            a.MaxBackgroundWait,
 		CompressionCodec:        compression.NewCodec(),
 		DisableGet:              a.DisableGet,
diff --git a/internal/app/cacheprog/handler.go b/internal/app/cacheprog/handler.go
index 5d2b922..09e1f1a 100644
--- a/internal/app/cacheprog/handler.go
+++ b/internal/app/cacheprog/handler.go
@@ -148,11 +148,12 @@ type Handler struct {
 	minRemotePutSize int64
 	compressionCodec CompressionCodec
 
-	localStorage LocalStorage
-	closeTimeout time.Duration
-	closeChan    chan struct{} // closed on "close" command
-	closeWG      sync.WaitGroup
-	onClose      func(ctx context.Context) error
+	localStorage     LocalStorage
+	remoteGetTimeout time.Duration
+	closeTimeout     time.Duration
+	closeChan        chan struct{} // closed on "close" command
+	closeWG          sync.WaitGroup
+	onClose          func(ctx context.Context) error
 
 	disableGet bool
 	disablePut bool
@@ -172,6 +173,7 @@ type HandlerOptions struct {
 	CompressionCodec CompressionCodec                // compression codec to use on remote storage, if not provided - no compression will be used
 	DisableGet       bool                            // disable getting objects from any storage, useful to force rebuild of the project and rewrite cache
 	DisablePut       bool                            // disable writing to remote storage
+	RemoteGetTimeout time.Duration                   // max time for a remote GET (fetch + decompress + local write), 0 - no timeout
 	CloseTimeout     time.Duration                   // max time to wait for handler to close, 0 - no timeout
 	OnClose          func(ctx context.Context) error // if provided - expected to be blocking, called on close command
 
@@ -195,6 +197,7 @@ func NewHandler(opts HandlerOptions) *Handler {
 		minRemotePutSize: opts.MinRemotePutSize,
 		compressionCodec: opts.CompressionCodec,
 		localStorage:     opts.LocalStorage,
+		remoteGetTimeout: opts.RemoteGetTimeout,
 		closeTimeout:     opts.CloseTimeout,
 		onClose:          opts.OnClose,
 		closeChan:        make(chan struct{}),
@@ -223,7 +226,6 @@ func (h *Handler) Handle(ctx context.Context, writer cacheproto.ResponseWriter,
 
 func (h *Handler) handleGet(ctx context.Context, writer cacheproto.ResponseWriter, req *cacheproto.Request) {
 	if h.disableGet {
-		// this should never happen because compiler must not call this method if we announced disabled 'get' support
 		h.writeResponse(ctx, writer, &cacheproto.Response{
 			ID:  req.ID,
 			Err: "getting objects from any storage is disabled",
@@ -267,63 +269,77 @@ func (h *Handler) handleGet(ctx context.Context, writer cacheproto.ResponseWrite
 
 	defer h.enterGetRemote()()
 
-	remoteObj, err := h.remoteStorage.Get(ctx, &GetRequest{
-		ActionID: req.ActionID,
-	})
+	// Bound only the remote fetch with the timeout; the responses below keep using
+	// the caller's ctx so they are never written with an already expired context.
+	fetchCtx := ctx
+	if h.remoteGetTimeout > 0 {
+		var cancel context.CancelFunc
+		fetchCtx, cancel = context.WithTimeout(ctx, h.remoteGetTimeout)
+		defer cancel()
+	}
+
+	localObj, err = h.fetchAndStoreRemote(fetchCtx, req.ActionID)
 	switch {
 	case errors.Is(err, nil):
-		// store object in local storage and return path
-		defer remoteObj.Body.Close()
+		h.getHits.Add(1)
+		h.writeResponse(ctx, writer, &cacheproto.Response{
+			ID:       req.ID,
+			OutputID: localObj.OutputID,
+			Time:     &localObj.ModTime,
+			Size:     localObj.Size,
+			DiskPath: localObj.DiskPath,
+		})
 	case errors.Is(err, ErrNotFound):
 		h.writeResponse(ctx, writer, &cacheproto.Response{
 			ID:   req.ID,
 			Miss: true,
 		})
-		return
 	default:
 		h.writeResponse(ctx, writer, &cacheproto.Response{
 			ID:  req.ID,
 			Err: fmt.Sprintf("failed to get object: %v", err),
 		})
-		return
 	}
+}
+
+// fetchAndStoreRemote downloads the object for actionID from remote storage,
+// decompresses it and stores it in local storage. Errors from remote storage
+// are returned unwrapped, so callers can match them (e.g. ErrNotFound) with
+// errors.Is; decompression and local write errors are wrapped with the failing step.
+func (h *Handler) fetchAndStoreRemote(ctx context.Context, actionID []byte) (*LocalGetResponse, error) {
+	remoteObj, err := h.remoteStorage.Get(ctx, &GetRequest{
+		ActionID: actionID,
+	})
+	if err != nil {
+		return nil, err
+	}
+	defer remoteObj.Body.Close()
 
 	decompressedObj, err := h.compressionCodec.Decompress(&DecompressRequest{
 		Body:      remoteObj.Body,
 		Algorithm: remoteObj.CompressionAlgorithm,
 	})
 	if err != nil {
-		h.writeResponse(ctx, writer, &cacheproto.Response{
-			ID:  req.ID,
-			Err: fmt.Sprintf("failed to decompress object: %v", err),
-		})
-		return
+		return nil, fmt.Errorf("decompress: %w", err)
 	}
-
 	defer decompressedObj.Body.Close()
 
-	newLocalObj, err := h.localStorage.PutLocal(ctx, &LocalPutRequest{
-		ActionID: req.ActionID,
+	localObj, err := h.localStorage.PutLocal(ctx, &LocalPutRequest{
+		ActionID: actionID,
 		OutputID: remoteObj.OutputID,
 		Size:     remoteObj.UncompressedSize,
 		Body:     decompressedObj.Body,
 	})
 	if err != nil {
-		h.writeResponse(ctx, writer, &cacheproto.Response{
-			ID:  req.ID,
-			Err: fmt.Sprintf("failed to put object: %v", err),
-		})
-		return
+		return nil, fmt.Errorf("put local: %w", err)
 	}
 
-	h.getHits.Add(1)
-	h.writeResponse(ctx, writer, &cacheproto.Response{
-		ID:       req.ID,
+	return &LocalGetResponse{
 		OutputID: remoteObj.OutputID,
-		Time:     &remoteObj.ModTime,
+		ModTime:  remoteObj.ModTime,
 		Size:     remoteObj.UncompressedSize,
-		DiskPath: newLocalObj.DiskPath,
-	})
+		DiskPath: localObj.DiskPath,
+	}, nil
 }
 
 func (h *Handler) handlePut(ctx context.Context, writer cacheproto.ResponseWriter, req *cacheproto.Request) {
diff --git a/internal/app/cacheprog/handler_test.go b/internal/app/cacheprog/handler_test.go
index 74429ed..1c16d41 100644
--- a/internal/app/cacheprog/handler_test.go
+++ b/internal/app/cacheprog/handler_test.go
@@ -183,6 +183,53 @@ func TestHandler_Handle_Get(t *testing.T) {
 		assert.True(t, resp.Miss)
 	})
 
+	t.Run("remote get timeout returns error", func(t *testing.T) {
+		synctest.Test(t, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+
+			localStorage := NewMockLocalStorage(ctrl)
+			remoteStorage := NewMockRemoteStorage(ctrl)
+			compressionCodec := NewMockCompressionCodec(ctrl)
+			writer := &mockResponseWriter{}
+
+			h := NewHandler(HandlerOptions{
+				RemoteStorage:    remoteStorage,
+				LocalStorage:     localStorage,
+				CompressionCodec: compressionCodec,
+				RemoteGetTimeout: 100 * time.Millisecond,
+			})
+
+			localStorage.EXPECT().
+				GetLocal(gomock.Any(), &LocalGetRequest{ActionID: actionID}).
+				Return(nil, ErrNotFound)
+
+			// Remote storage blocks longer than the timeout
+			remoteStorage.EXPECT().
+				Get(gomock.Any(), &GetRequest{ActionID: actionID}).
+				DoAndReturn(func(ctx context.Context, _ *GetRequest) (*GetResponse, error) {
+					<-ctx.Done()
+					return nil, ctx.Err()
+				})
+
+			startTime := time.Now()
+			h.Handle(ctx, writer, &cacheproto.Request{
+				ID:       5,
+				Command:  cacheproto.CmdGet,
+				ActionID: actionID,
+			})
+			synctest.Wait()
+			elapsed := time.Since(startTime)
+
+			require.Len(t, writer.responses, 1)
+			resp := writer.responses[0]
+			assert.Equal(t, int64(5), resp.ID)
+			assert.False(t, resp.Miss)
+			assert.Contains(t, resp.Err, "context deadline exceeded")
+			assert.GreaterOrEqual(t, elapsed, 100*time.Millisecond)
+			assert.Less(t, elapsed, 200*time.Millisecond)
+		})
+	})
+
 	t.Run("disable get", func(t *testing.T) {
 		ctrl := gomock.NewController(t)
 