Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/app/app_cacheprog.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type CacheprogAppArgs struct {
MaxConcurrentRemotePuts int `arg:"--max-concurrent-remote-puts,env:MAX_CONCURRENT_REMOTE_PUTS" placeholder:"NUM" help:"Max number of concurrent remote puts, unlimited if not provided"`
MaxBackgroundWait time.Duration `arg:"--max-background-wait,env:MAX_BACKGROUND_WAIT" placeholder:"DURATION" default:"10s" help:"Max time to wait for waiting of background operations to complete"`
MinRemotePutSize int64 `arg:"--min-remote-put-size,env:MIN_REMOTE_PUT_SIZE" placeholder:"SIZE" help:"Min size of object to push to remote storage, no size limit if not provided"`
RemoteGetTimeout time.Duration `arg:"--remote-get-timeout,env:REMOTE_GET_TIMEOUT" placeholder:"DURATION" help:"Max time for a remote GET including fetch, decompress, and local write. Prevents hangs on dead connections. 0 means no timeout."`
DisableGet bool `arg:"--disable-get,env:DISABLE_GET" help:"Disable getting objects from any storage, useful to force rebuild of the project and rewrite cache"`
DisablePut bool `arg:"--disable-put,env:DISABLE_PUT" help:"Disable writing to remote storage"`
}
Expand Down Expand Up @@ -125,6 +126,7 @@ func (a *CacheprogAppArgs) Run(ctx context.Context) error {
MaxConcurrentRemoteGets: a.MaxConcurrentRemoteGets,
MaxConcurrentRemotePuts: a.MaxConcurrentRemotePuts,
LocalStorage: cacheprog.ObservingLocalStorage{LocalStorage: diskStorage},
RemoteGetTimeout: a.RemoteGetTimeout,
CloseTimeout: a.MaxBackgroundWait,
CompressionCodec: compression.NewCodec(),
DisableGet: a.DisableGet,
Expand Down
73 changes: 41 additions & 32 deletions internal/app/cacheprog/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ type Handler struct {
minRemotePutSize int64
compressionCodec CompressionCodec

localStorage LocalStorage
closeTimeout time.Duration
closeChan chan struct{} // closed on "close" command
closeWG sync.WaitGroup
onClose func(ctx context.Context) error
localStorage LocalStorage
remoteGetTimeout time.Duration
closeTimeout time.Duration
closeChan chan struct{} // closed on "close" command
closeWG sync.WaitGroup
onClose func(ctx context.Context) error

disableGet bool
disablePut bool
Expand All @@ -172,6 +173,7 @@ type HandlerOptions struct {
CompressionCodec CompressionCodec // compression codec to use on remote storage, if not provided - no compression will be used
DisableGet bool // disable getting objects from any storage, useful to force rebuild of the project and rewrite cache
DisablePut bool // disable writing to remote storage
RemoteGetTimeout time.Duration // max time for a remote GET (fetch + decompress + local write), 0 - no timeout

CloseTimeout time.Duration // max time to wait for handler to close, 0 - no timeout
OnClose func(ctx context.Context) error // if provided - expected to be blocking, called on close command
Expand All @@ -195,6 +197,7 @@ func NewHandler(opts HandlerOptions) *Handler {
minRemotePutSize: opts.MinRemotePutSize,
compressionCodec: opts.CompressionCodec,
localStorage: opts.LocalStorage,
remoteGetTimeout: opts.RemoteGetTimeout,
closeTimeout: opts.CloseTimeout,
onClose: opts.OnClose,
closeChan: make(chan struct{}),
Expand Down Expand Up @@ -223,7 +226,6 @@ func (h *Handler) Handle(ctx context.Context, writer cacheproto.ResponseWriter,

func (h *Handler) handleGet(ctx context.Context, writer cacheproto.ResponseWriter, req *cacheproto.Request) {
if h.disableGet {
// this should never happen because compiler must not call this method if we announced disabled 'get' support
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
Err: "getting objects from any storage is disabled",
Expand Down Expand Up @@ -267,63 +269,70 @@ func (h *Handler) handleGet(ctx context.Context, writer cacheproto.ResponseWrite

defer h.enterGetRemote()()

remoteObj, err := h.remoteStorage.Get(ctx, &GetRequest{
ActionID: req.ActionID,
})
if h.remoteGetTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, h.remoteGetTimeout)
defer cancel()
}

localObj, err = h.fetchAndStoreRemote(ctx, req.ActionID)
switch {
case errors.Is(err, nil):
// store object in local storage and return path
defer remoteObj.Body.Close()
h.getHits.Add(1)
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
OutputID: localObj.OutputID,
Time: &localObj.ModTime,
Size: localObj.Size,
DiskPath: localObj.DiskPath,
})
case errors.Is(err, ErrNotFound):
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
Miss: true,
})
return
default:
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
Err: fmt.Sprintf("failed to get object: %v", err),
})
return
}
}

// fetchAndStoreRemote fetches the object identified by actionID from remote
// storage, decompresses it with the handler's compression codec, and writes the
// decompressed body to local storage.
//
// On success it returns a LocalGetResponse that carries the remote object's
// OutputID, ModTime and UncompressedSize together with the DiskPath reported by
// PutLocal. Errors from remoteStorage.Get and localStorage.PutLocal are returned
// unwrapped; a decompression failure is wrapped with a "decompress: " prefix.
//
// NOTE(review): this span is rendered from a diff view without +/- markers, so
// it interleaves both sides of the change. The statements that reference writer
// or req (neither is a parameter of this function), the bare `return` lines, and
// the newLocalObj variant of the PutLocal call appear to be the removed
// pre-refactor code — confirm against the actual file.
func (h *Handler) fetchAndStoreRemote(ctx context.Context, actionID []byte) (*LocalGetResponse, error) {
// Ask remote storage for the object; any error is passed back to the caller as-is.
remoteObj, err := h.remoteStorage.Get(ctx, &GetRequest{
ActionID: actionID,
})
if err != nil {
return nil, err
}
// Release the remote body on every exit path below.
defer remoteObj.Body.Close()

// Wrap the remote body in a decompressor chosen by the algorithm the remote object reports.
decompressedObj, err := h.compressionCodec.Decompress(&DecompressRequest{
Body: remoteObj.Body,
Algorithm: remoteObj.CompressionAlgorithm,
})
if err != nil {
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
Err: fmt.Sprintf("failed to decompress object: %v", err),
})
return
return nil, fmt.Errorf("decompress: %w", err)
}

// The decompressed body is closed separately from the remote body.
defer decompressedObj.Body.Close()

newLocalObj, err := h.localStorage.PutLocal(ctx, &LocalPutRequest{
ActionID: req.ActionID,
localObj, err := h.localStorage.PutLocal(ctx, &LocalPutRequest{
ActionID: actionID,
OutputID: remoteObj.OutputID,
Size: remoteObj.UncompressedSize,
Body: decompressedObj.Body,
})
if err != nil {
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
Err: fmt.Sprintf("failed to put object: %v", err),
})
return
return nil, err
}

h.getHits.Add(1)
h.writeResponse(ctx, writer, &cacheproto.Response{
ID: req.ID,
return &LocalGetResponse{
OutputID: remoteObj.OutputID,
Time: &remoteObj.ModTime,
ModTime: remoteObj.ModTime,
Size: remoteObj.UncompressedSize,
DiskPath: newLocalObj.DiskPath,
})
DiskPath: localObj.DiskPath,
}, nil
}

func (h *Handler) handlePut(ctx context.Context, writer cacheproto.ResponseWriter, req *cacheproto.Request) {
Expand Down
47 changes: 47 additions & 0 deletions internal/app/cacheprog/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,53 @@ func TestHandler_Handle_Get(t *testing.T) {
assert.True(t, resp.Miss)
})

t.Run("remote get timeout returns error", func(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
ctrl := gomock.NewController(t)

localStorage := NewMockLocalStorage(ctrl)
remoteStorage := NewMockRemoteStorage(ctrl)
compressionCodec := NewMockCompressionCodec(ctrl)
writer := &mockResponseWriter{}

h := NewHandler(HandlerOptions{
RemoteStorage: remoteStorage,
LocalStorage: localStorage,
CompressionCodec: compressionCodec,
RemoteGetTimeout: 100 * time.Millisecond,
})

localStorage.EXPECT().
GetLocal(gomock.Any(), &LocalGetRequest{ActionID: actionID}).
Return(nil, ErrNotFound)

// Remote storage blocks longer than the timeout
remoteStorage.EXPECT().
Get(gomock.Any(), &GetRequest{ActionID: actionID}).
DoAndReturn(func(ctx context.Context, _ *GetRequest) (*GetResponse, error) {
<-ctx.Done()
return nil, ctx.Err()
})

startTime := time.Now()
h.Handle(ctx, writer, &cacheproto.Request{
ID: 5,
Command: cacheproto.CmdGet,
ActionID: actionID,
})
synctest.Wait()
elapsed := time.Since(startTime)

require.Len(t, writer.responses, 1)
resp := writer.responses[0]
assert.Equal(t, int64(5), resp.ID)
assert.False(t, resp.Miss)
assert.Contains(t, resp.Err, "context deadline exceeded")
assert.GreaterOrEqual(t, elapsed, 100*time.Millisecond)
assert.Less(t, elapsed, 200*time.Millisecond)
})
})

t.Run("disable get", func(t *testing.T) {
ctrl := gomock.NewController(t)

Expand Down
Loading