NIFI-3743: Include RENAME TABLE events in CaptureChangeMySQL

This closes #1701.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-04-26 14:53:21 -04:00 committed by Koji Kawamura
parent da0454d80f
commit d66eac2ea1
2 changed files with 41 additions and 0 deletions

View File

@ -797,6 +797,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|| normalizedQuery.startsWith("alter ignore table")
|| normalizedQuery.startsWith("create table")
|| normalizedQuery.startsWith("truncate table")
|| normalizedQuery.startsWith("rename table")
|| normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) {

View File

@ -787,6 +787,46 @@ class CaptureChangeMySQLTest {
testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
}
@Test
void testRenameTable() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
testRunner.run(1, false, true)
// ROTATE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
[binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
))
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
// RENAME TABLE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'RENAME TABLE myTable TO myTable2'] as QueryEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
{} as EventData
))
testRunner.run(1, true, false)
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
assertEquals(1, resultFiles.size())
}
/********************************
* Mock and helper classes below
********************************/