mirror of https://github.com/apache/nifi.git
NIFI-11305: Fix logic to correctly stop CaptureChangeMySQL (#7059)
This commit is contained in:
parent
e370292d7f
commit
0c7de2c748
|
@ -492,7 +492,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
|
||||
private volatile boolean inTransaction = false;
|
||||
private volatile boolean skipTable = false;
|
||||
private final AtomicBoolean doStop = new AtomicBoolean(false);
|
||||
private final AtomicBoolean hasRun = new AtomicBoolean(false);
|
||||
|
||||
private int currentHost = 0;
|
||||
|
@ -923,7 +922,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
}
|
||||
|
||||
gtidSet = new GtidSet(binlogClient.getGtidSet());
|
||||
doStop.set(false);
|
||||
}
|
||||
|
||||
|
||||
|
@ -931,7 +929,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
RawBinlogEvent rawBinlogEvent;
|
||||
|
||||
// Drain the queue
|
||||
while ((rawBinlogEvent = queue.poll()) != null && !doStop.get()) {
|
||||
while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {
|
||||
Event event = rawBinlogEvent.getEvent();
|
||||
EventHeaderV4 header = event.getHeader();
|
||||
long timestamp = header.getTimestamp();
|
||||
|
@ -1266,7 +1264,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
|||
currentSession.commitAsync();
|
||||
}
|
||||
|
||||
doStop.set(true);
|
||||
currentBinlogPosition = -1;
|
||||
} catch (IOException e) {
|
||||
throw new CDCException("Error closing CDC connection", e);
|
||||
|
|
Loading…
Reference in New Issue