From a2b98e0c9c17ba7603dcc666bfae543abeb1ea7a Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Mon, 30 Jan 2023 16:59:53 -0500 Subject: [PATCH] NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile This closes #6907. Co-authored-by: Tamas Palfy Signed-off-by: Tamas Palfy --- .../cdc/event/io/AbstractEventWriter.java | 14 +- .../apache/nifi/cdc/event/io/EventWriter.java | 16 +- .../event/io/EventWriterConfiguration.java | 82 ++++++ .../event/io/FlowFileEventWriteStrategy.java | 54 ++++ .../event/io/AbstractBinlogEventWriter.java | 128 ++++++++-- .../io/AbstractBinlogTableEventWriter.java | 19 -- .../io/CommitTransactionEventWriter.java | 15 ++ .../cdc/mysql/event/io/DDLEventWriter.java | 31 ++- .../cdc/mysql/event/io/DeleteRowsWriter.java | 35 +-- .../cdc/mysql/event/io/InsertRowsWriter.java | 34 ++- .../cdc/mysql/event/io/UpdateRowsWriter.java | 34 ++- .../mysql/processors/CaptureChangeMySQL.java | 165 ++++++++++--- .../processors/CaptureChangeMySQLTest.groovy | 233 ++++++++++++++++++ 13 files changed, 733 insertions(+), 127 deletions(-) create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java create mode 100644 nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java index c805d3cf7f..e18a3db5d1 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/AbstractEventWriter.java @@ -33,7 +33,9 @@ public abstract class AbstractEventWriter implements EventW // Common method to create a JSON generator and start the root object. Should be called by sub-classes unless they need their own generator and such. 
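+ // NOTE: with the multiple-events-per-FlowFile strategies, the generator may already exist (created by the event writer), so it is created here only when absent and is closed by endFile() rather than after each event.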
protected void startJson(OutputStream outputStream, T event) throws IOException { - jsonGenerator = createJsonGenerator(outputStream); + if (jsonGenerator == null) { + jsonGenerator = createJsonGenerator(outputStream); + } jsonGenerator.writeStartObject(); String eventType = event.getEventType(); if (eventType == null) { @@ -54,11 +56,15 @@ public abstract class AbstractEventWriter implements EventW throw new IOException("endJson called without a JsonGenerator"); } jsonGenerator.writeEndObject(); - jsonGenerator.flush(); - jsonGenerator.close(); } - private JsonGenerator createJsonGenerator(OutputStream out) throws IOException { + protected void endFile() throws IOException { + jsonGenerator.flush(); + jsonGenerator.close(); + jsonGenerator = null; + } + + protected JsonGenerator createJsonGenerator(OutputStream out) throws IOException { return JSON_FACTORY.createGenerator(out); } } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java index 096e3c1e69..e74a56ee99 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriter.java @@ -32,12 +32,14 @@ public interface EventWriter { /** * Writes the given event to the process session, possibly via transferring it to the specified relationship (usually used for success) * - * @param session The session to write the event to - * @param transitUri The URI indicating the source MySQL system from which the specified event is associated - * @param eventInfo The event data - * @param currentSequenceId the current sequence ID - * @param relationship A relationship to transfer any flowfile(s) to - * @return a sequence ID, usually incremented from the specified current sequence id by the number of flow files transferred and/or committed + * @param session The session to write the event to + * @param transitUri The URI indicating the source MySQL system from which the specified event is associated + * @param eventInfo The event data + * @param currentSequenceId The current sequence ID + * @param relationship A relationship to transfer any flowfile(s) to + * @param eventWriterConfiguration A configuration object used for FlowFile management (how many events to write to each FlowFile, e.g.) + * @return a sequence ID, usually incremented from the specified current sequence ID by the number of FlowFiles transferred and/or committed */ - long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship); + long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship, + final EventWriterConfiguration eventWriterConfiguration); } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java new file mode 100644 index 0000000000..153ce8319c --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/EventWriterConfiguration.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.event.io; + +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.nifi.flowfile.FlowFile; + +import java.io.IOException; +import java.io.OutputStream; + +public class EventWriterConfiguration { + + private final FlowFileEventWriteStrategy flowFileEventWriteStrategy; + private final int numberOfEventsPerFlowFile; + + private int numberOfEventsWritten; + + private FlowFile currentFlowFile; + private OutputStream flowFileOutputStream; + private JsonGenerator jsonGenerator; + + public EventWriterConfiguration(FlowFileEventWriteStrategy flowFileEventWriteStrategy, int numberOfEventsPerFlowFile) { + this.flowFileEventWriteStrategy = flowFileEventWriteStrategy; + this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile; + } + + public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() { + return flowFileEventWriteStrategy; + } + + public int getNumberOfEventsWritten() { + return numberOfEventsWritten; + } + + public void incrementNumberOfEventsWritten() { + this.numberOfEventsWritten++; + } + + public void startNewFlowFile(FlowFile flowFile, OutputStream flowFileOutputStream, JsonGenerator jsonGenerator) { + this.currentFlowFile = flowFile; + this.flowFileOutputStream = flowFileOutputStream; + this.jsonGenerator = jsonGenerator; + } + + public void cleanUp() throws IOException { + this.currentFlowFile = null; + this.flowFileOutputStream.close(); + this.flowFileOutputStream = null; + this.jsonGenerator = null; + this.numberOfEventsWritten = 0; + } + + public int getNumberOfEventsPerFlowFile() { + return numberOfEventsPerFlowFile; + } + + public FlowFile getCurrentFlowFile() { + return currentFlowFile; + } + + public OutputStream getFlowFileOutputStream() { + return flowFileOutputStream; + } + + public JsonGenerator getJsonGenerator() { + return jsonGenerator; + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java new file mode 100644 index 0000000000..bd39174db2 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/io/FlowFileEventWriteStrategy.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
+package org.apache.nifi.cdc.event.io;
+
+import org.apache.nifi.components.DescribedValue;
+
+public enum FlowFileEventWriteStrategy implements DescribedValue {
+ MAX_EVENTS_PER_FLOWFILE(
+ "Max Events Per FlowFile",
+ "This strategy causes at most the number of events specified in the 'Number of Events Per FlowFile' property to be written per FlowFile. If the processor is stopped before the "
+ + "specified number of events has been written (or the event queue becomes empty), the events written so far will still be sent out as a FlowFile before stopping."
+ ),
+ ONE_TRANSACTION_PER_FLOWFILE(
+ "One Transaction Per FlowFile",
+ "This strategy causes all events from a single transaction (from BEGIN to COMMIT) to be written to one FlowFile"
+ );
+
+ private final String displayName;
+ private final String description;
+
+ FlowFileEventWriteStrategy(String displayName, String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ @Override
+ public String getValue() {
+ return name();
+ }
+
+ @Override
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ @Override
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
index 09186f23c8..420da6c2d4 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogEventWriter.java
@@ -16,14 +16,21 @@
 */
 package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.EventInfo;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
+import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
 import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.io.AbstractEventWriter;
 import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.FlowFileAccessException;
+import org.apache.nifi.processor.exception.ProcessException;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -44,35 +51,114 @@ public abstract class AbstractBinlogEventWriter exten
 }
 protected Map getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
- return new HashMap() {
- {
- put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
- put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
- String gtidSet = eventInfo.getBinlogGtidSet();
- if (gtidSet == null) {
- put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
-
put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
- } else {
- put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
- }
- put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
- }
- };
+ final Map commonAttributeMap = new HashMap<>();
+
+ commonAttributeMap.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
+ commonAttributeMap.put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
+ String gtidSet = eventInfo.getBinlogGtidSet();
+ if (gtidSet == null) {
+ commonAttributeMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
+ commonAttributeMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
+ } else {
+ commonAttributeMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
+ }
+ commonAttributeMap.put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
+
+ return commonAttributeMap;
 }
 // Default implementation for binlog events
 @Override
- public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
+ public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+ try {
 super.startJson(outputStream, eventInfo);
 writeJson(eventInfo);
 // Nothing in the body
 super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON event failed", ioe);
+ }
+
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
+ }
 return currentSequenceId + 1;
 }
+
+ public void finishAndTransferFlowFile(final ProcessSession session, final EventWriterConfiguration eventWriterConfiguration, final String transitUri, final long seqId,
+ final BinlogEventInfo eventInfo, final Relationship relationship) {
+ if (writtenMultipleEvents(eventWriterConfiguration)) {
+ try {
+ jsonGenerator.writeEndArray();
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON end array failed", ioe);
+ }
+ }
+ try {
+ endFile();
+
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (session == null || flowFile == null) {
+ throw new ProcessException("No open FlowFile or ProcessSession to write to");
+ }
+ flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId, eventInfo));
+ session.transfer(flowFile, relationship);
+ session.getProvenanceReporter().receive(flowFile, transitUri);
+
+ eventWriterConfiguration.cleanUp();
+ } catch (IOException ioe) {
+ throw new FlowFileAccessException("Failed to close event writer", ioe);
+ }
+ }
+
+ protected void configureEventWriter(final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final
EventInfo eventInfo) { + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + if (flowFile == null) { + flowFile = session.create(); + OutputStream flowFileOutputStream = session.write(flowFile); + if (eventWriterConfiguration.getJsonGenerator() == null) { + try { + jsonGenerator = createJsonGenerator(flowFileOutputStream); + } catch (IOException ioe) { + throw new UncheckedIOException("JSON Generator creation failed", ioe); + } + } + if (multipleEventsPerFlowFile(eventWriterConfiguration)) { + try { + jsonGenerator.writeStartArray(); + } catch (IOException ioe) { + throw new UncheckedIOException("Write JSON start array failed", ioe); + } + } + eventWriterConfiguration.startNewFlowFile(flowFile, flowFileOutputStream, jsonGenerator); + } + jsonGenerator = eventWriterConfiguration.getJsonGenerator(); + } + + private boolean multipleEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) { + return (maxEventsPerFlowFile(eventWriterConfiguration) + && eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1) + || oneTransactionPerFlowFile(eventWriterConfiguration); + } + + private boolean writtenMultipleEvents(EventWriterConfiguration eventWriterConfiguration) { + return eventWriterConfiguration.getNumberOfEventsWritten() > 1 + || oneTransactionPerFlowFile(eventWriterConfiguration); + } + + protected boolean maxEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) { + return FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()); + } + + protected boolean oneTransactionPerFlowFile(EventWriterConfiguration eventWriterConfiguration) { + return FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy()); + } } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java index a674386de7..f618a76a6f 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java @@ -16,9 +16,6 @@ */ package org.apache.nifi.cdc.mysql.event.io; -import org.apache.nifi.flowfile.FlowFile; -import org.apache.nifi.processor.ProcessSession; -import org.apache.nifi.processor.Relationship; import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo; import java.io.IOException; @@ -46,20 +43,4 @@ public abstract class AbstractBinlogTableEventWriter { - super.startJson(outputStream, eventInfo); - writeJson(eventInfo); - // Nothing in the body - super.endJson(); - }); - flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo)); - session.transfer(flowFile, relationship); - session.getProvenanceReporter().receive(flowFile, transitUri); - return currentSequenceId + 1; - } } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java 
b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java index c69b4b242b..9ed576b023 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/CommitTransactionEventWriter.java @@ -17,7 +17,10 @@ package org.apache.nifi.cdc.mysql.event.io; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; import java.io.IOException; @@ -25,6 +28,18 @@ import java.io.IOException; * A writer for events corresponding to the end (i.e. commit) of a MySQL transaction */ public class CommitTransactionEventWriter extends AbstractBinlogEventWriter { + + @Override + public long writeEvent(ProcessSession session, String transitUri, CommitTransactionEventInfo eventInfo, long currentSequenceId, + Relationship relationship, EventWriterConfiguration eventWriterConfiguration) { + long sequenceId = super.writeEvent(session, transitUri, eventInfo, currentSequenceId, relationship, eventWriterConfiguration); + // If writing one transaction per flowfile, finish the flowfile here before committing the session + if (oneTransactionPerFlowFile(eventWriterConfiguration)) { + super.finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, sequenceId, eventInfo, relationship); + } + return sequenceId; + } + protected void writeJson(CommitTransactionEventInfo event) throws IOException { super.writeJson(event); if (event.getDatabaseName() != null) { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java index 0064c29322..afe084c6ad 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java @@ -16,28 +16,43 @@ */ package org.apache.nifi.cdc.mysql.event.io; -import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.Relationship; import org.apache.nifi.cdc.mysql.event.DDLEventInfo; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; + + /** * A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s). 
*/
public class DDLEventWriter extends AbstractBinlogTableEventWriter {
 @Override
- public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
- FlowFile flowFile = session.create();
- flowFile = session.write(flowFile, (outputStream) -> {
+ public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship,
+ final EventWriterConfiguration eventWriterConfiguration) {
+ configureEventWriter(eventWriterConfiguration, session, eventInfo);
+ OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
+
+ try {
 super.startJson(outputStream, eventInfo);
 super.writeJson(eventInfo);
 jsonGenerator.writeStringField("query", eventInfo.getQuery());
 super.endJson();
- });
- flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
- session.transfer(flowFile, relationship);
- session.getProvenanceReporter().receive(flowFile, transitUri);
+ } catch (IOException ioe) {
+ throw new UncheckedIOException("Write JSON event failed", ioe);
+ }
+
+ eventWriterConfiguration.incrementNumberOfEventsWritten();
+
+ // Check if it is time to finish the FlowFile
+ if (maxEventsPerFlowFile(eventWriterConfiguration)
+ && eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
+ finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
+ }
 return currentSequenceId + 1;
 }
}
diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
index 7e316d41c5..4828ce8662 100644
--- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
+++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
@@ -16,18 +16,18 @@
 */
 package org.apache.nifi.cdc.mysql.event.io;
+import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
 import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
-import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.cdc.event.ColumnDefinition;
 import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
 import org.apache.nifi.processor.Relationship;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.util.BitSet;
-import java.util.concurrent.atomic.AtomicLong;
-
 /**
 * A writer class to output MySQL binlog "delete rows" events to flow file(s).
@@ -42,12 +42,13 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter { + configureEventWriter(eventWriterConfiguration, session, eventInfo); + OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream(); + try { super.startJson(outputStream, eventInfo); super.writeJson(eventInfo); @@ -56,14 +57,20 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter { + configureEventWriter(eventWriterConfiguration, session, eventInfo); + OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream(); + try { super.startJson(outputStream, eventInfo); super.writeJson(eventInfo); @@ -56,14 +58,20 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter row : eventInfo.getRows()) { + configureEventWriter(eventWriterConfiguration, session, eventInfo); + OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream(); - FlowFile flowFile = session.create(); - flowFile = session.write(flowFile, outputStream -> { - + try { super.startJson(outputStream, eventInfo); super.writeJson(eventInfo); @@ -57,14 +59,20 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter row, BitSet includedColumns) throws IOException { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java index 71bea808e1..d2665fa231 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQL.java @@ -44,7 +44,6 @@ import org.apache.nifi.cdc.event.ColumnDefinition; import org.apache.nifi.cdc.event.RowEventException; import org.apache.nifi.cdc.event.TableInfo; import org.apache.nifi.cdc.event.TableInfoCacheKey; -import org.apache.nifi.cdc.event.io.EventWriter; import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo; import org.apache.nifi.cdc.mysql.event.BinlogEventInfo; import org.apache.nifi.cdc.mysql.event.BinlogEventListener; @@ -52,9 +51,12 @@ import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener; import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo; import org.apache.nifi.cdc.mysql.event.DDLEventInfo; import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy; import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo; import org.apache.nifi.cdc.mysql.event.RawBinlogEvent; import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter; import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter; import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter; import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter; @@ -78,6 +80,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer; import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; import org.apache.nifi.distributed.cache.client.Serializer; import org.apache.nifi.expression.ExpressionLanguageScope; +import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.logging.ComponentLog; import 
org.apache.nifi.processor.AbstractSessionFactoryProcessor; import org.apache.nifi.processor.ProcessContext; @@ -132,6 +135,9 @@ import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS; import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE; import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS; import static com.github.shyiko.mysql.binlog.event.EventType.XID; +import static org.apache.nifi.cdc.event.io.EventWriter.CDC_EVENT_TYPE_ATTRIBUTE; +import static org.apache.nifi.cdc.event.io.EventWriter.SEQUENCE_ID_KEY; +import static org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE; /** @@ -140,15 +146,17 @@ import static com.github.shyiko.mysql.binlog.event.EventType.XID; @TriggerSerially @PrimaryNodeOnly @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN) -@Tags({"sql", "jdbc", "cdc", "mysql"}) +@Tags({"sql", "jdbc", "cdc", "mysql", "transaction", "event"}) @CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events " - + "are output as individual flow files ordered by the time at which the operation occurred.") + + "are output as either a group of a specified number of events (the default is 1 so each event becomes its own flow file) or grouped as a full transaction (BEGIN to COMMIT). All events " + + "are ordered by the time at which the operation occurred. NOTE: If the processor is stopped before the specified number of events have been written to a flow file, " + + "the partial flow file will be output in order to maintain the consistency of the event stream.") @Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such " + "that it can continue from the same location if restarted.") @WritesAttributes({ - @WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order " + @WritesAttribute(attribute = SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order " + "of the CDC event flow file relative to the other event flow file(s)."), - @WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) " + @WritesAttribute(attribute = CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) " + "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."), @WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to " + "application/json") @@ -185,6 +193,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { SSLMode.VERIFY_IDENTITY.toString(), "Connect with TLS or fail when server support not enabled. 
Verify server hostname matches presented X.509 certificate names or fail when not matched");
+
 // Properties
 public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
 .name("capture-change-mysql-db-name-pattern")
@@ -269,6 +278,31 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
 .build();
+ public static final PropertyDescriptor EVENTS_PER_FLOWFILE_STRATEGY = new PropertyDescriptor.Builder()
+ .name("events-per-flowfile-strategy")
+ .displayName("Event Processing Strategy")
+ .description("Specifies the strategy to use when writing events to FlowFile(s), such as '" + MAX_EVENTS_PER_FLOWFILE.getDisplayName() + "'")
+ .required(true)
+ .sensitive(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .allowableValues(FlowFileEventWriteStrategy.class)
+ .defaultValue(MAX_EVENTS_PER_FLOWFILE.getValue())
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .build();
+
+ public static final PropertyDescriptor NUMBER_OF_EVENTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
+ .name("number-of-events-per-flowfile")
+ .displayName("Events Per FlowFile")
+ .description("Specifies how many events should be written to a single FlowFile. If the processor is stopped before the specified number of events has been written, "
+ + "the events will still be written as a FlowFile before stopping.")
+ .required(true)
+ .sensitive(false)
+ .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ .defaultValue("1")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .dependsOn(EVENTS_PER_FLOWFILE_STRATEGY, MAX_EVENTS_PER_FLOWFILE.getValue())
+ .build();
+
 public static final PropertyDescriptor SERVER_ID = new PropertyDescriptor.Builder()
 .name("capture-change-mysql-server-id")
 .displayName("Server ID")
@@ -479,6 +513,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
 private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
+ private volatile EventWriterConfiguration eventWriterConfiguration;
+ private volatile BinlogEventInfo currentEventInfo;
+ private AbstractBinlogEventWriter currentEventWriter;
+
 static {
 final Set r = new HashSet<>();
@@ -491,6 +529,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 pds.add(DRIVER_LOCATION);
 pds.add(USERNAME);
 pds.add(PASSWORD);
+ pds.add(EVENTS_PER_FLOWFILE_STRATEGY);
+ pds.add(NUMBER_OF_EVENTS_PER_FLOWFILE);
 pds.add(SERVER_ID);
 pds.add(DATABASE_NAME_PATTERN);
 pds.add(TABLE_NAME_PATTERN);
@@ -569,6 +609,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 return;
 }
+ // Build an event writer config object for the event writers to use
+ final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
+ eventWriterConfiguration = new EventWriterConfiguration(
+ flowFileEventWriteStrategy,
+ context.getProperty(NUMBER_OF_EVENTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger()
+ );
+
 PropertyValue dbNameValue = context.getProperty(DATABASE_NAME_PATTERN);
 databaseNamePattern = dbNameValue.isSet() ?
Pattern.compile(dbNameValue.getValue()) : null; @@ -624,7 +671,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } // Get current sequence ID from state - String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY); + String seqIdString = stateMap.get(SEQUENCE_ID_KEY); if (StringUtils.isEmpty(seqIdString)) { // Use Initial Sequence ID property if none is found in state PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID); @@ -888,7 +935,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) { currentBinlogPosition = header.getPosition(); } - log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()}); + log.debug("Got message event type: {} ", header.getEventType().toString()); switch (eventType) { case TABLE_MAP: // This is sent to inform which table is about to be changed by subsequent events @@ -952,7 +999,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { BeginTransactionEventInfo beginEvent = useGtid ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS)); + currentEventInfo = beginEvent; + currentEventWriter = beginEventWriter; + currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + } inTransaction = true; //update inTransaction value to state @@ -963,17 +1013,35 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { + "This could indicate that your binlog position is invalid."); } // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here - if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { - CommitTransactionEventInfo commitTransactionEvent = useGtid - ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS)); + if (includeBeginCommit) { + if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) { + CommitTransactionEventInfo commitTransactionEvent = useGtid + ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) + : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); + currentEventInfo = commitTransactionEvent; + currentEventWriter = commitEventWriter; + currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + } + } else { + // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed. 
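+ // Otherwise a FlowFile opened for this transaction's events would never be finished, because no COMMIT event will reach the writer to close it.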
+ if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // Finish any open FlowFile, since no COMMIT event will be written to close it out
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
+ currentSession.commitAsync();
+ }
 }
+
 //update inTransaction value to state
 inTransaction = false;
 updateState(session);
- // Commit the NiFi session
- session.commitAsync();
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ // Commit the NiFi session
+ session.commitAsync();
+ }
 currentTable = null;
 } else {
 // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
@@ -993,7 +1061,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 DDLEventInfo ddlEvent = useGtid
 ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
 : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
- currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = ddlEvent;
+ currentEventWriter = ddlEventWriter;
+ currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 }
 // Remove all the keys from the cache that this processor added
 if (cacheClient != null) {
@@ -1002,7 +1072,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 // If not in a transaction, commit the session so the DDL event(s) will be transferred
 if (includeDDLEvents && !inTransaction) {
 updateState(session);
- session.commitAsync();
+ if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
+ if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // Finish the open FlowFile so the DDL event(s) will be transferred
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
+ }
+ }
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ session.commitAsync();
+ }
 }
 }
 }
@@ -1013,19 +1095,35 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
 + "This could indicate that your binlog position is invalid.");
 }
- if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
- CommitTransactionEventInfo commitTransactionEvent = useGtid
- ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
- : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
- currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
+ if (includeBeginCommit) {
+ if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
+ CommitTransactionEventInfo commitTransactionEvent = useGtid
+ ?
new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
+ : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
+ currentEventInfo = commitTransactionEvent;
+ currentEventWriter = commitEventWriter;
+ currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
+ }
+ } else {
+ // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
+ if (currentSession != null) {
+ FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
+ if (flowFile != null && currentEventWriter != null) {
+ // Finish any open FlowFile, since no COMMIT event will be written to close it out
+ currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
+ }
+ currentSession.commitAsync();
+ }
 }
- // Commit the NiFi session
 // update inTransaction value and save next position
 // so when this processor restarts, we will not read the xid event again
 inTransaction = false;
 currentBinlogPosition = header.getNextPosition();
 updateState(session);
- session.commitAsync();
+ // If there is no FlowFile open, commit the session
+ if (eventWriterConfiguration.getCurrentFlowFile() == null) {
+ session.commitAsync();
+ }
 currentTable = null;
 currentDatabase = null;
 break;
@@ -1060,7 +1158,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 InsertRowsEventInfo eventInfo = useGtid
 ? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
 : new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = eventInfo;
+ currentEventWriter = insertRowsWriter;
+ currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 } else if (eventType == DELETE_ROWS
 || eventType == EXT_DELETE_ROWS
@@ -1069,14 +1169,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
 DeleteRowsEventInfo eventInfo = useGtid
 ? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
 : new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
- currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
+ currentEventInfo = eventInfo;
+ currentEventWriter = deleteRowsWriter;
+ currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
 } else {
 // Update event
 UpdateRowsEventInfo eventInfo = useGtid
 ?
new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData()) : new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData()); - currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS)); + currentEventInfo = eventInfo; + currentEventWriter = updateRowsWriter; + currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); } break; @@ -1144,6 +1248,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } if (currentSession != null) { + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + if (flowFile != null && currentEventWriter != null) { + // Flush the events to the FlowFile when the processor is stopped + currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS); + } currentSession.commitAsync(); } @@ -1174,7 +1283,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition)); - newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId)); + newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId)); //add inTransaction value into state newStateMap.put("inTransaction", inTransaction ? "true" : "false"); diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 66828c3570..f67567bfa5 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -35,6 +35,7 @@ import org.apache.nifi.cdc.event.ColumnDefinition import org.apache.nifi.cdc.event.TableInfo import org.apache.nifi.cdc.event.TableInfoCacheKey import org.apache.nifi.cdc.event.io.EventWriter +import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy import org.apache.nifi.cdc.mysql.MockBinlogClient import org.apache.nifi.cdc.mysql.event.BinlogEventInfo import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory @@ -734,6 +735,238 @@ class CaptureChangeMySQLTest { assertEquals(5, resultFiles.size()) } + @Test + void testSkipTableMultipleEventsPerFlowFile() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION) + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') + testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') + testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') + testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB") + testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user") + testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') + testRunner.setProperty(CaptureChangeMySQL.NUMBER_OF_EVENTS_PER_FLOWFILE, '2') + + testRunner.run(1, false, true) + + // ROTATE + client.sendEvent(new Event( + 
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
+ [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
+ ))
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ def cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List] as WriteRowsEventData
+ ))
+
+ // TABLE MAP for table matching, all modification events (1) should be emitted
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // WRITE ROWS for matching table
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[10, 'Cruz'] as Serializable[]] as List] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ ////////////////////////
+ // Test database filter
+ ////////////////////////
+
+ // BEGIN
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
+ [database: 'myDB', sql: 'BEGIN'] as QueryEventData
+ ))
+
+ // TABLE MAP for database not matching the regex
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
+ [tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ // Five events total, 2 max per flow file, so 3 flow files
+ assertEquals(3, resultFiles.size())
+ def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ assertEquals(2, json.size())
+ // BEGIN and INSERT (verifies that the INSERT into 'users' was skipped)
+ assertEquals('begin', json[0]?.type)
+ assertEquals('insert', json[1]?.type)
+
+ json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+
assertEquals(2, json.size()) + assertEquals('commit', json[0]?.type) + assertEquals('begin', json[1]?.type) + + json = new JsonSlurper().parseText(new String(resultFiles[2].toByteArray())) + assertTrue (json instanceof ArrayList) + // One event left + assertEquals(1, json.size()) + assertEquals('commit', json[0]?.type) + } + + @Test + void testSkipTableOneTransactionPerFlowFile() throws Exception { + testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION) + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') + testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') + testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') + testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB") + testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user") + testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') + testRunner.setProperty(CaptureChangeMySQL.EVENTS_PER_FLOWFILE_STRATEGY, FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.name()) + + testRunner.run(1, false, true) + + // ROTATE + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4, + [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData + )) + + // BEGIN + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4, + [database: 'myDB', sql: 'BEGIN'] as QueryEventData + )) + + // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user') + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4, + [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData + )) + + // This WRITE ROWS should be skipped + def cols = new BitSet() + cols.set(1) + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4, + [tableId: 1, includedColumns: cols, + rows : [[2, 'Smith'] as Serializable[]] as List] as WriteRowsEventData + )) + + // TABLE MAP for table matching, all modification events (1) should be emitted + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4, + [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData + )) + + // WRITE ROWS for matching table + cols = new BitSet() + cols.set(1) + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4, + [tableId: 1, includedColumns: cols, + rows : [[10, 'Cruz'] as Serializable[]] as List] as WriteRowsEventData + )) + + // COMMIT + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4, + {} as EventData + )) + + //////////////////////// + // Test database filter + //////////////////////// + + // BEGIN + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4, + [database: 'myDB', sql: 'BEGIN'] as QueryEventData + )) + + // TABLE MAP for database not matching the regex + client.sendEvent(new Event( + [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4, + [tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: 
[4, -4] as byte[]] as TableMapEventData
+ ))
+
+ // This WRITE ROWS should be skipped
+ cols = new BitSet()
+ cols.set(1)
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
+ [tableId: 1, includedColumns: cols,
+ rows : [[2, 'Smith'] as Serializable[]] as List] as WriteRowsEventData
+ ))
+
+ // COMMIT
+ client.sendEvent(new Event(
+ [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
+ {} as EventData
+ ))
+
+ testRunner.run(1, true, false)
+
+ def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
+ // Five events total across two transactions, so 2 flow files
+ assertEquals(2, resultFiles.size())
+ def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ assertEquals(3, json.size())
+ // BEGIN, INSERT, COMMIT (verifies that one of the INSERTs was skipped)
+ assertEquals('begin', json[0]?.type)
+ assertEquals('insert', json[1]?.type)
+ assertEquals('commit', json[2]?.type)
+
+ json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
+ assertTrue (json instanceof ArrayList)
+ // Only two events left
+ assertEquals(2, json.size())
+ assertEquals('begin', json[0]?.type)
+ assertEquals('commit', json[1]?.type)
+ }
+
 @Test
 void testFilterDatabase() throws Exception {
 testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)