NIFI-4217 - Retain original DDL in CaptureChangeMySQL

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #2036.
This commit is contained in:
Matt Burgess 2017-07-24 11:14:03 -04:00 committed by Pierre Villard
parent 743c6b9c17
commit 9a5d4ff6b3
2 changed files with 5 additions and 1 deletions

View File

@ -813,7 +813,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
// If we don't have table information, we can still use the database name
TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Remove all the keys from the cache that this processor added

View File

@ -446,6 +446,10 @@ class CaptureChangeMySQLTest {
assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
// Check that DDL didn't change
if (e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY) == "32") {
assertEquals('ALTER TABLE myTable add column col1 int', new JsonSlurper().parse(testRunner.getContentAsByteArray(e)).query?.toString())
}
}
assertEquals(13, resultFiles.size())
assertEquals(13, testRunner.provenanceEvents.size())