Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 24 additions & 5 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package common

import (
"context"
"encoding/hex"
"encoding/json"
"errors"
Expand Down Expand Up @@ -570,9 +571,17 @@ func filterBlockPool(vtx []*walletrpc.CompactTx, poolTypes []walletrpc.PoolType)
}

// GetBlockRange returns a sequence of consecutive blocks in the given range.
func GetBlockRange(cache *BlockCache, blockOut chan<- *walletrpc.CompactBlock, errOut chan<- error, span *walletrpc.BlockRange) {
//
// The `ctx` parameter is used to abort iteration when the gRPC client cancels
// the stream. Without it, the producer goroutine would block indefinitely on
// the unbuffered `blockOut` send after the consumer (the gRPC handler) returns,
// leaking one goroutine per cancelled stream.
func GetBlockRange(ctx context.Context, cache *BlockCache, blockOut chan<- *walletrpc.CompactBlock, errOut chan<- error, span *walletrpc.BlockRange) {
if slices.Contains(span.PoolTypes, walletrpc.PoolType_POOL_TYPE_INVALID) {
errOut <- fmt.Errorf("GetBlockRange: invalid pool type requested")
select {
case errOut <- fmt.Errorf("GetBlockRange: invalid pool type requested"):
case <-ctx.Done():
}
return
}
// Go over [start, end] inclusive
Expand All @@ -591,16 +600,26 @@ func GetBlockRange(cache *BlockCache, blockOut chan<- *walletrpc.CompactBlock, e

block, err := GetBlock(cache, j)
if err != nil {
errOut <- err
select {
case errOut <- err:
case <-ctx.Done():
}
return
}
block.Vtx = filterBlockPool(block.Vtx, span.PoolTypes)

// Note that we do want to return blocks that have had all of its transactions filtered,
// as we have done in the past.
blockOut <- block
select {
case blockOut <- block:
case <-ctx.Done():
return
}
}
select {
case errOut <- nil:
case <-ctx.Done():
}
errOut <- nil
}

// ParseRawTransaction converts between the JSON result of a `zcashd`
Expand Down
5 changes: 3 additions & 2 deletions common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package common
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -454,7 +455,7 @@ func TestGetBlockRange(t *testing.T) {
Start: &walletrpc.BlockID{Height: 380640},
End: &walletrpc.BlockID{Height: 380642},
}
go GetBlockRange(testcache, blockChan, errChan, blockRange)
go GetBlockRange(context.Background(), testcache, blockChan, errChan, blockRange)

// read in block 380640
select {
Expand Down Expand Up @@ -557,7 +558,7 @@ func TestGetBlockRangeReverse(t *testing.T) {
Start: &walletrpc.BlockID{Height: 380642},
End: &walletrpc.BlockID{Height: 380640},
}
go GetBlockRange(testcache, blockChan, errChan, blockRange)
go GetBlockRange(context.Background(), testcache, blockChan, errChan, blockRange)

// read in block 380642
select {
Expand Down
18 changes: 14 additions & 4 deletions frontend/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,21 @@ func (s *lwdStreamer) GetBlockNullifiers(ctx context.Context, id *walletrpc.Bloc
// 'end' inclusively.
func (s *lwdStreamer) GetBlockRange(span *walletrpc.BlockRange, resp walletrpc.CompactTxStreamer_GetBlockRangeServer) error {
common.Log.Debugf("gRPC GetBlockRange(%+v)\n", span)
blockChan := make(chan *walletrpc.CompactBlock)
if span.Start == nil || span.End == nil {
return status.Error(codes.InvalidArgument,
"GetBlockRange: must specify start and end heights")
}
ctx := resp.Context()
blockChan := make(chan *walletrpc.CompactBlock)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice that you're moving this down to where it's used (before the unrelated error return)

errChan := make(chan error)
go common.GetBlockRange(s.cache, blockChan, errChan, span)
go common.GetBlockRange(ctx, s.cache, blockChan, errChan, span)

for {
select {
case <-ctx.Done():
// Client cancelled / deadline exceeded; the producer's select-on-ctx
// will unblock its in-flight send and exit.
return ctx.Err()
case err := <-errChan:
// this will also catch context.DeadlineExceeded from the timeout
return err
Expand All @@ -255,7 +260,6 @@ func (s *lwdStreamer) GetBlockRange(span *walletrpc.BlockRange, resp walletrpc.C
// the actions contain only nullifiers (a subset of the full compact block).
func (s *lwdStreamer) GetBlockRangeNullifiers(span *walletrpc.BlockRange, resp walletrpc.CompactTxStreamer_GetBlockRangeNullifiersServer) error {
common.Log.Debugf("gRPC GetBlockRangeNullifiers(%+v)\n", span)
blockChan := make(chan *walletrpc.CompactBlock)
if span.Start == nil || span.End == nil {
return status.Error(codes.InvalidArgument,
"GetBlockRangeNullifiers: must specify start and end heights")
Expand All @@ -269,11 +273,17 @@ func (s *lwdStreamer) GetBlockRangeNullifiers(span *walletrpc.BlockRange, resp w
}
}
span.PoolTypes = filtered
ctx := resp.Context()
blockChan := make(chan *walletrpc.CompactBlock)
errChan := make(chan error)
go common.GetBlockRange(s.cache, blockChan, errChan, span)
go common.GetBlockRange(ctx, s.cache, blockChan, errChan, span)

for {
select {
case <-ctx.Done():
// Client cancelled / deadline exceeded; the producer's select-on-ctx
// will unblock its in-flight send and exit.
return ctx.Err()
case err := <-errChan:
// this will also catch context.DeadlineExceeded from the timeout
return err
Expand Down