Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 35 additions & 132 deletions src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,90 +39,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Callback attached to the future returned by a BigQuery append.
 *
 * <p>On success it bumps the success metrics and marks the row's
 * {@code CallbackCompleter} as completed. On failure it bumps the failure
 * metrics, re-submits the row through the parent worker when the gRPC status
 * code is one of {@link #RETRIABLE_ERROR_CODES} and the retry budget is not
 * used up, and otherwise records the first error on the parent worker (under
 * {@code parent.lock}) before marking the row completed.
 */
class BigQueryCallback implements ApiFutureCallback<AppendRowsResponse> {
  public static final Logger LOGGER = LoggerFactory.getLogger(BigQueryCallback.class);

  /** Upper bound compared against {@code appendContext.retryCount} before a retry. */
  private static final int MAX_RETRY_COUNT = 2;

  /** gRPC status codes for which a failed append is re-submitted. */
  private static final ImmutableList<Code> RETRIABLE_ERROR_CODES =
      ImmutableList.of(Code.INTERNAL, Code.ABORTED, Code.CANCELLED);

  private final MaxwellBigQueryProducerWorker parent;
  private final AbstractAsyncProducer.CallbackCompleter cc;
  private final Position position;
  private final MaxwellContext context;
  AppendContext appendContext;

  private final Counter succeededMessageCount;
  private final Counter failedMessageCount;
  private final Meter succeededMessageMeter;
  private final Meter failedMessageMeter;

  /**
   * @param parent                worker used to re-submit rows and to record a terminal error
   * @param appendContext         row, payload and retry counter for this append
   * @param cc                    completer signalled once this append is finished with
   * @param position              position logged alongside success/failure messages
   * @param producedMessageCount  counter incremented on success
   * @param failedMessageCount    counter incremented on failure
   * @param succeededMessageMeter meter marked on success
   * @param failedMessageMeter    meter marked on failure
   * @param context               Maxwell context (stored; not otherwise used by this class)
   */
  public BigQueryCallback(MaxwellBigQueryProducerWorker parent,
      AppendContext appendContext,
      AbstractAsyncProducer.CallbackCompleter cc,
      Position position,
      Counter producedMessageCount, Counter failedMessageCount,
      Meter succeededMessageMeter, Meter failedMessageMeter,
      MaxwellContext context) {
    this.parent = parent;
    this.appendContext = appendContext;
    this.cc = cc;
    this.position = position;
    this.succeededMessageCount = producedMessageCount;
    this.failedMessageCount = failedMessageCount;
    this.succeededMessageMeter = succeededMessageMeter;
    this.failedMessageMeter = failedMessageMeter;
    this.context = context;
  }

  /**
   * Records the success in the metrics, optionally debug-logs the row, and
   * marks the row completed.
   */
  @Override
  public void onSuccess(AppendRowsResponse response) {
    this.succeededMessageCount.inc();
    this.succeededMessageMeter.mark();

    if (LOGGER.isDebugEnabled()) {
      try {
        LOGGER.debug("-> {}\n" +
            " {}\n",
            this.appendContext.r.toJSON(), this.position);
      } catch (Exception e) {
        // Rendering the row is only needed for the debug line; route the
        // failure through the logger instead of stderr and carry on.
        LOGGER.warn("Failed to render row for debug logging @ {}", this.position, e);
      }
    }
    cc.markCompleted();
  }

  /**
   * Records the failure in the metrics, retries retriable errors, and
   * otherwise stores the first error on the parent worker.
   */
  @Override
  public void onFailure(Throwable t) {
    this.failedMessageCount.inc();
    this.failedMessageMeter.mark();

    LOGGER.error("{} @ {}", t.getClass().getSimpleName(), position);
    LOGGER.error(t.getLocalizedMessage());

    Status status = Status.fromThrowable(t);
    if (appendContext.retryCount < MAX_RETRY_COUNT
        && RETRIABLE_ERROR_CODES.contains(status.getCode())) {
      // NOTE(review): sendAsync, as visible in this file, builds a fresh
      // AppendContext with retryCount 0, so this increment does not appear
      // to carry over to the re-submitted append -- confirm the retry bound.
      appendContext.retryCount++;
      try {
        this.parent.sendAsync(appendContext.r, this.cc);
        // NOTE(review): the same completer is handed to the retry and is
        // also marked completed here, before the retry has finished --
        // confirm this is intended.
        cc.markCompleted();
        return;
      } catch (Exception e) {
        // Fall through to the terminal-error path below.
        LOGGER.error("Failed to retry append", e);
      }
    }

    // Keep only the first error; later failures do not overwrite it.
    synchronized (this.parent.lock) {
      if (this.parent.error == null) {
        StorageException storageException = Exceptions.toStorageException(t);
        this.parent.error = (storageException != null) ? storageException : new RuntimeException(t);
      }
    }
    cc.markCompleted();
  }
}

public class MaxwellBigQueryProducer extends AbstractProducer {

private final ArrayBlockingQueue<RowMap> queue;
Expand All @@ -149,27 +65,19 @@ public MaxwellBigQueryProducer(MaxwellContext context,String bigQueryProjectId,
thread.start();
}



/**
 * Hands a row to this producer by placing it on {@code queue}.
 *
 * @param r the row to enqueue
 * @throws Exception if {@code queue.put} fails (for example, when interrupted while waiting)
 */
@Override
public void push(RowMap r) throws Exception {
this.queue.put(r);
}
}

class AppendContext {
JSONArray data;
int retryCount = 0;
RowMap r = null;

AppendContext(JSONArray data, int retryCount, RowMap r) {
this.data = data;
this.retryCount = retryCount;
this.r = r;
/**
 * Returns the object held in {@code worker} as this producer's stoppable task.
 */
@Override
public StoppableTask getStoppableTask() {
return this.worker;
}

}

class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask {
class MaxwellBigQueryProducerWorker implements Runnable, StoppableTask {
static final Logger LOGGER = LoggerFactory.getLogger(MaxwellBigQueryProducerWorker.class);

private final ArrayBlockingQueue<RowMap> queue;
Expand All @@ -185,12 +93,8 @@ public MaxwellBigQueryProducerWorker(MaxwellContext context,
this.taskState = new StoppableTaskState("MaxwellBigQueryProducerWorker");
}

public final Object lock = new Object();
private JsonStreamWriter streamWriter;

@GuardedBy("lock")
public RuntimeException error = null;

public void initialize(TableName tName)
throws DescriptorValidationException, IOException, InterruptedException {

Expand All @@ -205,11 +109,6 @@ public void initialize(TableName tName)
public void requestStop() throws Exception {
// Flag the task state as stop-requested, then close the JsonStreamWriter.
taskState.requestStop();
streamWriter.close();
// Rethrow any error stored in the `error` field, reading it under `lock`.
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
}

@Override
Expand All @@ -221,13 +120,20 @@ public void awaitStop(Long timeout) throws TimeoutException {
public void run() {
this.thread = Thread.currentThread();
while (true) {
// pull rows from queue, and add them to arraylist
// once we have 100 elements, we will push in bulk via BQ API
ArrayList<RowMap> rows = new ArrayList<RowMap>(100);
try {
RowMap row = queue.take();
if (!taskState.isRunning()) {
taskState.stopped();
return;
}
this.push(row);
rows.add(row);
if(rows.size() >= 100) {
this.sendMany(rows);
rows.clear();
}
} catch (Exception e) {
taskState.stopped();
context.terminate(e);
Expand All @@ -236,32 +142,29 @@ public void run() {
}
}

@Override
public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception {
synchronized (this.lock) {
if (this.error != null) {
throw this.error;
}
}
public void sendMany(ArrayList<RowMap> rows) throws Exception {
JSONArray jsonArr = new JSONArray();
JSONObject record = new JSONObject(r.toJSON(outputConfig));
LOGGER.debug("maxwell incoming log -> " + r.toJSON(outputConfig));
//stringify columns to match the noon cdc log table schema
String data = record.getJSONObject("data").toString();
String old = record.getJSONObject("old").toString();
String primary_key = record.get("primary_key").toString();
record.put("data", data);
record.put("old", old);
record.put("primary_key", primary_key);

jsonArr.put(record);
AppendContext appendContext = new AppendContext(jsonArr, 0, r);
for(RowMap r : rows) {
JSONObject record = new JSONObject(r.toJSON(outputConfig));
LOGGER.debug("maxwell incoming log -> " + r.toJSON(outputConfig));
//stringify columns to match the noon cdc log table schema
String data = record.getJSONObject("data").toString();
String old = record.getJSONObject("old").toString();
String primary_key = record.get("primary_key").toString();
record.put("data", data);
record.put("old", old);
record.put("primary_key", primary_key);
jsonArr.put(record);
}

ApiFuture<AppendRowsResponse> future = streamWriter.append(appendContext.data);
ApiFutures.addCallback(
future, new BigQueryCallback(this, appendContext, cc, r.getNextPosition(),
this.succeededMessageCount, this.failedMessageCount, this.succeededMessageMeter, this.failedMessageMeter,
this.context),
MoreExecutors.directExecutor());
ApiFuture<AppendRowsResponse> future = streamWriter.append(jsonArr);
// will throw TimeoutException if takes more than 60 seconds, crashing maxwell and restarting
AppendRowsResponse res = future.get(60, TimeUnit.SECONDS);
if(res.hasError()) {
// could attempt a retry, but just crash maxwell and let it restart
throw new Exception("error with BQ API");
}
this.context.setPosition(rows.get(rows.size() - 1));
}

}