From 0c7de2c7482f18507877750ea00f9e9b9134fac7 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Mon, 20 Mar 2023 16:04:19 -0400 Subject: [PATCH] NIFI-11305: Fix logic to correctly stop CaptureChangeMySQL (#7059) --- .../apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 37a6820b8a..4298af4379 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -492,7 +492,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private volatile boolean inTransaction = false; private volatile boolean skipTable = false; - private final AtomicBoolean doStop = new AtomicBoolean(false); private final AtomicBoolean hasRun = new AtomicBoolean(false); private int currentHost = 0; @@ -923,7 +922,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } gtidSet = new GtidSet(binlogClient.getGtidSet()); - doStop.set(false); } @@ -931,7 +929,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { RawBinlogEvent rawBinlogEvent; // Drain the queue - while ((rawBinlogEvent = queue.poll()) != null && !doStop.get()) { + while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) { Event event = rawBinlogEvent.getEvent(); EventHeaderV4 header = event.getHeader(); long timestamp = header.getTimestamp(); @@ -1266,7 +1264,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { currentSession.commitAsync(); } - doStop.set(true); currentBinlogPosition = -1; } catch (IOException e) { throw new CDCException("Error closing CDC connection", e);