From 26ca40c0d916b4ac1426ee15535601b2c1c493e9 Mon Sep 17 00:00:00 2001
From: Matt Burgess
Date: Mon, 24 Apr 2017 09:48:39 -0400
Subject: [PATCH] NIFI-3728: Detect 'truncate table' event, allow exclusion of schema change events in CaptureChangeMySQL

NIFI-3728: Changed 'Include Schema Change Events' to 'Include DDL Events', updated tests

This closes #1690.

Signed-off-by: Koji Kawamura
---
 .../mysql/processors/CaptureChangeMySQL.java  | 24 ++++-
 .../processors/CaptureChangeMySQLTest.groovy  | 89 +++++++++++++++++--
 2 files changed, 103 insertions(+), 10 deletions(-)

diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
index 2e0cfea293..688dff1dc1 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java
@@ -274,6 +274,17 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
             .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
             .build();
 
+    public static final PropertyDescriptor INCLUDE_DDL_EVENTS = new PropertyDescriptor.Builder()
+            .name("capture-change-mysql-include-ddl-events")
+            .displayName("Include DDL Events")
+            .description("Specifies whether to emit events corresponding to Data Definition Language (DDL) events such as ALTER TABLE, TRUNCATE TABLE, e.g. in the binary log. Set to true "
+                    + "if the DDL events are desired/necessary in the downstream flow, otherwise set to false, which suppresses generation of these events and can increase flow performance.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+
     public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder()
             .name("capture-change-mysql-state-update-interval")
             .displayName("State Update Interval")
@@ -343,6 +354,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
     private volatile Pattern databaseNamePattern;
     private volatile Pattern tableNamePattern;
     private volatile boolean includeBeginCommit = false;
+    private volatile boolean includeDDLEvents = false;
     private volatile boolean inTransaction = false;
     private volatile boolean skipTable = false;
 
@@ -389,6 +401,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
         pds.add(DIST_CACHE_CLIENT);
         pds.add(RETRIEVE_ALL_RECORDS);
         pds.add(INCLUDE_BEGIN_COMMIT);
+        pds.add(INCLUDE_DDL_EVENTS);
         pds.add(STATE_UPDATE_INTERVAL);
         pds.add(INIT_SEQUENCE_ID);
         pds.add(INIT_BINLOG_FILENAME);
@@ -434,6 +447,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 
         boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
         includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
+        includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
 
         // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
         currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
@@ -777,15 +791,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
                         currentTable = null;
                     } else {
-                        // Check for schema change events (alter table, e.g.). Normalize the query to do string matching on the type of change
+                        // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
                         String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " ");
                         if (normalizedQuery.startsWith("alter table")
                                 || normalizedQuery.startsWith("alter ignore table")
                                 || normalizedQuery.startsWith("create table")
+                                || normalizedQuery.startsWith("truncate table")
                                 || normalizedQuery.startsWith("drop table")
                                 || normalizedQuery.startsWith("drop database")) {
-                            SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
-                            currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS));
+
+                            if (includeDDLEvents) {
+                                SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
+                                currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS));
+                            }
                             // Remove all the keys from the cache that this processor added
                             if (cacheClient != null) {
                                 cacheClient.removeByPattern(this.getIdentifier() + ".*");
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
index 22959e6173..f82028e21f 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy
@@ -205,7 +205,7 @@ class CaptureChangeMySQLTest {
 
         def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
 
-        resultFiles.eachWithIndex { e, i ->
+        resultFiles.eachWithIndex {e, i ->
             // Sequence ID should start from 1 (as was put into the state map), showing that the
             // Initial Sequence ID value was ignored
             assertEquals(i + 1, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
@@ -245,7 +245,7 @@ class CaptureChangeMySQLTest {
 
         def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
 
-        resultFiles.eachWithIndex { e, i ->
+        resultFiles.eachWithIndex {e, i ->
             assertEquals(i + 10, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
         }
     }
@@ -280,6 +280,7 @@ class CaptureChangeMySQLTest {
         testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001')
         testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
         testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
         final DistributedMapCacheClientImpl cacheClient = createCacheClient()
         def clientProperties = [:]
         clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
@@ -404,16 +405,90 @@ class CaptureChangeMySQLTest {
         List expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit' +
                 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit')
 
-        resultFiles.eachWithIndex { e, i ->
+        resultFiles.eachWithIndex {e, i ->
             assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
             assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key()))
             assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
             assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
-            assertEquals(e.getAttribute('cdc.event.type'), expectedEventTypes[i])
+            assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
         }
         assertEquals(13, resultFiles.size())
         assertEquals(13, testRunner.provenanceEvents.size())
-        testRunner.provenanceEvents.each { assertEquals(ProvenanceEventType.RECEIVE, it.eventType)}
+        testRunner.provenanceEvents.each {assertEquals(ProvenanceEventType.RECEIVE, it.eventType)}
+    }
+
+    @Test
+    void testExcludeSchemaChanges() throws Exception {
+        testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
+        testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
+        testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
+        testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
+        testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
+        testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001')
+        testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
+        testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'false')
+        final DistributedMapCacheClientImpl cacheClient = createCacheClient()
+        def clientProperties = [:]
+        clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
+        testRunner.addControllerService('client', cacheClient, clientProperties)
+        testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
+        testRunner.enableControllerService(cacheClient)
+
+
+        testRunner.run(1, false, true)
+
+        // ROTATE scenario
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+                [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+        ))
+
+        // INSERT scenario
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+                [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+                [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
+        ))
+
+        def cols = new BitSet()
+        cols.set(1)
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+                [tableId: 1, includedColumns: cols,
+                 rows : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[], [10, 'Cruz'] as Serializable[]] as List] as WriteRowsEventData
+        ))
+
+        // ALTER TABLE
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
+                [database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
+        ))
+
+        client.sendEvent(new Event(
+                [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 40] as EventHeaderV4,
+                {} as EventData
+        ))
+
+        testRunner.run(1, true, false)
+
+        def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+        // No 'schema_change' events expected
+        List expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit')
+
+        resultFiles.eachWithIndex {e, i ->
+            assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
+            assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key()))
+            assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
+            assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
+            assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
+        }
+        assertEquals(5, resultFiles.size())
     }
 
     @Test(expected = AssertionError.class)
@@ -833,7 +908,7 @@ class CaptureChangeMySQLTest {
                 }
             }
             final long numRemoved = removedRecords.size()
-            removedRecords.each { cacheMap.remove(it) }
+            removedRecords.each {cacheMap.remove(it)}
             return numRemoved
         }
 
@@ -848,4 +923,4 @@ class CaptureChangeMySQLTest {
             valueSerializer.serialize(value, new WriterOutputStream(valueWriter))
         }
     }
-}
\ No newline at end of file
+}