NIFI-11305: Fixed out-of-order handling of binlog client shutdown in CaptureChangeMySQL (#7068)

NIFI-11305: Fixed out-of-order handling of binlog client shutdown in CaptureChangeMySQL
- Changed to allow 'unexpected' events to still be processed
This commit is contained in:
Matt Burgess 2023-03-22 09:47:27 -04:00 committed by GitHub
parent 00707f684f
commit 3bf1195f4b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 34 additions and 10 deletions

View File

@ -18,6 +18,8 @@ package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
@ -28,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class BinlogEventListener implements BinaryLogClient.EventListener {
private static final Logger logger = LoggerFactory.getLogger(BinlogEventListener.class);
private final AtomicBoolean stopNow = new AtomicBoolean(false);
private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
@ -57,9 +61,9 @@ public class BinlogEventListener implements BinaryLogClient.EventListener {
}
}
throw new RuntimeException("Stopped while waiting to enqueue event");
logger.info("Stopped while waiting to enqueue event");
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while adding event to the queue");
logger.warn("Interrupted while adding event to the queue", e);
}
}
}

View File

@ -719,6 +719,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
connect(hosts, username, password, serverId, createEnrichmentConnection, driverLocation, driverName, connectTimeout, sslContextService, sslMode);
} catch (IOException | IllegalStateException e) {
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.unregisterEventListener(eventListener);
}
}
context.yield();
binlogClient = null;
throw new ProcessException(e.getMessage(), e);
@ -901,6 +907,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
}
if (!binlogClient.isConnected()) {
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.unregisterEventListener(eventListener);
}
}
binlogClient.disconnect();
binlogClient = null;
throw new IOException("Could not connect binlog client to any of the specified hosts due to: " + lastConnectException.getMessage(), lastConnectException);
@ -915,9 +927,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Ensure connection can be created.
getJdbcConnection();
} catch (SQLException e) {
binlogClient.disconnect();
binlogClient = null;
throw new IOException("Error creating binlog enrichment JDBC connection to any of the specified hosts", e);
getLogger().error("Error creating binlog enrichment JDBC connection to any of the specified hosts", e);
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.unregisterEventListener(eventListener);
}
}
if (binlogClient != null) {
binlogClient.disconnect();
binlogClient = null;
}
return;
}
}
@ -1151,8 +1172,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
if (!inTransaction) {
// These events should only happen inside a transaction, warn the user otherwise
log.warn("Table modification event occurred outside of a transaction.");
break;
log.info("Event {} occurred outside of a transaction, which is unexpected.", eventType.name());
}
if (currentTable == null && cacheClient != null) {
// No Table Map event was processed prior to this event, which should not happen, so throw an error
@ -1245,15 +1265,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
protected void stop() throws CDCException {
try {
if (binlogClient != null) {
binlogClient.disconnect();
}
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.unregisterEventListener(eventListener);
}
}
if (binlogClient != null) {
binlogClient.disconnect();
}
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();