diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
index 0468c3dc6..45f0bd701 100644
--- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
+++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
@@ -39,90 +39,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-class BigQueryCallback implements ApiFutureCallback<AppendRowsResponse> {
-	public final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class);
-
-	private final MaxwellBigQueryProducerWorker parent;
-	private final AbstractAsyncProducer.CallbackCompleter cc;
-	private final Position position;
-	private MaxwellContext context;
-	AppendContext appendContext;
-
-	private Counter succeededMessageCount;
-	private Counter failedMessageCount;
-	private Meter succeededMessageMeter;
-	private Meter failedMessageMeter;
-
-	private static final int MAX_RETRY_COUNT = 2;
-	private final ImmutableList<Code> RETRIABLE_ERROR_CODES = ImmutableList.of(Code.INTERNAL, Code.ABORTED,
-			Code.CANCELLED);
-
-	public BigQueryCallback(MaxwellBigQueryProducerWorker parent,
-			AppendContext appendContext,
-			AbstractAsyncProducer.CallbackCompleter cc,
-			Position position,
-			Counter producedMessageCount, Counter failedMessageCount,
-			Meter succeededMessageMeter, Meter failedMessageMeter,
-			MaxwellContext context) {
-		this.parent = parent;
-		this.appendContext = appendContext;
-		this.cc = cc;
-		this.position = position;
-		this.succeededMessageCount = producedMessageCount;
-		this.failedMessageCount = failedMessageCount;
-		this.succeededMessageMeter = succeededMessageMeter;
-		this.failedMessageMeter = failedMessageMeter;
-		this.context = context;
-	}
-
-	@Override
-	public void onSuccess(AppendRowsResponse response) {
-		this.succeededMessageCount.inc();
-		this.succeededMessageMeter.mark();
-
-		if (LOGGER.isDebugEnabled()) {
-			try {
-				LOGGER.debug("-> {}\n" +
-						" {}\n",
-						this.appendContext.r.toJSON(), this.position);
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-		cc.markCompleted();
-	}
-
-	@Override
-	public void onFailure(Throwable t) {
-		this.failedMessageCount.inc();
-		this.failedMessageMeter.mark();
-
-		LOGGER.error(t.getClass().getSimpleName() + " @ " + position);
-		LOGGER.error(t.getLocalizedMessage());
-
-		Status status = Status.fromThrowable(t);
-		if (appendContext.retryCount < MAX_RETRY_COUNT
-				&& RETRIABLE_ERROR_CODES.contains(status.getCode())) {
-			appendContext.retryCount++;
-			try {
-				this.parent.sendAsync(appendContext.r, this.cc);
-				cc.markCompleted();
-				return;
-			} catch (Exception e) {
-				System.out.format("Failed to retry append: %s\n", e);
-			}
-		}
-
-		synchronized (this.parent.lock) {
-			if (this.parent.error == null) {
-				StorageException storageException = Exceptions.toStorageException(t);
-				this.parent.error = (storageException != null) ? storageException : new RuntimeException(t);
-			}
-		}
-		cc.markCompleted();
-	}
-}
-
 public class MaxwellBigQueryProducer extends AbstractProducer {
 
 	private final ArrayBlockingQueue<RowMap> queue;
@@ -149,27 +65,19 @@ public MaxwellBigQueryProducer(MaxwellContext context,String bigQueryProjectId,
 		thread.start();
 	}
 
-
-
 	@Override
 	public void push(RowMap r) throws Exception {
 		this.queue.put(r);
 	}
-}
-
-class AppendContext {
-	JSONArray data;
-	int retryCount = 0;
-	RowMap r = null;
 
-	AppendContext(JSONArray data, int retryCount, RowMap r) {
-		this.data = data;
-		this.retryCount = retryCount;
-		this.r = r;
+	@Override
+	public StoppableTask getStoppableTask() {
+		return this.worker;
 	}
+
 }
 
-class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
+class MaxwellBigQueryProducerWorker implements Runnable, StoppableTask {
 	static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class);
 
 	private final ArrayBlockingQueue<RowMap> queue;
@@ -185,12 +93,8 @@ public MaxwellBigQueryProducerWorker(MaxwellContext context,
 		this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker");
 	}
 
-	public final Object lock = new Object();
 	private JsonStreamWriter streamWriter;
 
-	@GuardedBy("lock")
-	public RuntimeException error = null;
-
 	public void initialize(TableName tName)
 			throws DescriptorValidationException, IOException, InterruptedException {
 
@@ -205,11 +109,6 @@ public void initialize(TableName tName)
 	public void requestStop() throws Exception {
 		taskState.requestStop();
 		streamWriter.close();
-		synchronized (this.lock) {
-			if (this.error != null) {
-				throw this.error;
-			}
-		}
 	}
 
 	@Override
@@ -221,13 +120,22 @@ public void awaitStop(Long timeout) throws TimeoutException {
 	public void run() {
 		this.thread = Thread.currentThread();
+		// Batch buffer: created once and reused, so it is never reset between rows.
+		// A batch holds at most maxBatchSize rows per BigQuery append request.
+		final int maxBatchSize = 100;
+		ArrayList<RowMap> rows = new ArrayList<>(maxBatchSize);
 		while (true) {
 			try {
 				RowMap row = queue.take();
 				if (!taskState.isRunning()) {
 					taskState.stopped();
 					return;
 				}
-				this.push(row);
+				rows.add(row);
+				// Top up with rows that are already queued (drainTo never blocks), then send:
+				// a quiet stream is flushed right away instead of waiting for a full batch.
+				queue.drainTo(rows, maxBatchSize - rows.size());
+				this.sendMany(rows);
+				rows.clear();
 			} catch (Exception e) {
 				taskState.stopped();
 				context.terminate(e);
@@ -236,32 +144,47 @@ public void run() {
 		}
 	}
 
-	@Override
-	public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception {
-		synchronized (this.lock) {
-			if (this.error != null) {
-				throw this.error;
-			}
-		}
+	/**
+	 * Appends the given rows to BigQuery in a single request and blocks until the
+	 * response arrives, then hands the rows to the context to record the position.
+	 *
+	 * @param rows rows to append, in the order they were read; an empty list is a no-op
+	 * @throws Exception if a row cannot be converted to JSON, the append takes longer
+	 *         than 60 seconds, or the response carries an error
+	 */
+	public void sendMany(ArrayList<RowMap> rows) throws Exception {
+		if (rows.isEmpty()) {
+			return;
+		}
 		JSONArray jsonArr = new JSONArray();
-		JSONObject record = new JSONObject(r.toJSON(outputConfig));
-		LOGGER.debug("maxwell incoming log -> " + r.toJSON(outputConfig));
-		//stringfy columns in order to adapt noon cdc log table schema
-		String data = record.getJSONObject("data").toString();
-		String old = record.getJSONObject("old").toString();
-		String primary_key = record.get("primary_key").toString();
-		record.put("data", data);
-		record.put("old", old);
-		record.put("primary_key", primary_key);
-
-		jsonArr.put(record);
-		AppendContext appendContext = new AppendContext(jsonArr, 0, r);
+		for (RowMap r : rows) {
+			// serialize once: the same JSON is both logged and parsed
+			String json = r.toJSON(outputConfig);
+			LOGGER.debug("maxwell incoming log -> {}", json);
+			JSONObject record = new JSONObject(json);
+			// stringify columns in order to adapt noon cdc log table schema; a key the row
+			// does not carry is skipped (getJSONObject/get would throw JSONException)
+			for (String key : new String[] {"data", "old", "primary_key"}) {
+				if (record.has(key)) {
+					record.put(key, record.get(key).toString());
+				}
+			}
+			jsonArr.put(record);
+		}
 
-		ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
-		ApiFutures.addCallback(
-				future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(),
-				this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter,
-				this.context),
-				MoreExecutors.directExecutor());
+		ApiFuture<AppendRowsResponse> future = streamWriter.append(jsonArr);
+		// throws TimeoutException if it takes more than 60 seconds; run() then terminates maxwell
+		AppendRowsResponse res = future.get(60, TimeUnit.SECONDS);
+		if (res.hasError()) {
+			// could attempt a retry, but just fail and let run() terminate maxwell
+			throw new IOException("error with BQ API: " + res.getError().getMessage());
+		}
+		// Offer every row, not just the last one, so a position-bearing row in the middle
+		// of the batch is not skipped. NOTE(review): assumes setPosition(RowMap) ignores
+		// rows that do not end a transaction - confirm against MaxwellContext.
+		for (RowMap r : rows) {
+			this.context.setPosition(r);
+		}
 	}
+
 }