NIFI-3728: Detect 'truncate table' event, allow exclusion of schema change events in CaptureChangeMySQL

NIFI-3728: Changed 'Include Schema Change Events' to 'Include DDL Events', updated tests

This closes #1690.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-04-24 09:48:39 -04:00 committed by Koji Kawamura
parent 9ed001869b
commit 26ca40c0d9
2 changed files with 103 additions and 10 deletions

View File

@ -274,6 +274,17 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR) .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build(); .build();
public static final PropertyDescriptor INCLUDE_DDL_EVENTS = new PropertyDescriptor.Builder()
.name("capture-change-mysql-include-ddl-events")
.displayName("Include DDL Events")
.description("Specifies whether to emit events corresponding to Data Definition Language (DDL) events such as ALTER TABLE, TRUNCATE TABLE, e.g. in the binary log. Set to true "
+ "if the DDL events are desired/necessary in the downstream flow, otherwise set to false, which suppresses generation of these events and can increase flow performance.")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder() public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder()
.name("capture-change-mysql-state-update-interval") .name("capture-change-mysql-state-update-interval")
.displayName("State Update Interval") .displayName("State Update Interval")
@ -343,6 +354,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private volatile Pattern databaseNamePattern; private volatile Pattern databaseNamePattern;
private volatile Pattern tableNamePattern; private volatile Pattern tableNamePattern;
private volatile boolean includeBeginCommit = false; private volatile boolean includeBeginCommit = false;
private volatile boolean includeDDLEvents = false;
private volatile boolean inTransaction = false; private volatile boolean inTransaction = false;
private volatile boolean skipTable = false; private volatile boolean skipTable = false;
@ -389,6 +401,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
pds.add(DIST_CACHE_CLIENT); pds.add(DIST_CACHE_CLIENT);
pds.add(RETRIEVE_ALL_RECORDS); pds.add(RETRIEVE_ALL_RECORDS);
pds.add(INCLUDE_BEGIN_COMMIT); pds.add(INCLUDE_BEGIN_COMMIT);
pds.add(INCLUDE_DDL_EVENTS);
pds.add(STATE_UPDATE_INTERVAL); pds.add(STATE_UPDATE_INTERVAL);
pds.add(INIT_SEQUENCE_ID); pds.add(INIT_SEQUENCE_ID);
pds.add(INIT_BINLOG_FILENAME); pds.add(INIT_BINLOG_FILENAME);
@ -434,6 +447,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean(); boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean(); includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
// Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY); currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
@ -777,15 +791,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
currentTable = null; currentTable = null;
} else { } else {
// Check for schema change events (alter table, e.g.). Normalize the query to do string matching on the type of change // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " "); String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " ");
if (normalizedQuery.startsWith("alter table") if (normalizedQuery.startsWith("alter table")
|| normalizedQuery.startsWith("alter ignore table") || normalizedQuery.startsWith("alter ignore table")
|| normalizedQuery.startsWith("create table") || normalizedQuery.startsWith("create table")
|| normalizedQuery.startsWith("truncate table")
|| normalizedQuery.startsWith("drop table") || normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) { || normalizedQuery.startsWith("drop database")) {
SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS)); if (includeDDLEvents) {
SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Remove all the keys from the cache that this processor added // Remove all the keys from the cache that this processor added
if (cacheClient != null) { if (cacheClient != null) {
cacheClient.removeByPattern(this.getIdentifier() + ".*"); cacheClient.removeByPattern(this.getIdentifier() + ".*");

View File

@ -205,7 +205,7 @@ class CaptureChangeMySQLTest {
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
resultFiles.eachWithIndex { e, i -> resultFiles.eachWithIndex {e, i ->
// Sequence ID should start from 1 (as was put into the state map), showing that the // Sequence ID should start from 1 (as was put into the state map), showing that the
// Initial Sequence ID value was ignored // Initial Sequence ID value was ignored
assertEquals(i + 1, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY))) assertEquals(i + 1, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
@ -245,7 +245,7 @@ class CaptureChangeMySQLTest {
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
resultFiles.eachWithIndex { e, i -> resultFiles.eachWithIndex {e, i ->
assertEquals(i + 10, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY))) assertEquals(i + 10, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
} }
} }
@ -280,6 +280,7 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001') testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001')
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4') testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
final DistributedMapCacheClientImpl cacheClient = createCacheClient() final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:] def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost') clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
@ -404,16 +405,90 @@ class CaptureChangeMySQLTest {
List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit' List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit'
+ 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit') + 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit')
resultFiles.eachWithIndex { e, i -> resultFiles.eachWithIndex {e, i ->
assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY))) assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key())) assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key()))
assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY)) assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L) assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
assertEquals(e.getAttribute('cdc.event.type'), expectedEventTypes[i]) assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
} }
assertEquals(13, resultFiles.size()) assertEquals(13, resultFiles.size())
assertEquals(13, testRunner.provenanceEvents.size()) assertEquals(13, testRunner.provenanceEvents.size())
testRunner.provenanceEvents.each { assertEquals(ProvenanceEventType.RECEIVE, it.eventType)} testRunner.provenanceEvents.each {assertEquals(ProvenanceEventType.RECEIVE, it.eventType)}
}
@Test
void testExcludeSchemaChanges() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001')
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'false')
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
testRunner.addControllerService('client', cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// ROTATE scenario
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
[binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
))
// INSERT scenario
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
def cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[], [10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// ALTER TABLE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
[database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
))
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 40] as EventHeaderV4,
{} as EventData
))
testRunner.run(1, true, false)
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
// No 'schema_change' events expected
List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit')
resultFiles.eachWithIndex {e, i ->
assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key()))
assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
}
assertEquals(5, resultFiles.size())
} }
@Test(expected = AssertionError.class) @Test(expected = AssertionError.class)
@ -833,7 +908,7 @@ class CaptureChangeMySQLTest {
} }
} }
final long numRemoved = removedRecords.size() final long numRemoved = removedRecords.size()
removedRecords.each { cacheMap.remove(it) } removedRecords.each {cacheMap.remove(it)}
return numRemoved return numRemoved
} }
@ -848,4 +923,4 @@ class CaptureChangeMySQLTest {
valueSerializer.serialize(value, new WriterOutputStream(valueWriter)) valueSerializer.serialize(value, new WriterOutputStream(valueWriter))
} }
} }
} }