diff --git a/src/main/java/com/zendesk/maxwell/Maxwell.java b/src/main/java/com/zendesk/maxwell/Maxwell.java index 04b0741d4..0f93f183d 100644 --- a/src/main/java/com/zendesk/maxwell/Maxwell.java +++ b/src/main/java/com/zendesk/maxwell/Maxwell.java @@ -153,13 +153,21 @@ private void logColumnCastError(ColumnDefCastException e) throws SQLException, S */ protected Position getInitialPosition() throws Exception { /* first method: do we have a stored position for this server? */ + LOGGER.info("[maxwell] looking up stored position for clientID={}", config.clientID); Position initial = this.context.getInitialPosition(); + if (initial != null) { + LOGGER.info("[maxwell] resuming from stored position: {}", initial); + } + if (initial == null) { /* second method: are we recovering from a master swap? */ if ( config.masterRecovery ) { + LOGGER.info("[maxwell] no stored position found, attempting master recovery"); initial = attemptMasterRecovery(); + if ( initial != null ) + LOGGER.info("[maxwell] master recovery position: {}", initial); } /* third method: is there a previous client_id? @@ -169,15 +177,17 @@ protected Position getInitialPosition() throws Exception { if ( initial == null ) { initial = this.context.getOtherClientPosition(); if ( initial != null ) { - LOGGER.info("Found previous client position: " + initial); + LOGGER.info("[maxwell] found previous client position: {}", initial); } } /* fourth method: capture the current master position. */ if ( initial == null ) { + LOGGER.info("[maxwell] no prior position found, capturing current master position"); try ( Connection c = context.getReplicationConnection() ) { initial = Position.capture(c, config.gtidMode); } + LOGGER.info("[maxwell] starting fresh at master position: {}", initial); } /* if the initial position didn't come from the store, store it */ @@ -239,11 +249,13 @@ public void start() throws Exception { } private void startInner() throws Exception { + LOGGER.info("[maxwell] stage 1/6: verifying MySQL replication and Maxwell schema state"); try ( Connection connection = this.context.getReplicationConnection(); Connection rawConnection = this.context.getRawMaxwellConnection() ) { MaxwellMysqlStatus.ensureReplicationMysqlState(connection); MaxwellMysqlStatus.ensureMaxwellMysqlState(rawConnection); if (config.gtidMode) { + LOGGER.info("[maxwell] gtidMode=true, verifying GTID MySQL state"); MaxwellMysqlStatus.ensureGtidMysqlState(connection); } @@ -254,23 +266,32 @@ private void startInner() throws Exception { } } + LOGGER.info("[maxwell] stage 2/6: creating producer"); AbstractProducer producer = this.context.getProducer(); + LOGGER.info("[maxwell] producer created: {}", producer.getClass().getSimpleName()); + LOGGER.info("[maxwell] stage 3/6: determining initial replication position"); Position initPosition = getInitialPosition(); logBanner(producer, initPosition); this.context.setPosition(initPosition); + LOGGER.info("[maxwell] initial position: {}", initPosition); + LOGGER.info("[maxwell] stage 4/6: loading schema store"); MysqlSchemaStore mysqlSchemaStore = new MysqlSchemaStore(this.context, initPosition); BootstrapController bootstrapController = this.context.getBootstrapController(mysqlSchemaStore.getSchemaID()); this.context.startSchemaCompactor(); if (config.recaptureSchema) { + LOGGER.info("[maxwell] recaptureSchema=true, capturing full schema before starting"); mysqlSchemaStore.captureAndSaveSchema(); } - mysqlSchemaStore.getSchema(); // trigger schema to load / capture before we start the replicator. + LOGGER.info("[maxwell] triggering initial schema load"); + mysqlSchemaStore.getSchema(); + LOGGER.info("[maxwell] schema loaded, schemaID={}", mysqlSchemaStore.getSchemaID()); + LOGGER.info("[maxwell] stage 5/6: creating replicator"); this.replicator = new BinlogConnectorReplicator( mysqlSchemaStore, producer, @@ -295,6 +316,7 @@ private void startInner() throws Exception { context.setReplicator(replicator); this.context.start(); + LOGGER.info("[maxwell] stage 6/6: starting replicator and entering run loop"); replicator.startReplicator(); this.onReplicatorStart(); @@ -303,6 +325,7 @@ private void startInner() throws Exception { } catch ( ColumnDefCastException e ) { logColumnCastError(e); } + LOGGER.info("[maxwell] run loop exited normally"); } diff --git a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java index 97262b537..b0cd3b0bd 100644 --- a/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/AbstractAsyncProducer.java @@ -1,15 +1,18 @@ package com.zendesk.maxwell.producer; import com.codahale.metrics.Gauge; -import com.zendesk.maxwell.MaxwellConfig; import com.zendesk.maxwell.MaxwellContext; import com.zendesk.maxwell.monitoring.Metrics; import com.zendesk.maxwell.replication.Position; import com.zendesk.maxwell.row.RowMap; import java.util.concurrent.TimeUnit; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public abstract class AbstractAsyncProducer extends AbstractProducer { + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAsyncProducer.class); + private static final long SLOT_WAIT_WARN_MS = 5_000; public class CallbackCompleter { private InflightMessageList inflightMessages; @@ -66,8 +69,10 @@ public AbstractAsyncProducer(MaxwellContext context) { @Override public final void push(RowMap r) throws Exception { Position position = r.getNextPosition(); - // Rows that do not get sent to the prodcuer will be automatically marked as complete. + // Rows that do not get sent to the producer will be automatically marked as complete. if(!r.shouldOutput(outputConfig)) { + LOGGER.debug("[async-producer] skipping row (shouldOutput=false): db={} table={} type={}", + r.getDatabase(), r.getTable(), r.getRowType()); if ( position != null ) { inflightMessages.addMessage(position, r.getTimestampMillis(), 0L); @@ -79,9 +84,19 @@ public final void push(RowMap r) throws Exception { return; } - // back-pressure from slow producers + // back-pressure: if the inflight list is at capacity, waitForSlot blocks + int inflightSize = inflightMessages.size(); + if ( inflightSize > 5000 ) { + LOGGER.warn("[async-producer] high inflight message count: {} messages pending. Producer may be slow or stuck.", inflightSize); + } + long slotWaitStart = System.currentTimeMillis(); long messageID = inflightMessages.waitForSlot(); + long slotWaitMs = System.currentTimeMillis() - slotWaitStart; + if ( slotWaitMs > SLOT_WAIT_WARN_MS ) { + LOGGER.warn("[async-producer] waited {}ms for an inflight slot (capacity full). db={} table={} inflightSize={}", + slotWaitMs, r.getDatabase(), r.getTable(), inflightMessages.size()); + } if(r.isTXCommit()) { inflightMessages.addMessage(position, r.getTimestampMillis(), messageID); diff --git a/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java b/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java index ecd03a2f5..f991e1120 100644 --- a/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java +++ b/src/main/java/com/zendesk/maxwell/producer/InflightMessageList.java @@ -12,8 +12,12 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.concurrent.Semaphore; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class InflightMessageList { + private static final Logger LOGGER = LoggerFactory.getLogger(InflightMessageList.class); + class InflightMessage { public final Position position; public boolean isComplete; @@ -87,9 +91,6 @@ private synchronized InflightMessage head() { private void checkStuckHead(long messageID) { // If the head is stuck for the length of time (configurable) // we assume the head will unlikely get acknowledged, hence terminate Maxwell. - // This gatekeeper is the last resort since if anything goes wrong, - // producer should have raised exceptions earlier, but sometimes kafka just goes to lunch and eats - // a message entirely if (producerAckTimeoutMS == 0) return; @@ -98,12 +99,20 @@ private void checkStuckHead(long messageID) { if ( message == null || message.messageID == messageID ) return; - if ( message.timeAsBlockedHead() > producerAckTimeoutMS ) { + long blockedForMs = message.timeAsBlockedHead(); + if ( blockedForMs > producerAckTimeoutMS ) { + LOGGER.error("[inflight] head of inflight list has been stuck for {}ms (timeout={}ms), terminating Maxwell. position={} messageID={}", + blockedForMs, producerAckTimeoutMS, message.position, message.messageID); + LOGGER.error("[inflight] inflight list size={} at time of termination", this.linkedMap.size()); IllegalStateException e = new IllegalStateException( "Did not receive acknowledgement for the head of the inflight message list for " + producerAckTimeoutMS + " ms" ); context.terminate(e); } else { + if ( blockedForMs > producerAckTimeoutMS / 2 ) { + LOGGER.warn("[inflight] head of inflight list has been blocked for {}ms (timeout={}ms), position={} inflightSize={}", + blockedForMs, producerAckTimeoutMS, message.position, this.linkedMap.size()); + } message.markBlockedHead(); } } @@ -127,6 +136,7 @@ public synchronized InflightMessage completeMessage(Position p) { m.isComplete = true; InflightMessage completeUntil = null; + int drained = 0; Iterator iterator = iterator(); while ( iterator.hasNext() ) { @@ -137,6 +147,12 @@ public synchronized InflightMessage completeMessage(Position p) { completeUntil = msg; iterator.remove(); + drained++; + } + + if ( drained > 0 ) { + LOGGER.debug("[inflight] drained {} completed messages, position advanced to={}, remaining={}", + drained, completeUntil != null ? completeUntil.position : "none", this.linkedMap.size()); } return completeUntil; diff --git a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java index 4a6141e69..2d0e738f6 100644 --- a/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java +++ b/src/main/java/com/zendesk/maxwell/producer/MaxwellBigQueryProducer.java @@ -88,54 +88,56 @@ public BigQueryCallback(MaxwellBigQueryProducerWorker parent, @Override public void onSuccess(AppendRowsResponse response) { - for (int i = 0; i < appendContext.callbacks.size(); i++) { + int batchSize = appendContext.callbacks.size(); + LOGGER.info("[bq-callback] worker={} batch SUCCESS: {} rows acknowledged, position={}", + parent.getWorkerId(), batchSize, this.position); + for (int i = 0; i < batchSize; i++) { this.succeededMessageCount.inc(); this.succeededMessageMeter.mark(); AbstractAsyncProducer.CallbackCompleter cc = (AbstractAsyncProducer.CallbackCompleter) appendContext.callbacks.get(i); cc.markCompleted(); - - if (LOGGER.isDebugEnabled()) { - try { - LOGGER.debug("Worker {} -> {}\n", parent.getWorkerId(), this.position); - } catch (Exception e) { - e.printStackTrace(); - } - } } } @Override public void onFailure(Throwable t) { - for (int i = 0; i < appendContext.callbacks.size(); i++) { + int batchSize = appendContext.callbacks.size(); + for (int i = 0; i < batchSize; i++) { this.failedMessageCount.inc(); this.failedMessageMeter.mark(); } - LOGGER.error("Worker {} " + t.getClass().getSimpleName() + " @ " + position, parent.getWorkerId()); - LOGGER.error("Worker {} " + t.getLocalizedMessage(), parent.getWorkerId()); - Status status = Status.fromThrowable(t); + LOGGER.error("[bq-callback] worker={} batch FAILED: {} rows, status={} ({}), position={}, retryCount={}/{}", + parent.getWorkerId(), batchSize, status.getCode(), t.getClass().getSimpleName(), + position, appendContext.retryCount, MAX_RETRY_COUNT); + LOGGER.error("[bq-callback] worker={} failure detail: {}", parent.getWorkerId(), t.getLocalizedMessage(), t); + if (appendContext.retryCount < MAX_RETRY_COUNT && RETRIABLE_ERROR_CODES.contains(status.getCode())) { appendContext.retryCount++; + LOGGER.warn("[bq-callback] worker={} retrying batch (attempt {}/{})", + parent.getWorkerId(), appendContext.retryCount, MAX_RETRY_COUNT); try { this.parent.attemptBatch(appendContext); return; } catch (Exception e) { - System.out.format("Worker {} Failed to retry append: %s\n", parent.getWorkerId(), e); + LOGGER.error("[bq-callback] worker={} retry attempt failed: {}", parent.getWorkerId(), e.getMessage(), e); } } synchronized (this.parent.getLock()) { if (this.parent.getError() == null && !this.context.getConfig().ignoreProducerError) { + LOGGER.error("[bq-callback] worker={} fatal error, terminating Maxwell. ignoreProducerError=false", parent.getWorkerId()); StorageException storageException = Exceptions.toStorageException(t); this.parent.setError((storageException != null) ? storageException : new RuntimeException(t)); context.terminate(); return; } } - // got an error, but we are ingoring producer error - for (int i = 0; i < appendContext.callbacks.size(); i++) { + LOGGER.warn("[bq-callback] worker={} ignoring batch failure (ignoreProducerError=true), marking {} rows complete", + parent.getWorkerId(), batchSize); + for (int i = 0; i < batchSize; i++) { AbstractAsyncProducer.CallbackCompleter cc = (AbstractAsyncProducer.CallbackCompleter) appendContext.callbacks.get(i); cc.markCompleted(); } @@ -155,7 +157,10 @@ public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId, throws IOException { super(context); bigqueryThreads = Math.max(1, bigqueryThreads); - this.queue = new ArrayBlockingQueue<>(bigqueryThreads * MaxwellBigQueryProducerWorker.BATCH_SIZE); + int queueCapacity = bigqueryThreads * MaxwellBigQueryProducerWorker.BATCH_SIZE; + LOGGER.info("[bq-producer] initializing: project={}, dataset={}, table={}, threads={}, queueCapacity={}", + bigQueryProjectId, bigQueryDataset, bigQueryTable, bigqueryThreads, queueCapacity); + this.queue = new ArrayBlockingQueue<>(queueCapacity); ThreadFactory workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("bq-worker-%d").setDaemon(true).build(); this.workerExecutor = Executors.newFixedThreadPool(bigqueryThreads, workerThreadFactory); @@ -166,35 +171,40 @@ public MaxwellBigQueryProducer(MaxwellContext context, String bigQueryProjectId, this.workers = new ArrayList<>(bigqueryThreads); TableName tableName = TableName.of(bigQueryProjectId, bigQueryDataset, bigQueryTable); startWorkers(context, tableName); + LOGGER.info("[bq-producer] fully initialized with {} worker(s)", this.workers.size()); } private void startWorkers(MaxwellContext context, TableName tableName) throws IOException { int numWorkers = this.workers.size(); TableSchema tableSchema = getTableSchema(tableName); - // Create and start workers for (int i = 0; i < Math.max(1, numWorkers); i++) { + LOGGER.info("[bq-producer] initializing worker {}", i); try { MaxwellBigQueryProducerWorker worker = new MaxwellBigQueryProducerWorker( context, this.queue, - this.callbackExecutor, // Pass callback executor - i // Pass worker ID + this.callbackExecutor, + i ); worker.initialize(tableName, tableSchema); this.workers.add(worker); this.workerExecutor.submit(worker); + LOGGER.info("[bq-producer] worker {} started and submitted to executor", i); } catch (DescriptorValidationException | IOException | InterruptedException e) { - LOGGER.error("Failed to initialize MaxwellBigQueryProducer worker {}: {}", i, e.getMessage(), e); - // Don't try to shutdown executors, just throw + LOGGER.error("[bq-producer] failed to initialize worker {}: {}", i, e.getMessage(), e); throw new IOException("Failed to initialize worker " + i, e); } } - LOGGER.info("Submitted {} workers to executor.", this.workers.size()); + LOGGER.info("[bq-producer] {} worker(s) submitted to executor", this.workers.size()); } private TableSchema getTableSchema(TableName tName) throws IOException { + LOGGER.info("[bq-producer] fetching BigQuery table schema: {}", tName); BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(tName.getProject()).build().getService(); Table table = bigquery.getTable(tName.getDataset(), tName.getTable()); + if ( table == null ) { + throw new IOException("[bq-producer] BigQuery table not found: " + tName); + } Schema schema = table.getDefinition().getSchema(); // Filter out bq_inserted_at column from the schema List filteredFields = schema.getFields().stream() @@ -202,12 +212,25 @@ private TableSchema getTableSchema(TableName tName) throws IOException { .collect(Collectors.toList()); Schema filteredSchema = Schema.of(filteredFields); TableSchema tableSchema = BqToBqStorageSchemaConverter.convertTableSchema(filteredSchema); + LOGGER.info("[bq-producer] schema fetched for {}: {} fields (after filtering bq_inserted_at)", + tName, filteredFields.size()); return tableSchema; } + private static final long PUSH_QUEUE_WARN_MS = 5_000; + @Override public void push(RowMap r) throws Exception { - this.queue.put(r); + if ( !this.queue.offer(r) ) { + long waitStart = System.currentTimeMillis(); + LOGGER.warn("[bq-producer] row queue is full (capacity={}), blocking until a worker drains it. db={} table={}", + this.queue.size() + this.queue.remainingCapacity(), r.getDatabase(), r.getTable()); + this.queue.put(r); + long waitMs = System.currentTimeMillis() - waitStart; + if ( waitMs > PUSH_QUEUE_WARN_MS ) { + LOGGER.warn("[bq-producer] unblocked after {}ms waiting for queue space", waitMs); + } + } } } class MaxwellBigQueryProducerWorker extends AbstractAsyncProducer implements Runnable, StoppableTask { @@ -377,11 +400,14 @@ private void covertJSONObjectFieldsToString(JSONObject record) { public void initialize(TableName tName, TableSchema tableSchema) throws DescriptorValidationException, IOException, InterruptedException { + LOGGER.info("[bq-worker-{}] initializing JsonStreamWriter for table={}", workerId, tName); this.streamWriter = JsonStreamWriter.newBuilder(tName.toString(), tableSchema).build(); + LOGGER.info("[bq-worker-{}] JsonStreamWriter ready", workerId); } @Override public void requestStop() throws Exception { + LOGGER.info("[bq-worker-{}] stop requested", workerId); taskState.requestStop(); streamWriter.close(); scheduledExecutor.shutdown(); @@ -390,6 +416,7 @@ public void requestStop() throws Exception { throw this.error; } } + LOGGER.info("[bq-worker-{}] stopped", workerId); } @Override @@ -400,15 +427,25 @@ public void awaitStop(Long timeout) throws TimeoutException { @Override public void run() { this.thread = Thread.currentThread(); + LOGGER.info("[bq-worker-{}] run loop started", workerId); + long rowsProcessed = 0; while (true) { try { RowMap row = queue.take(); if (!taskState.isRunning()) { + LOGGER.info("[bq-worker-{}] stop flag set, exiting run loop after {} rows", workerId, rowsProcessed); taskState.stopped(); return; } this.push(row); + rowsProcessed++; + if ( rowsProcessed % 10_000 == 0 ) { + LOGGER.info("[bq-worker-{}] progress: {} rows sent to BQ, queue remaining={}", + workerId, rowsProcessed, queue.size()); + } } catch (Exception e) { + LOGGER.error("[bq-worker-{}] exception in run loop after {} rows, terminating Maxwell: {}", + workerId, rowsProcessed, e.getMessage(), e); taskState.stopped(); context.terminate(e); return; @@ -418,6 +455,8 @@ public void run() { @Override public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { + LOGGER.debug("[bq-worker-{}] sendAsync: type={} db={} table={} position={}", + workerId, r.getRowType(), r.getDatabase(), r.getTable(), r.getNextPosition()); JSONObject record = new JSONObject(r.toJSON(outputConfig)); applyColumnTransformations(r, record); @@ -425,7 +464,7 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { int recordSize = AppendContext.getJsonByteSize(record); if (recordSize >= 9 * 1024 * 1024) { - LOGGER.error("Worker {} skipping oversized record: {} bytes for table {}.{}, position {}", + LOGGER.error("[bq-worker-{}] skipping oversized record: {} bytes for table {}.{}, position {}", this.workerId, recordSize, r.getDatabase(), r.getTable(), r.getNextPosition()); cc.markCompleted(); return; @@ -444,8 +483,15 @@ public void sendAsync(RowMap r, CallbackCompleter cc) throws Exception { this.appendContext.addRow(r, record, cc); - if(this.appendContext.callbacks.size() >= BATCH_SIZE - || this.appendContext.getApproximateSize() >= MAX_MESSAGE_SIZE_BYTES) { + int currentBatchSize = this.appendContext.callbacks.size(); + long currentBatchBytes = this.appendContext.getApproximateSize(); + LOGGER.debug("[bq-worker-{}] append context: {}/{} rows, {}/{} bytes", + workerId, currentBatchSize, BATCH_SIZE, currentBatchBytes, MAX_MESSAGE_SIZE_BYTES); + + if(currentBatchSize >= BATCH_SIZE || currentBatchBytes >= MAX_MESSAGE_SIZE_BYTES) { + LOGGER.info("[bq-worker-{}] flushing full batch: {} rows, ~{} bytes (trigger: {})", + workerId, currentBatchSize, currentBatchBytes, + currentBatchSize >= BATCH_SIZE ? "row count" : "size limit"); synchronized (this.getLock()) { this.attemptBatch(this.appendContext); this.appendContext = null; @@ -457,6 +503,9 @@ public void attemptBatch(AppendContext appendContext) throws DescriptorValidatio if(appendContext.scheduledTask != null && !appendContext.scheduledTask.isDone()) { appendContext.scheduledTask.cancel(false); } + LOGGER.info("[bq-worker-{}] attempting to send batch: {} rows, ~{} bytes, retryCount={}, position={}", + workerId, appendContext.callbacks.size(), appendContext.getApproximateSize(), + appendContext.retryCount, appendContext.position); ApiFuture future = streamWriter.append(appendContext.data); ApiFutures.addCallback( @@ -469,17 +518,24 @@ future, new BigQueryCallback(this, appendContext, public void scheduleAttempt(final AppendContext appendContext) { + LOGGER.debug("[bq-worker-{}] scheduling batch flush in 1 minute (rows so far: {})", + workerId, appendContext.callbacks.size()); appendContext.scheduledTask = this.scheduledExecutor.schedule(() -> { try { synchronized (this.getLock()) { - this.attemptBatch(this.appendContext); - this.appendContext = null; // Nullify after attempting via scheduler + if (this.appendContext != null) { + LOGGER.info("[bq-worker-{}] scheduled flush triggered: {} rows, ~{} bytes", + workerId, this.appendContext.callbacks.size(), this.appendContext.getApproximateSize()); + this.attemptBatch(this.appendContext); + this.appendContext = null; + } else { + LOGGER.debug("[bq-worker-{}] scheduled flush found null appendContext (already flushed)", workerId); + } } } catch (Exception e) { - LOGGER.error("Error sending scheduled bigquery batch message"); - e.printStackTrace(); + LOGGER.error("[bq-worker-{}] error in scheduled batch flush: {}", workerId, e.getMessage(), e); } - }, 1, TimeUnit.MINUTES); // 1 minute delay + }, 1, TimeUnit.MINUTES); } private void applyColumnTransformations(RowMap row, JSONObject record) { diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEventListener.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEventListener.java index 06f5b5670..5aa887d93 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEventListener.java +++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorEventListener.java @@ -18,6 +18,8 @@ class BinlogConnectorEventListener implements BinaryLogClient.EventListener { private static final Logger LOGGER = LoggerFactory.getLogger(BinlogConnectorEventListener.class); + private static final long LAG_WARN_THRESHOLD_MS = 30_000; + private static final long QUEUE_FULL_LOG_INTERVAL_MS = 5_000; private final BlockingQueue queue; private final Timer queueTimer; @@ -26,6 +28,8 @@ class BinlogConnectorEventListener implements BinaryLogClient.EventListener { private final MaxwellOutputConfig outputConfig; private long replicationLag; private String gtid; + private long lastQueueFullLogAt = 0; + private long queueFullOfferCount = 0; public BinlogConnectorEventListener( BinaryLogClient client, @@ -65,12 +69,29 @@ public void onEvent(Event event) { trackMetrics = true; eventSeenAt = System.currentTimeMillis(); replicationLag = eventSeenAt - event.getHeader().getTimestamp(); + if ( replicationLag > LAG_WARN_THRESHOLD_MS ) { + LOGGER.warn("[event-listener] high replication lag detected: {}ms ({}s)", replicationLag, replicationLag / 1000); + } } while (mustStop.get() != true) { try { if ( queue.offer(ep, 100, TimeUnit.MILLISECONDS ) ) { + if ( queueFullOfferCount > 0 ) { + LOGGER.info("[event-listener] queue unblocked after {} failed offers, current queue size={}", + queueFullOfferCount, queue.size()); + queueFullOfferCount = 0; + } break; + } else { + queueFullOfferCount++; + long now = System.currentTimeMillis(); + if ( now - lastQueueFullLogAt >= QUEUE_FULL_LOG_INTERVAL_MS ) { + LOGGER.warn("[event-listener] replication event queue is full (capacity={}, size={}), producer may be stuck. Blocked for ~{}ms on event type={}", + queue.size() + queue.remainingCapacity(), queue.size(), + queueFullOfferCount * 100L, eventType); + lastQueueFullLogAt = now; + } } } catch (InterruptedException e) { return; diff --git a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java index fe24f17ba..fbbe7d24e 100644 --- a/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java +++ b/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.java @@ -46,8 +46,15 @@ public class BinlogConnectorReplicator extends RunLoopProcess implements Replica public static final int BAD_BINLOG_ERROR_CODE = 1236; public static final int ACCESS_DENIED_ERROR_CODE = 1227; + private long totalRowsProcessed = 0; + private long lastProgressLogAt = System.currentTimeMillis(); + private static final long PROGRESS_LOG_INTERVAL_MS = 60_000; + private long noEventStreak = 0; + private static final long NO_EVENT_WARN_THRESHOLD = 300; // 300 * 100ms poll = 30 seconds + private final String clientID; private final String maxwellSchemaDatabaseName; + private final String replicationHost; protected final BinaryLogClient client; private final int replicationReconnectionRetries; @@ -149,6 +156,7 @@ public BinlogConnectorReplicator( int binlogEventQueueSize ) { this.clientID = clientID; + this.replicationHost = mysqlConfig.host + ":" + mysqlConfig.port; this.bootstrapper = bootstrapper; this.maxwellSchemaDatabaseName = maxwellSchemaDatabaseName; this.producer = producer; @@ -241,6 +249,14 @@ public void work() throws Exception { rowCounter.inc(); rowMeter.mark(); + totalRowsProcessed++; + + long now = System.currentTimeMillis(); + if ( now - lastProgressLogAt >= PROGRESS_LOG_INTERVAL_MS ) { + LOGGER.info("[replicator] progress: {} total rows processed, queue size={}, connected={}", + totalRowsProcessed, queue.size(), isConnected); + lastProgressLogAt = now; + } if ( scripting != null && !isMaxwellRow(row)) scripting.invoke(row); @@ -250,12 +266,16 @@ public void work() throws Exception { private boolean replicatorStarted = false; public void startReplicator() throws Exception { + LOGGER.info("[replicator] connecting to binlog: host={}, gtidMode={}, serverID={}", + replicationHost, gtidPositioning, client.getServerId()); this.client.connect(5000); replicatorStarted = true; + LOGGER.info("[replicator] binlog client connected successfully"); } @Override protected void beforeStop() throws Exception { + LOGGER.info("[replicator] stopping: total rows processed={}, queue remaining={}", totalRowsProcessed, queue.size()); this.binlogEventListener.stop(); this.client.disconnect(); } @@ -336,17 +356,20 @@ private boolean shouldSkipRow(RowMap row) throws IOException { protected void processRow(RowMap row) throws Exception { if ( row instanceof HeartbeatRowMap) { + LOGGER.debug("[replicator] pushing heartbeat at position={}", row.getPosition()); producer.push(row); if (stopAtHeartbeat != null) { long thisHeartbeat = row.getPosition().getLastHeartbeatRead(); if (thisHeartbeat >= stopAtHeartbeat) { - LOGGER.info("received final heartbeat " + thisHeartbeat + "; stopping replicator"); - // terminate runLoop + LOGGER.info("[replicator] received final heartbeat {}; stopping replicator", thisHeartbeat); this.taskState.stopped(); } } - } else if ( !shouldSkipRow(row) ) + } else if ( !shouldSkipRow(row) ) { + LOGGER.debug("[replicator] pushing row: type={} db={} table={} position={}", + row.getRowType(), row.getDatabase(), row.getTable(), row.getNextPosition()); producer.push(row); + } } @@ -382,6 +405,7 @@ private RowMap processHeartbeats(RowMap row) { * @param timestamp The timestamp of the SQL binlog event */ private void processQueryEvent(String dbName, String sql, SchemaStore schemaStore, Position position, Position nextPosition, Long timestamp) throws Exception { + LOGGER.info("[replicator] processing DDL on db='{}' at position={}: {}", dbName, position, sql.length() > 200 ? sql.substring(0, 200) + "..." : sql); List changes = schemaStore.processSQL(sql, dbName, position); Long schemaId = getSchemaId(); @@ -514,11 +538,16 @@ private void tryReconnect() throws TimeoutException { while ((reconnectionAttempts += 1) <= this.replicationReconnectionRetries || this.replicationReconnectionRetries == 0) { try { - LOGGER.info(String.format("Reconnection attempt: %s of %s", reconnectionAttempts, replicationReconnectionRetries > 0 ? this.replicationReconnectionRetries : "unlimited")); + LOGGER.info("[replicator] reconnection attempt {} of {}", reconnectionAttempts, + replicationReconnectionRetries > 0 ? this.replicationReconnectionRetries : "unlimited"); client.connect(5000); + LOGGER.info("[replicator] reconnected successfully on attempt {}", reconnectionAttempts); return; - } catch (IOException | TimeoutException ignored) { } + } catch (IOException | TimeoutException e) { + LOGGER.warn("[replicator] reconnection attempt {} failed: {}", reconnectionAttempts, e.getMessage()); + } } + LOGGER.error("[replicator] exhausted all {} reconnection attempts, giving up", replicationReconnectionRetries); throw new TimeoutException("Maximum reconnection attempts reached."); } @@ -537,6 +566,10 @@ private void tryReconnect() throws TimeoutException { private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws Exception { BinlogConnectorEvent event; RowMapBuffer buffer = new RowMapBuffer(MAX_TX_ELEMENTS, this.bufferMemoryUsage); + long txStartMs = System.currentTimeMillis(); + int nullEventCount = 0; + + LOGGER.debug("[replicator] BEGIN transaction at position={}", beginEvent.getPosition().fullPosition()); String currentQuery = null; @@ -544,6 +577,12 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws event = pollEvent(); if (event == null) { + nullEventCount++; + if ( nullEventCount % 100 == 0 ) { + long stuckMs = System.currentTimeMillis() - txStartMs; + LOGGER.warn("[replicator] transaction has been open for {}ms with {} rows buffered, still waiting for COMMIT. position={}", + stuckMs, buffer.size(), beginEvent.getPosition().fullPosition()); + } ensureReplicatorThread(); continue; } @@ -555,6 +594,12 @@ private RowMapBuffer getTransactionRows(BinlogConnectorEvent beginEvent) throws long timeSpent = buffer.getLast().getTimestampMillis() - beginEvent.getEvent().getHeader().getTimestamp(); transactionExecutionTime.update(timeSpent); transactionRowCount.update(buffer.size()); + LOGGER.debug("[replicator] COMMIT transaction: {} rows, {}ms to complete, position={}", + buffer.size(), System.currentTimeMillis() - txStartMs, event.getPosition().fullPosition()); + if ( buffer.size() > 1000 ) { + LOGGER.info("[replicator] large transaction committed: {} rows at position={}", + buffer.size(), event.getPosition().fullPosition()); + } } if(eventType == EventType.XID) { buffer.setXid(event.xidData().getXid()); @@ -771,7 +816,17 @@ public RowMap getRow() throws Exception { } protected BinlogConnectorEvent pollEvent() throws InterruptedException { - return queue.poll(100, TimeUnit.MILLISECONDS); + BinlogConnectorEvent event = queue.poll(100, TimeUnit.MILLISECONDS); + if ( event == null ) { + noEventStreak++; + if ( noEventStreak == NO_EVENT_WARN_THRESHOLD || noEventStreak % (NO_EVENT_WARN_THRESHOLD * 2) == 0 ) { + LOGGER.warn("[replicator] no binlog events received for ~{}s, queue size={}, connected={}", + noEventStreak / 10, queue.size(), isConnected); + } + } else { + noEventStreak = 0; + } + return event; } public Schema getSchema() throws SchemaStoreException { diff --git a/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java b/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java index a05ec0e9e..d92aaa06f 100644 --- a/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java +++ b/src/main/java/com/zendesk/maxwell/schema/PositionStoreThread.java @@ -30,6 +30,7 @@ public PositionStoreThread(MysqlPositionStore store, MaxwellContext context) { } public void start() { + LOGGER.info("[position-store] starting position flush thread"); this.thread = new Thread(this, "Position Flush Thread"); this.thread.setDaemon(true); thread.start(); @@ -37,12 +38,15 @@ public void start() { @Override public void run() { + LOGGER.info("[position-store] thread running"); try { runLoop(); } catch ( Exception e ) { + LOGGER.error("[position-store] thread error, terminating Maxwell: {}", e.getMessage(), e); this.exception = e; context.terminate(e); } finally { + LOGGER.info("[position-store] thread stopped. last stored position: {}", storedPosition); this.taskState.stopped(); } } @@ -91,21 +95,33 @@ boolean shouldHeartbeat(Position currentPosition) { return false; } + private long lastPositionLogAt = 0; + private static final long POSITION_LOG_INTERVAL_MS = 60_000; + public void work() throws Exception { Position newPosition = position; if ( newPosition != null && newPosition.newerThan(storedPosition) ) { + LOGGER.debug("[position-store] flushing position to DB: {}", newPosition); store.set(newPosition); storedPosition = newPosition; + + long now = System.currentTimeMillis(); + if ( now - lastPositionLogAt >= POSITION_LOG_INTERVAL_MS ) { + LOGGER.info("[position-store] position checkpoint: {}", storedPosition); + lastPositionLogAt = now; + } } try { Thread.sleep(1000); } catch (InterruptedException e) { } if ( shouldHeartbeat(newPosition) ) { + LOGGER.debug("[position-store] sending heartbeat from position={}", newPosition); lastHeartbeatSent = store.heartbeat(); if (newPosition != null) { lastHeartbeatSentFrom = newPosition.getBinlogPosition(); } + LOGGER.debug("[position-store] heartbeat sent: {}", lastHeartbeatSent); } }