From da0454d80f5c0e5b0268dbd279a80d21d62e7c85 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 26 Apr 2017 15:19:12 -0400 Subject: [PATCH] NIFI-3746: Fixed DDL event transfer when outside a transaction in CaptureChangeMySQL This closes #1702. Signed-off-by: Koji Kawamura --- .../mysql/processors/CaptureChangeMySQL.java | 4 +++ .../processors/CaptureChangeMySQLTest.groovy | 27 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index a8c3336bfb..e5bc8e06a3 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -808,6 +808,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { if (cacheClient != null) { cacheClient.removeByPattern(this.getIdentifier() + ".*"); } + // If not in a transaction, commit the session so the DDL event(s) will be transferred + if (includeDDLEvents && !inTransaction) { + session.commit(); + } } } break; diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index eb1f32b920..1e80383289 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -760,6 +760,33 @@ class CaptureChangeMySQLTest { } + @Test + void testDDLOutsideTransaction() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar') + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') + testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') + testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') + testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true') + + testRunner.run(1, false, true) + + // ROTATE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4, + [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData + )) + + // DROP TABLE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4, + [database: 'myDB', sql: 'DROP TABLE myTable'] as QueryEventData + )) + + testRunner.run(1, false, false) + testRunner.assertTransferCount(CaptureChangeMySQL.REL_SUCCESS, 1) + } + /******************************** * Mock and helper classes below ********************************/