Skip to content

Commit 52b080e

Browse files
committed
fixed the flushing of pending inserts before delete records in streamingbatchwriter
1 parent 21b397a commit 52b080e

File tree

2 files changed

+90
-1
lines changed

2 files changed

+90
-1
lines changed

writers/streamingbatchwriter/streamingbatchwriter.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,31 @@ func (w *StreamingBatchWriter) Flush(context.Context) error {
154154
return nil // not checked below
155155
}
156156

157+
func (w *StreamingBatchWriter) flushInsertWorkers(ctx context.Context) error {
158+
w.workersLock.RLock()
159+
workers := make([]*streamingWorkerManager[*message.WriteInsert], 0, len(w.insertWorkers))
160+
for _, worker := range w.insertWorkers {
161+
workers = append(workers, worker)
162+
}
163+
w.workersLock.RUnlock()
164+
165+
for _, worker := range workers {
166+
done := make(chan bool)
167+
select {
168+
case <-ctx.Done():
169+
return ctx.Err()
170+
case worker.flush <- done:
171+
}
172+
select {
173+
case <-ctx.Done():
174+
return ctx.Err()
175+
case <-done:
176+
}
177+
}
178+
return nil
179+
}
180+
181+
157182
func (w *StreamingBatchWriter) Close(context.Context) error {
158183
w.workersLock.Lock()
159184
defer w.workersLock.Unlock()
@@ -323,6 +348,10 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
323348

324349
return nil
325350
case *message.WriteDeleteRecord:
351+
// flush pending inserts and table buffers before deletions
352+
if err := w.flushInsertWorkers(ctx); err != nil {
353+
return err
354+
}
326355
w.workersLock.Lock()
327356
defer w.workersLock.Unlock()
328357

@@ -331,7 +360,6 @@ func (w *StreamingBatchWriter) startWorker(ctx context.Context, errCh chan<- err
331360
return nil
332361
}
333362

334-
// TODO: flush all workers for nested tables as well (See https://github.com/cloudquery/plugin-sdk/issues/1296)
335363
w.deleteRecordWorker = &streamingWorkerManager[*message.WriteDeleteRecord]{
336364
ch: make(chan *message.WriteDeleteRecord),
337365
writeFunc: w.client.DeleteRecords,
@@ -516,3 +544,6 @@ func (s *streamingWorkerManager[T]) run(ctx context.Context, wg *sync.WaitGroup)
516544
}
517545
}
518546
}
547+
548+
549+

writers/streamingbatchwriter/streamingbatchwriter_test.go

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,3 +624,61 @@ func requireErrorCount(t *testing.T, errCh chan error, expectedMin, expectedMax
624624
}
625625
return -1
626626
}
627+
628+
func TestDeleteRecordFlushesPendingInserts(t *testing.T) {
629+
t.Parallel()
630+
631+
ctx := context.Background()
632+
errCh := make(chan error, 10)
633+
634+
testClient := newClient()
635+
wr, err := New(testClient, WithBatchSizeRows(1000000)) // large batch to avoid auto-flush
636+
if err != nil {
637+
t.Fatal(err)
638+
}
639+
640+
// Create a table for insert
641+
insertTable := &schema.Table{
642+
Name: "child_table",
643+
Columns: []schema.Column{
644+
{
645+
Name: "id",
646+
Type: arrow.PrimitiveTypes.Int64,
647+
},
648+
},
649+
}
650+
651+
// Build insert record
652+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, insertTable.ToArrowSchema())
653+
bldr.Field(0).(*array.Int64Builder).Append(1)
654+
record := bldr.NewRecord()
655+
656+
md := arrow.NewMetadata(
657+
[]string{schema.MetadataTableName},
658+
[]string{insertTable.Name},
659+
)
660+
newSchema := arrow.NewSchema(
661+
record.Schema().Fields(),
662+
&md,
663+
)
664+
665+
record = array.NewRecord(newSchema, record.Columns(), record.NumRows())
666+
667+
// Send insert
668+
if err := wr.startWorker(ctx, errCh, &message.WriteInsert{Record: record}); err != nil {
669+
t.Fatal(err)
670+
}
671+
672+
// send delete record to trigger flush
673+
del := &message.WriteDeleteRecord{
674+
DeleteRecord: message.DeleteRecord{
675+
TableName: insertTable.Name,
676+
},
677+
}
678+
679+
if err := wr.startWorker(ctx, errCh, del); err != nil {
680+
t.Fatal(err)
681+
}
682+
waitForLength(t, testClient.MessageLen, messageTypeInsert, 1)
683+
_ = wr.Close(ctx)
684+
}

0 commit comments

Comments
 (0)