NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL

This closes #1702.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-04-26 15:19:12 -04:00 committed by Koji Kawamura
parent 24bb8cf95d
commit da0454d80f
2 changed files with 31 additions and 0 deletions

View File

@ -808,6 +808,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (cacheClient != null) {
cacheClient.removeByPattern(this.getIdentifier() + ".*");
}
// If not in a transaction, commit the session so the DDL event(s) will be transferred
if (includeDDLEvents && !inTransaction) {
session.commit();
}
}
}
break;

View File

@ -760,6 +760,33 @@ class CaptureChangeMySQLTest {
}
@Test
void testDDLOutsideTransaction() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
testRunner.run(1, false, true)
// ROTATE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
[binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
))
// DROP TABLE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'DROP TABLE myTable'] as QueryEventData
))
testRunner.run(1, false, false)
testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1)
}
/********************************
* Mock and helper classes below
********************************/