mirror of https://github.com/apache/nifi.git
NIFI-3735: Replaced 'Schema Change Event' references to 'DDL Event'
This closes #1703. Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
parent
97461657b1
commit
24bb8cf95d
|
@ -28,7 +28,7 @@ public interface EventInfo {
|
||||||
String INSERT_EVENT = "insert";
|
String INSERT_EVENT = "insert";
|
||||||
String DELETE_EVENT = "delete";
|
String DELETE_EVENT = "delete";
|
||||||
String UPDATE_EVENT = "update";
|
String UPDATE_EVENT = "update";
|
||||||
String SCHEMA_CHANGE = "schema_change";
|
String DDL_EVENT = "ddl";
|
||||||
|
|
||||||
String getEventType();
|
String getEventType();
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements Bin
|
||||||
|
|
||||||
public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogFilename, Long binlogPosition) {
|
public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogFilename, Long binlogPosition) {
|
||||||
super(eventType, timestamp, binlogFilename, binlogPosition);
|
super(eventType, timestamp, binlogFilename, binlogPosition);
|
||||||
this.delegate = new BaseTableEventInfo(tableInfo, SCHEMA_CHANGE, timestamp);
|
this.delegate = new BaseTableEventInfo(tableInfo, DDL_EVENT, timestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -21,14 +21,14 @@ import org.apache.nifi.cdc.event.TableInfo;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An event class corresponding to table schema changes (add/drop column, add/drop table, etc.)
|
* An event class corresponding to Data Definition Language (DDL) events, such as schema changes (add/drop column, add/drop table, etc.) and others (truncate table, e.g.)
|
||||||
*/
|
*/
|
||||||
public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo {
|
public class DDLEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo {
|
||||||
|
|
||||||
private String query;
|
private String query;
|
||||||
|
|
||||||
public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) {
|
public DDLEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) {
|
||||||
super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, binlogPosition);
|
super(tableInfo, DDL_EVENT, timestamp, binlogFilename, binlogPosition);
|
||||||
this.query = query;
|
this.query = query;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,15 +19,15 @@ package org.apache.nifi.cdc.mysql.event.io;
|
||||||
import org.apache.nifi.flowfile.FlowFile;
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.processor.ProcessSession;
|
import org.apache.nifi.processor.ProcessSession;
|
||||||
import org.apache.nifi.processor.Relationship;
|
import org.apache.nifi.processor.Relationship;
|
||||||
import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
|
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A writer class to output MySQL binlog "schema change" (ALTER TABLE, e.g.) events to flow file(s).
|
* A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s).
|
||||||
*/
|
*/
|
||||||
public class SchemaChangeEventWriter extends AbstractBinlogTableEventWriter<SchemaChangeEventInfo> {
|
public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long writeEvent(ProcessSession session, String transitUri, SchemaChangeEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
|
public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
|
||||||
FlowFile flowFile = session.create();
|
FlowFile flowFile = session.create();
|
||||||
flowFile = session.write(flowFile, (outputStream) -> {
|
flowFile = session.write(flowFile, (outputStream) -> {
|
||||||
super.startJson(outputStream, eventInfo);
|
super.startJson(outputStream, eventInfo);
|
|
@ -47,13 +47,13 @@ import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
|
||||||
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
|
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
|
||||||
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
|
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
|
||||||
import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
|
import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
|
||||||
import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
|
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
|
||||||
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
|
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
|
||||||
import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
|
import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
|
||||||
import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
|
import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
|
||||||
import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
|
import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
|
||||||
import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
|
import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
|
||||||
import org.apache.nifi.cdc.mysql.event.io.SchemaChangeEventWriter;
|
import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
|
||||||
import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
|
import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
|
@ -128,7 +128,7 @@ import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
|
||||||
@WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
|
@WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
|
||||||
+ "of the CDC event flow file relative to the other event flow file(s)."),
|
+ "of the CDC event flow file relative to the other event flow file(s)."),
|
||||||
@WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
|
@WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
|
||||||
+ "'begin', 'insert', 'update', 'delete', 'schema_change' and 'commit'."),
|
+ "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."),
|
||||||
@WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
|
@WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
|
||||||
+ "application/json")
|
+ "application/json")
|
||||||
})
|
})
|
||||||
|
@ -377,7 +377,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
|
|
||||||
private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
|
private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
|
||||||
private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
|
private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
|
||||||
private final SchemaChangeEventWriter schemaChangeEventWriter = new SchemaChangeEventWriter();
|
private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
|
||||||
private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
|
private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
|
||||||
private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
|
private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
|
||||||
private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
|
private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
|
||||||
|
@ -801,8 +801,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|
||||||
|| normalizedQuery.startsWith("drop database")) {
|
|| normalizedQuery.startsWith("drop database")) {
|
||||||
|
|
||||||
if (includeDDLEvents) {
|
if (includeDDLEvents) {
|
||||||
SchemaChangeEventInfo schemaChangeEvent = new SchemaChangeEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
|
DDLEventInfo ddlEvent = new DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
|
||||||
currentSequenceId.set(schemaChangeEventWriter.writeEvent(currentSession, transitUri, schemaChangeEvent, currentSequenceId.get(), REL_SUCCESS));
|
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
|
||||||
}
|
}
|
||||||
// Remove all the keys from the cache that this processor added
|
// Remove all the keys from the cache that this processor added
|
||||||
if (cacheClient != null) {
|
if (cacheClient != null) {
|
||||||
|
|
|
@ -403,7 +403,7 @@ class CaptureChangeMySQLTest {
|
||||||
|
|
||||||
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
|
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
|
||||||
List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit'
|
List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit' + 'begin' + 'update' + 'commit'
|
||||||
+ 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit')
|
+ 'begin' + 'ddl' + Collections.nCopies(2, 'delete') + 'commit')
|
||||||
|
|
||||||
resultFiles.eachWithIndex {e, i ->
|
resultFiles.eachWithIndex {e, i ->
|
||||||
assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
|
assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
|
||||||
|
@ -478,7 +478,7 @@ class CaptureChangeMySQLTest {
|
||||||
testRunner.run(1, true, false)
|
testRunner.run(1, true, false)
|
||||||
|
|
||||||
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
|
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
|
||||||
// No 'schema_change' events expected
|
// No DDL events expected
|
||||||
List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit')
|
List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'insert') + 'commit')
|
||||||
|
|
||||||
resultFiles.eachWithIndex {e, i ->
|
resultFiles.eachWithIndex {e, i ->
|
||||||
|
|
Loading…
Reference in New Issue