Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/main/java/com/zendesk/maxwell/Maxwell.java
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,21 @@ private void logColumnCastError(ColumnDefCastException e) throws SQLException, S
*/
protected Position getInitialPosition() throws Exception {
/* first method: do we have a stored position for this server? */
LOGGER.info("[maxwell] looking up stored position for clientID={}", config.clientID);
Position initial = this.context.getInitialPosition();

if (initial != null) {
LOGGER.info("[maxwell] resuming from stored position: {}", initial);
}

if (initial == null) {

/* second method: are we recovering from a master swap? */
if ( config.masterRecovery ) {
LOGGER.info("[maxwell] no stored position found, attempting master recovery");
initial = attemptMasterRecovery();
if ( initial != null )
LOGGER.info("[maxwell] master recovery position: {}", initial);
}

/* third method: is there a previous client_id?
Expand All @@ -169,15 +177,17 @@ protected Position getInitialPosition() throws Exception {
if ( initial == null ) {
initial = this.context.getOtherClientPosition();
if ( initial != null ) {
LOGGER.info("Found previous client position: " + initial);
LOGGER.info("[maxwell] found previous client position: {}", initial);
}
}

/* fourth method: capture the current master position. */
if ( initial == null ) {
LOGGER.info("[maxwell] no prior position found, capturing current master position");
try ( Connection c = context.getReplicationConnection() ) {
initial = Position.capture(c, config.gtidMode);
}
LOGGER.info("[maxwell] starting fresh at master position: {}", initial);
}

/* if the initial position didn't come from the store, store it */
Expand Down Expand Up @@ -239,11 +249,13 @@ public void start() throws Exception {
}

private void startInner() throws Exception {
LOGGER.info("[maxwell] stage 1/6: verifying MySQL replication and Maxwell schema state");
try ( Connection connection = this.context.getReplicationConnection();
Connection rawConnection = this.context.getRawMaxwellConnection() ) {
MaxwellMysqlStatus.ensureReplicationMysqlState(connection);
MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection);
if (config.gtidMode) {
LOGGER.info("[maxwell] gtidMode=true, verifying GTID MySQL state");
MaxwellMysqlStatus.ensureGtidMysqlState(connection);
}

Expand All @@ -254,23 +266,32 @@ private void startInner() throws Exception {
}
}

LOGGER.info("[maxwell] stage 2/6: creating producer");
AbstractProducer producer = this.context.getProducer();
LOGGER.info("[maxwell] producer created: {}", producer.getClass().getSimpleName());

LOGGER.info("[maxwell] stage 3/6: determining initial replication position");
Position initPosition = getInitialPosition();
logBanner(producer, initPosition);
this.context.setPosition(initPosition);
LOGGER.info("[maxwell] initial position: {}", initPosition);

LOGGER.info("[maxwell] stage 4/6: loading schema store");
MysqlSchemaStore mysqlSchemaStore = new MysqlSchemaStore(this.context, initPosition);
BootstrapController bootstrapController = this.context.getBootstrapController(mysqlSchemaStore.getSchemaID());

this.context.startSchemaCompactor();

if (config.recaptureSchema) {
LOGGER.info("[maxwell] recaptureSchema=true, capturing full schema before starting");
mysqlSchemaStore.captureAndSaveSchema();
}

mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator.
LOGGER.info("[maxwell] triggering initial schema load");
mysqlSchemaStore.getSchema();
LOGGER.info("[maxwell] schema loaded, schemaID={}", mysqlSchemaStore.getSchemaID());

LOGGER.info("[maxwell] stage 5/6: creating replicator");
this.replicator = new BinlogConnectorReplicator(
mysqlSchemaStore,
producer,
Expand All @@ -295,6 +316,7 @@ private void startInner() throws Exception {
context.setReplicator(replicator);
this.context.start();

LOGGER.info("[maxwell] stage 6/6: starting replicator and entering run loop");
replicator.startReplicator();
this.onReplicatorStart();

Expand All @@ -303,6 +325,7 @@ private void startInner() throws Exception {
} catch ( ColumnDefCastException e ) {
logColumnCastError(e);
}
LOGGER.info("[maxwell] run loop exited normally");
}


Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
package com.zendesk.maxwell.producer;

import com.codahale.metrics.Gauge;
import com.zendesk.maxwell.MaxwellConfig;
import com.zendesk.maxwell.MaxwellContext;
import com.zendesk.maxwell.monitoring.Metrics;
import com.zendesk.maxwell.replication.Position;
import com.zendesk.maxwell.row.RowMap;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractAsyncProducer extends AbstractProducer {
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAsyncProducer.class);
private static final long SLOT_WAIT_WARN_MS = 5_000;

public class CallbackCompleter {
private InflightMessageList inflightMessages;
Expand Down Expand Up @@ -66,8 +69,10 @@ public AbstractAsyncProducer(MaxwellContext context) {
@Override
public final void push(RowMap r) throws Exception {
Position position = r.getNextPosition();
// Rows that do not get sent to the prodcuer will be automatically marked as complete.
// Rows that do not get sent to the producer will be automatically marked as complete.
if(!r.shouldOutput(outputConfig)) {
LOGGER.debug("[async-producer] skipping row (shouldOutput=false): db={} table={} type={}",
r.getDatabase(), r.getTable(), r.getRowType());
if ( position != null ) {
inflightMessages.addMessage(position, r.getTimestampMillis(), 0L);

Expand All @@ -79,9 +84,19 @@ public final void push(RowMap r) throws Exception {
return;
}

// back-pressure from slow producers
// back-pressure: if the inflight list is at capacity, waitForSlot blocks
int inflightSize = inflightMessages.size();
if ( inflightSize > 5000 ) {
LOGGER.warn("[async-producer] high inflight message count: {} messages pending. Producer may be slow or stuck.", inflightSize);
}

long slotWaitStart = System.currentTimeMillis();
long messageID = inflightMessages.waitForSlot();
long slotWaitMs = System.currentTimeMillis() - slotWaitStart;
if ( slotWaitMs > SLOT_WAIT_WARN_MS ) {
LOGGER.warn("[async-producer] waited {}ms for an inflight slot (capacity full). db={} table={} inflightSize={}",
slotWaitMs, r.getDatabase(), r.getTable(), inflightMessages.size());
}

if(r.isTXCommit()) {
inflightMessages.addMessage(position, r.getTimestampMillis(), messageID);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.concurrent.Semaphore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InflightMessageList {
private static final Logger LOGGER = LoggerFactory.getLogger(InflightMessageList.class);

class InflightMessage {
public final Position position;
public boolean isComplete;
Expand Down Expand Up @@ -87,9 +91,6 @@ private synchronized InflightMessage head() {
private void checkStuckHead(long messageID) {
// If the head is stuck for the length of time (configurable)
// we assume the head will unlikely get acknowledged, hence terminate Maxwell.
// This gatekeeper is the last resort since if anything goes wrong,
// producer should have raised exceptions earlier, but sometimes kafka just goes to lunch and eats
// a message entirely

if (producerAckTimeoutMS == 0)
return;
Expand All @@ -98,12 +99,20 @@ private void checkStuckHead(long messageID) {
if ( message == null || message.messageID == messageID )
return;

if ( message.timeAsBlockedHead() > producerAckTimeoutMS ) {
long blockedForMs = message.timeAsBlockedHead();
if ( blockedForMs > producerAckTimeoutMS ) {
LOGGER.error("[inflight] head of inflight list has been stuck for {}ms (timeout={}ms), terminating Maxwell. position={} messageID={}",
blockedForMs, producerAckTimeoutMS, message.position, message.messageID);
LOGGER.error("[inflight] inflight list size={} at time of termination", this.linkedMap.size());
IllegalStateException e = new IllegalStateException(
"Did not receive acknowledgement for the head of the inflight message list for " + producerAckTimeoutMS + " ms"
);
context.terminate(e);
} else {
if ( blockedForMs > producerAckTimeoutMS / 2 ) {
LOGGER.warn("[inflight] head of inflight list has been blocked for {}ms (timeout={}ms), position={} inflightSize={}",
blockedForMs, producerAckTimeoutMS, message.position, this.linkedMap.size());
}
message.markBlockedHead();
}
}
Expand All @@ -127,6 +136,7 @@ public synchronized InflightMessage completeMessage(Position p) {
m.isComplete = true;

InflightMessage completeUntil = null;
int drained = 0;
Iterator<InflightMessage> iterator = iterator();

while ( iterator.hasNext() ) {
Expand All @@ -137,6 +147,12 @@ public synchronized InflightMessage completeMessage(Position p) {

completeUntil = msg;
iterator.remove();
drained++;
}

if ( drained > 0 ) {
LOGGER.debug("[inflight] drained {} completed messages, position advanced to={}, remaining={}",
drained, completeUntil != null ? completeUntil.position : "none", this.linkedMap.size());
}

return completeUntil;
Expand Down
Loading