From 3bf1195f4b71bda845ff02cc58e3ddc15ed48d92 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 22 Mar 2023 09:47:27 -0400 Subject: [PATCH] NIFI-11305: Fixed out-of-order handling of binlog client shutdown in CaptureChangeMySQL (#7068) NIFI-11305: Fixed out-of-order handling of binlog client shutdown in CaptureChangeMySQL - Changed to allow 'unexpected' events to still be processed --- .../cdc/mysql/event/BinlogEventListener.java | 8 +++-- .../mysql/processors/CaptureChangeMySQL.java | 36 ++++++++++++++----- 2 files changed, 34 insertions(+), 10 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java index c0f5800f67..48f35d3e65 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BinlogEventListener.java @@ -18,6 +18,8 @@ package org.apache.nifi.cdc.mysql.event; import com.github.shyiko.mysql.binlog.BinaryLogClient; import com.github.shyiko.mysql.binlog.event.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; @@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public class BinlogEventListener implements BinaryLogClient.EventListener { + private static final Logger logger = LoggerFactory.getLogger(BinlogEventListener.class); + private final AtomicBoolean stopNow = new AtomicBoolean(false); private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100; @@ -57,9 +61,9 @@ public class BinlogEventListener implements BinaryLogClient.EventListener { } } - throw new RuntimeException("Stopped while waiting to enqueue event"); + logger.info("Stopped while waiting to enqueue event"); } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while adding event to the queue"); + logger.warn("Interrupted while adding event to the queue", e); } } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 4298af4379..76ada9b306 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -719,6 +719,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { connect(hosts, username, password, serverId, createEnrichmentConnection, driverLocation, driverName, connectTimeout, sslContextService, sslMode); } catch (IOException | IllegalStateException e) { + if (eventListener != null) { + eventListener.stop(); + if (binlogClient != null) { + binlogClient.unregisterEventListener(eventListener); + } + } context.yield(); binlogClient = null; throw new ProcessException(e.getMessage(), e); @@ -901,6 +907,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } } if (!binlogClient.isConnected()) { + if (eventListener != null) { + eventListener.stop(); + if (binlogClient != null) { + binlogClient.unregisterEventListener(eventListener); + } + } binlogClient.disconnect(); binlogClient = null; throw new IOException("Could not connect binlog client to any of the specified hosts due to: " + lastConnectException.getMessage(), lastConnectException); @@ -915,9 +927,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { // Ensure connection can be created. getJdbcConnection(); } catch (SQLException e) { - binlogClient.disconnect(); - binlogClient = null; - throw new IOException("Error creating binlog enrichment JDBC connection to any of the specified hosts", e); + getLogger().error("Error creating binlog enrichment JDBC connection to any of the specified hosts", e); + if (eventListener != null) { + eventListener.stop(); + if (binlogClient != null) { + binlogClient.unregisterEventListener(eventListener); + } + } + if (binlogClient != null) { + binlogClient.disconnect(); + binlogClient = null; + } + return; } } @@ -1151,8 +1172,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } if (!inTransaction) { // These events should only happen inside a transaction, warn the user otherwise - log.warn("Table modification event occurred outside of a transaction."); - break; + log.info("Event {} occurred outside of a transaction, which is unexpected.", eventType.name()); } if (currentTable == null && cacheClient != null) { // No Table Map event was processed prior to this event, which should not happen, so throw an error @@ -1245,15 +1265,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { protected void stop() throws CDCException { try { - if (binlogClient != null) { - binlogClient.disconnect(); - } if (eventListener != null) { eventListener.stop(); if (binlogClient != null) { binlogClient.unregisterEventListener(eventListener); } } + if (binlogClient != null) { + binlogClient.disconnect(); + } if (currentSession != null) { FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();