From 24bb8cf95d7228555215aadb1e46c62e1f0bff91 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Wed, 26 Apr 2017 15:36:00 -0400 Subject: [PATCH] NIFI-3735: Replaced 'Schema Change Event' references to 'DDL Event' This closes #1703. Signed-off-by: Koji Kawamura --- .../java/org/apache/nifi/cdc/event/EventInfo.java | 2 +- .../cdc/mysql/event/BaseBinlogTableEventInfo.java | 2 +- ...{SchemaChangeEventInfo.java => DDLEventInfo.java} | 8 ++++---- ...emaChangeEventWriter.java => DDLEventWriter.java} | 8 ++++---- .../cdc/mysql/processors/CaptureChangeMySQL.java | 12 ++++++------ .../mysql/processors/CaptureChangeMySQLTest.groovy | 4 ++-- 6 files changed, 18 insertions(+), 18 deletions(-) rename nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/{SchemaChangeEventInfo.java => DDLEventInfo.java} (70%) rename nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/{SchemaChangeEventWriter.java => DDLEventWriter.java} (82%) diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java index 9ad8d0e76b..64500b9749 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/EventInfo.java @@ -28,7 +28,7 @@ public interface EventInfo { String INSERT_EVENT = "insert"; String DELETE_EVENT = "delete"; String UPDATE_EVENT = "update"; - String SCHEMA_CHANGE = "schema_change"; + String DDL_EVENT = "ddl"; String getEventType(); diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java index 763d6950d6..c5d3ddf918 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/BaseBinlogTableEventInfo.java @@ -32,7 +32,7 @@ public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements Bin public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogFilename, Long binlogPosition) { super(eventType, timestamp, binlogFilename, binlogPosition); - this.delegate = new BaseTableEventInfo(tableInfo, SCHEMA_CHANGE, timestamp); + this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp); } @Override diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java similarity index 70% rename from nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java rename to nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java index a385b11467..bc2c87140c 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/SchemaChangeEventInfo.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DDLEventInfo.java @@ -21,14 +21,14 @@ import org.apache.nifi.cdc.event.TableInfo; /** - * An event class corresponding to table schema changes (add/drop column, add/drop table, etc.) + * An event class corresponding to Data Definition Language (DDL) events, such as schema changes (add/drop column, add/drop table, etc.) and others (truncate table, e.g.) */ -public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo { +public class DDLEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo { private String query; - public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) { - super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, binlogPosition); + public DDLEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) { + super(tableInfo, DDL_EVENT, timestamp, binlogFilename, binlogPosition); this.query = query; } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java similarity index 82% rename from nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java rename to nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java index fe31c078a7..0064c29322 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/SchemaChangeEventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java @@ -19,15 +19,15 @@ package org.apache.nifi.cdc.mysql.event.io; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; -import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; /** - * A writer class to output MySQL binlog "schema change" (ALTER TABLE, e.g.) events to flow file(s). + * A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s). */ -public class SchemaChangeEventWriter extends AbstractBinlogTableEventWriter { +public class DDLEventWriter extends AbstractBinlogTableEventWriter { @Override - public long writeEvent(ProcessSession session, String transitUri, SchemaChangeEventInfo eventInfo, long currentSequenceId, Relationship relationship) { + public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, (outputStream) -> { super.startJson(outputStream, eventInfo); diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 688dff1dc1..a8c3336bfb 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -47,13 +47,13 @@ import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo; import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo; import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo; import org.apache.nifi.cdc.mysql.event.RawBinlogEvent; -import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo; import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter; import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter; import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter; import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter; -import org.apache.nifi.cdc.mysql.event.io.SchemaChangeEventWriter; +import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter; import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; @@ -128,7 +128,7 @@ import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS; @WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order " + "of the CDC event flow file relative to the other event flow file(s)."), @WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) " - + "'begin', 'insert', 'update', 'delete', 'schema_change' and 'commit'."), + + "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."), @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to " + "application/json") }) @@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter(); private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter(); - private final SchemaChangeEventWriter schemaChangeEventWriter = new SchemaChangeEventWriter(); + private final DDLEventWriter ddlEventWriter = new DDLEventWriter(); private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter(); private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter(); private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter(); @@ -801,8 +801,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { || normalizedQuery.startsWith("drop database")) { if (includeDDLEvents) { - SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery); - currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS)); + DDLEventInfo ddlEvent = new DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery); + currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS)); } // Remove all the keys from the cache that this processor added if (cacheClient != null) { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index f82028e21f..eb1f32b920 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -403,7 +403,7 @@ class CaptureChangeMySQLTest { def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) List expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit' - + 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit') + + 'begin' + 'ddl' + Collections.nCopies(2, 'delete') + 'commit') resultFiles.eachWithIndex {e, i -> assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY))) @@ -478,7 +478,7 @@ class CaptureChangeMySQLTest { testRunner.run(1, true, false) def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS) - // No 'schema_change' events expected + // No DDL events expected List expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit') resultFiles.eachWithIndex {e, i ->