NIFI-11279: Allow event stream processing to continue in CaptureChangeMySQL after sync issue (#7039)

This commit is contained in:
Matt Burgess 2023-03-14 16:23:43 -04:00 committed by Mark Payne
parent 7b40dfe440
commit 9e804fec4e
3 changed files with 46 additions and 40 deletions

View File

@ -739,6 +739,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
setup(context);
}
// If no client could be created, try again
if (binlogClient == null) {
return;
}
// If the client has been disconnected, try to reconnect
if (!binlogClient.isConnected()) {
Exception e = lifecycleListener.getException();
@ -764,7 +769,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
try {
outputEvents(currentSession, log);
} catch (IOException ioe) {
} catch (Exception eventException) {
getLogger().error("Exception during event processing at file={} pos={}", currentBinlogFile, currentBinlogPosition, eventException);
try {
// Perform some processor-level "rollback", then rollback the session
currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile;
@ -773,13 +779,14 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentGtidSet = xactGtidSet;
inTransaction = false;
stop();
queue.clear();
currentSession.rollback();
} catch (Exception e) {
// Not much we can recover from here
log.warn("Error occurred during rollback", e);
log.error("Error stopping CDC client", e);
} finally {
queue.clear();
currentSession.rollback();
}
throw new ProcessException(ioe);
context.yield();
}
}
@ -936,7 +943,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getPosition();
}
log.debug("Got message event type: {} ", header.getEventType().toString());
log.debug("Message event, type={} pos={} file={}", eventType, currentBinlogPosition, currentBinlogFile);
switch (eventType) {
case TABLE_MAP:
// This is sent to inform which table is about to be changed by subsequent events
@ -988,7 +995,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if ("BEGIN".equals(sql)) {
// If we're already in a transaction, something bad happened, alert the user
if (inTransaction) {
throw new IOException("BEGIN event received while already processing a transaction. This could indicate that your binlog position is invalid.");
getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
+ "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
}
// Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.)
xactBinlogFile = currentBinlogFile;
@ -1010,8 +1018,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
updateState(session);
} else if ("COMMIT".equals(sql)) {
if (!inTransaction) {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state "
+ "or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile);
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (includeBeginCommit) {
@ -1093,8 +1102,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
case XID:
if (!inTransaction) {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
getLogger().debug("COMMIT (XID) event received at pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
currentBinlogPosition, currentBinlogFile);
}
if (includeBeginCommit) {
if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
@ -1113,7 +1123,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Flush the events to the FlowFile when the processor is stopped
currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
}
currentSession.commitAsync();
}
}
// update inTransaction value and save next position

View File

@ -60,7 +60,6 @@ import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.function.Executable
import javax.net.ssl.SSLContext
import java.sql.Connection
@ -74,7 +73,6 @@ import java.util.regex.Pattern
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotNull
import static org.junit.jupiter.api.Assertions.assertTrue
import static org.junit.jupiter.api.Assertions.assertThrows
import static org.mockito.ArgumentMatchers.anyString
import static org.mockito.Mockito.doReturn
import static org.mockito.Mockito.mock
@ -364,7 +362,8 @@ class CaptureChangeMySQLTest {
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
{} as EventData
))
assertThrows(AssertionError.class, { testRunner.run(1, true, false) } as Executable)
// This should not throw an exception; the processor only logs that a COMMIT (XID) event was received out of sync
testRunner.run(1, true, false)
}
@Test
@ -634,7 +633,8 @@ class CaptureChangeMySQLTest {
{} as EventData
))
assertThrows(AssertionError.class, { testRunner.run(1, true, false) } as Executable)
// Should not throw an exception
testRunner.run(1, true, false)
}
@Test
@ -1445,10 +1445,10 @@ class CaptureChangeMySQLTest {
header2.setTimestamp(new Date().getTime())
EventData eventData = new EventData() {
};
client.sendEvent(new Event(header2, eventData));
client.sendEvent(new Event(header2, eventData))
// when we get an XID event without having received a 'begin' event, throw an exception
assertThrows(AssertionError.class, () -> testRunner.run(1, false, false))
// when we get an XID event without having received a 'begin' event, don't throw an exception, just log the condition
testRunner.run(1, false, false)
}
@Test
@ -1499,7 +1499,6 @@ class CaptureChangeMySQLTest {
}
static DistributedMapCacheClientImpl createCacheClient() throws InitializationException {
final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl()

View File

@ -684,36 +684,34 @@ public class StandardProcessSession implements ProcessSession, ProvenanceEventEn
// Update local state
final StateManager stateManager = context.getStateManager();
if (checkpoint.localState != null) {
try {
final StateMap stateMap = stateManager.getState(Scope.LOCAL);
if (stateMap.getVersion() < checkpoint.localState.getVersion()) {
LOG.debug("Updating State Manager's Local State");
try {
stateManager.setState(checkpoint.localState.toMap(), Scope.LOCAL);
} catch (final Exception e) {
LOG.warn("Failed to update Local State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
} else {
LOG.debug("Will not update State Manager's Local State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateMap.getVersion(), checkpoint.localState.getVersion());
}
} catch (final Exception e) {
LOG.warn("Failed to update Local State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
}
// Update cluster state
if (checkpoint.clusterState != null) {
try {
final StateMap stateMap = stateManager.getState(Scope.CLUSTER);
if (stateMap.getVersion() < checkpoint.clusterState.getVersion()) {
LOG.debug("Updating State Manager's Cluster State");
try {
stateManager.setState(checkpoint.clusterState.toMap(), Scope.CLUSTER);
} catch (final Exception e) {
LOG.warn("Failed to update Cluster State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
} else {
LOG.debug("Will not update State Manager's Cluster State because the State Manager reports the latest version as {}, which is newer than the session's known version of {}.",
stateMap.getVersion(), checkpoint.clusterState.getVersion());
}
} catch (final Exception e) {
LOG.warn("Failed to update Cluster State for {}. If NiFi is restarted before the state is able to be updated, it could result in data duplication.", connectableDescription, e);
}
}
// Acknowledge records in order to update counts for incoming connections' queues