NIFI-11094: Allow CaptureChangeMySQL to send multiple events per FlowFile

This closes #6907.

Co-authored-by: Tamas Palfy <tpalfy@apache.org>

Signed-off-by: Tamas Palfy <tpalfy@apache.org>
Matthew Burgess 2023-01-30 16:59:53 -05:00 committed by Tamas Palfy
parent 7e97c1ce19
commit a2b98e0c9c
13 changed files with 733 additions and 127 deletions


@@ -33,7 +33,9 @@ public abstract class AbstractEventWriter<T extends EventInfo> implements EventW
// Common method to create a JSON generator and start the root object. Should be called by sub-classes unless they need their own generator and such.
protected void startJson(OutputStream outputStream, T event) throws IOException {
jsonGenerator = createJsonGenerator(outputStream);
if (jsonGenerator == null) {
jsonGenerator = createJsonGenerator(outputStream);
}
jsonGenerator.writeStartObject();
String eventType = event.getEventType();
if (eventType == null) {
@@ -54,11 +56,15 @@ public abstract class AbstractEventWriter<T extends EventInfo> implements EventW
throw new IOException("endJson called without a JsonGenerator");
}
jsonGenerator.writeEndObject();
jsonGenerator.flush();
jsonGenerator.close();
}
private JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
protected void endFile() throws IOException {
jsonGenerator.flush();
jsonGenerator.close();
jsonGenerator = null;
}
protected JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
return JSON_FACTORY.createGenerator(out);
}
}


@@ -32,12 +32,14 @@ public interface EventWriter<T extends EventInfo> {
/**
* Writes the given event to the process session, possibly via transferring it to the specified relationship (usually used for success)
*
* @param session The session to write the event to
* @param transitUri The URI indicating the source MySQL system from which the specified event is associated
* @param eventInfo The event data
* @param currentSequenceId the current sequence ID
* @param relationship A relationship to transfer any flowfile(s) to
* @return a sequence ID, usually incremented from the specified current sequence id by the number of flow files transferred and/or committed
* @param session The session to write the event to
* @param transitUri The URI indicating the source MySQL system from which the specified event is associated
* @param eventInfo The event data
* @param currentSequenceId The current sequence ID
* @param relationship A relationship to transfer any flowfile(s) to
* @param eventWriterConfiguration A configuration object used for FlowFile management (e.g. how many events to write to each FlowFile)
* @return a sequence ID, usually incremented from the specified current sequence ID by the number of FlowFiles transferred and/or committed
*/
long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship);
long writeEvent(final ProcessSession session, String transitUri, final T eventInfo, final long currentSequenceId, Relationship relationship,
final EventWriterConfiguration eventWriterConfiguration);
}
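
For orientation, here is a minimal caller-side sketch of the new signature. This is an editor's illustration, not part of the diff; 'session', 'transitUri', 'beginEvent' and 'insertEvent' are hypothetical stand-ins for the values CaptureChangeMySQL passes at its real call sites further below.

    // The same configuration object is shared across writers, so consecutive events
    // can be appended to one open FlowFile instead of each event creating its own.
    EventWriterConfiguration config =
            new EventWriterConfiguration(FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE, 2);
    long sequenceId = 0L;
    sequenceId = beginEventWriter.writeEvent(session, transitUri, beginEvent, sequenceId, REL_SUCCESS, config);
    sequenceId = insertRowsWriter.writeEvent(session, transitUri, insertEvent, sequenceId, REL_SUCCESS, config);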


@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event.io;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.nifi.flowfile.FlowFile;
import java.io.IOException;
import java.io.OutputStream;
public class EventWriterConfiguration {
private final FlowFileEventWriteStrategy flowFileEventWriteStrategy;
private final int numberOfEventsPerFlowFile;
private int numberOfEventsWritten;
private FlowFile currentFlowFile;
private OutputStream flowFileOutputStream;
private JsonGenerator jsonGenerator;
public EventWriterConfiguration(FlowFileEventWriteStrategy flowFileEventWriteStrategy, int numberOfEventsPerFlowFile) {
this.flowFileEventWriteStrategy = flowFileEventWriteStrategy;
this.numberOfEventsPerFlowFile = numberOfEventsPerFlowFile;
}
public FlowFileEventWriteStrategy getFlowFileEventWriteStrategy() {
return flowFileEventWriteStrategy;
}
public int getNumberOfEventsWritten() {
return numberOfEventsWritten;
}
public void incrementNumberOfEventsWritten() {
this.numberOfEventsWritten++;
}
public void startNewFlowFile(FlowFile flowFile, OutputStream flowFileOutputStream, JsonGenerator jsonGenerator) {
this.currentFlowFile = flowFile;
this.flowFileOutputStream = flowFileOutputStream;
this.jsonGenerator = jsonGenerator;
}
public void cleanUp() throws IOException {
this.currentFlowFile = null;
this.flowFileOutputStream.close();
this.flowFileOutputStream = null;
this.jsonGenerator = null;
this.numberOfEventsWritten = 0;
}
public int getNumberOfEventsPerFlowFile() {
return numberOfEventsPerFlowFile;
}
public FlowFile getCurrentFlowFile() {
return currentFlowFile;
}
public OutputStream getFlowFileOutputStream() {
return flowFileOutputStream;
}
public JsonGenerator getJsonGenerator() {
return jsonGenerator;
}
}
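
The class is as much a shared state holder as a configuration. A condensed sketch of the lifecycle the writers drive through it (not part of the diff; 'config' is an EventWriterConfiguration and JSON_FACTORY stands in for the Jackson JsonFactory that AbstractEventWriter already owns):

    // First event: open a FlowFile and register it with the shared configuration
    FlowFile flowFile = session.create();
    OutputStream out = session.write(flowFile);
    config.startNewFlowFile(flowFile, out, JSON_FACTORY.createGenerator(out));
    // Every written event bumps the counter...
    config.incrementNumberOfEventsWritten();
    // ...and once the finished FlowFile has been transferred, the state is reset
    if (config.getNumberOfEventsWritten() == config.getNumberOfEventsPerFlowFile()) {
        config.cleanUp(); // closes the stream and clears the FlowFile, generator and counter
    }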


@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event.io;
import org.apache.nifi.components.DescribedValue;
public enum FlowFileEventWriteStrategy implements DescribedValue {
MAX_EVENTS_PER_FLOWFILE(
"Max Events Per FlowFile",
"This strategy causes at most the number of events specified in the 'Number of Events Per FlowFile' property to be written per FlowFile. If the processor is stopped before the "
+ "specified number of events has been written (or the event queue becomes empty), the fewer number of events will still be written as a FlowFile before stopping."
),
ONE_TRANSACTION_PER_FLOWFILE(
"One Transaction Per FlowFile",
"This strategy causes each event from a transaction (from BEGIN to COMMIT) to be written to a FlowFile"
);
private final String displayName;
private final String description;
FlowFileEventWriteStrategy(String displayName, String description) {
this.displayName = displayName;
this.description = description;
}
@Override
public String getValue() {
return name();
}
@Override
public String getDisplayName() {
return displayName;
}
@Override
public String getDescription() {
return description;
}
}
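
Because the enum implements DescribedValue, it can be handed directly to a PropertyDescriptor as its set of allowable values. A sketch condensed from the EVENTS_PER_FLOWFILE_STRATEGY descriptor added further below (illustration only):

    // allowableValues(Class) derives value, display name and description from the enum
    PropertyDescriptor strategy = new PropertyDescriptor.Builder()
            .name("events-per-flowfile-strategy")
            .allowableValues(FlowFileEventWriteStrategy.class)
            .defaultValue(FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE.getValue())
            .build();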


@@ -16,14 +16,21 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.EventInfo;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.io.AbstractEventWriter;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.FlowFileAccessException;
import org.apache.nifi.processor.exception.ProcessException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
@@ -44,35 +51,114 @@ public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> exten
}
protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
return new HashMap<String, String>() {
{
put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
String gtidSet = eventInfo.getBinlogGtidSet();
if (gtidSet == null) {
put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
} else {
put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
}
put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
}
};
final Map<String, String> commonAttributeMap = new HashMap<>();
commonAttributeMap.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
commonAttributeMap.put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
String gtidSet = eventInfo.getBinlogGtidSet();
if (gtidSet == null) {
commonAttributeMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
commonAttributeMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
} else {
commonAttributeMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet);
}
commonAttributeMap.put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
return commonAttributeMap;
}
// Default implementation for binlog events
@Override
public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, (outputStream) -> {
public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship,
final EventWriterConfiguration eventWriterConfiguration) {
configureEventWriter(eventWriterConfiguration, session, eventInfo);
OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
try {
super.startJson(outputStream, eventInfo);
writeJson(eventInfo);
// Nothing in the body
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON start array failed", ioe);
}
eventWriterConfiguration.incrementNumberOfEventsWritten();
// Check if it is time to finish the FlowFile
if (maxEventsPerFlowFile(eventWriterConfiguration)
&& eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
}
return currentSequenceId + 1;
}
public void finishAndTransferFlowFile(final ProcessSession session, final EventWriterConfiguration eventWriterConfiguration, final String transitUri, final long seqId,
final BinlogEventInfo eventInfo, final Relationship relationship) {
if (writtenMultipleEvents(eventWriterConfiguration)) {
try {
jsonGenerator.writeEndArray();
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON end array failed", ioe);
}
}
try {
endFile();
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (session == null || flowFile == null) {
throw new ProcessException("No open FlowFile or ProcessSession to write to");
}
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId, eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
eventWriterConfiguration.cleanUp();
} catch (IOException ioe) {
throw new FlowFileAccessException("Failed to close event writer", ioe);
}
}
protected void configureEventWriter(final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final EventInfo eventInfo) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile == null) {
flowFile = session.create();
OutputStream flowFileOutputStream = session.write(flowFile);
if (eventWriterConfiguration.getJsonGenerator() == null) {
try {
jsonGenerator = createJsonGenerator(flowFileOutputStream);
} catch (IOException ioe) {
throw new UncheckedIOException("JSON Generator creation failed", ioe);
}
}
if (multipleEventsPerFlowFile(eventWriterConfiguration)) {
try {
jsonGenerator.writeStartArray();
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON start array failed", ioe);
}
}
eventWriterConfiguration.startNewFlowFile(flowFile, flowFileOutputStream, jsonGenerator);
}
jsonGenerator = eventWriterConfiguration.getJsonGenerator();
}
private boolean multipleEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
return (maxEventsPerFlowFile(eventWriterConfiguration)
&& eventWriterConfiguration.getNumberOfEventsPerFlowFile() > 1)
|| oneTransactionPerFlowFile(eventWriterConfiguration);
}
private boolean writtenMultipleEvents(EventWriterConfiguration eventWriterConfiguration) {
return eventWriterConfiguration.getNumberOfEventsWritten() > 1
|| oneTransactionPerFlowFile(eventWriterConfiguration);
}
protected boolean maxEventsPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
return FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy());
}
protected boolean oneTransactionPerFlowFile(EventWriterConfiguration eventWriterConfiguration) {
return FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy());
}
}
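
Taken together, every concrete writer now follows the same per-event pattern. Condensed into a sketch (editor's summary, not literal code from the diff):

    configureEventWriter(config, session, eventInfo);        // opens a FlowFile and a JSON array if needed
    startJson(config.getFlowFileOutputStream(), eventInfo);  // one JSON object per event
    writeJson(eventInfo);
    endJson();
    config.incrementNumberOfEventsWritten();
    if (maxEventsPerFlowFile(config)
            && config.getNumberOfEventsWritten() == config.getNumberOfEventsPerFlowFile()) {
        // ends the JSON array (when one was started), transfers the FlowFile, resets the shared state
        finishAndTransferFlowFile(session, config, transitUri, sequenceId, eventInfo, relationship);
    }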


@@ -16,9 +16,6 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
import java.io.IOException;
@@ -46,20 +43,4 @@ public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventI
jsonGenerator.writeNullField("table_id");
}
}
// Default implementation for table-related binlog events
@Override
public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, (outputStream) -> {
super.startJson(outputStream, eventInfo);
writeJson(eventInfo);
// Nothing in the body
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
return currentSequenceId + 1;
}
}


@@ -17,7 +17,10 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
@@ -25,6 +28,18 @@ import java.io.IOException;
* A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
*/
public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
@Override
public long writeEvent(ProcessSession session, String transitUri, CommitTransactionEventInfo eventInfo, long currentSequenceId,
Relationship relationship, EventWriterConfiguration eventWriterConfiguration) {
long sequenceId = super.writeEvent(session, transitUri, eventInfo, currentSequenceId, relationship, eventWriterConfiguration);
// If writing one transaction per flowfile, finish the flowfile here before committing the session
if (oneTransactionPerFlowFile(eventWriterConfiguration)) {
super.finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, sequenceId, eventInfo, relationship);
}
return sequenceId;
}
protected void writeJson(CommitTransactionEventInfo event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {


@@ -16,28 +16,43 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
/**
* A writer class to output MySQL binlog Data Definition Language (DDL) events to flow file(s).
*/
public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {
@Override
public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, (outputStream) -> {
public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship,
final EventWriterConfiguration eventWriterConfiguration) {
configureEventWriter(eventWriterConfiguration, session, eventInfo);
OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
jsonGenerator.writeStringField("query", eventInfo.getQuery());
super.endJson();
});
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON start array failed", ioe);
}
eventWriterConfiguration.incrementNumberOfEventsWritten();
// Check if it is time to finish the FlowFile
if (maxEventsPerFlowFile(eventWriterConfiguration)
&& eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, currentSequenceId, eventInfo, relationship);
}
return currentSequenceId + 1;
}
}


@@ -16,18 +16,18 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicLong;
/**
* A writer class to output MySQL binlog "delete rows" events to flow file(s).
@@ -42,12 +42,13 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
final AtomicLong seqId = new AtomicLong(currentSequenceId);
public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
final EventWriterConfiguration eventWriterConfiguration) {
long seqId = currentSequenceId;
for (Serializable[] row : eventInfo.getRows()) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, outputStream -> {
configureEventWriter(eventWriterConfiguration, session, eventInfo);
OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
@@ -56,14 +57,20 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
writeRow(eventInfo, row, bitSet);
super.endJson();
});
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON start array failed", ioe);
}
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
seqId.getAndIncrement();
eventWriterConfiguration.incrementNumberOfEventsWritten();
// Check if it is time to finish the FlowFile
if (maxEventsPerFlowFile(eventWriterConfiguration)
&& eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
}
seqId++;
}
return seqId.get();
return seqId;
}
protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {


@@ -16,17 +16,18 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -42,12 +43,13 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
final AtomicLong seqId = new AtomicLong(currentSequenceId);
public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
final EventWriterConfiguration eventWriterConfiguration) {
long seqId = currentSequenceId;
for (Serializable[] row : eventInfo.getRows()) {
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, outputStream -> {
configureEventWriter(eventWriterConfiguration, session, eventInfo);
OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
@@ -56,14 +58,20 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
writeRow(eventInfo, row, bitSet);
super.endJson();
});
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON start array failed", ioe);
}
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
seqId.getAndIncrement();
eventWriterConfiguration.incrementNumberOfEventsWritten();
// Check if it is time to finish the FlowFile
if (maxEventsPerFlowFile(eventWriterConfiguration)
&& eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
}
seqId++;
}
return seqId.get();
return seqId;
}
protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {


@@ -16,18 +16,19 @@
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
@@ -43,13 +44,14 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
* @return The next available CDC sequence ID for use by the CDC processor
*/
@Override
public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
final AtomicLong seqId = new AtomicLong(currentSequenceId);
public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship,
final EventWriterConfiguration eventWriterConfiguration) {
long seqId = currentSequenceId;
for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {
configureEventWriter(eventWriterConfiguration, session, eventInfo);
OutputStream outputStream = eventWriterConfiguration.getFlowFileOutputStream();
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, outputStream -> {
try {
super.startJson(outputStream, eventInfo);
super.writeJson(eventInfo);
@@ -57,14 +59,20 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
writeRow(eventInfo, row, bitSet);
super.endJson();
});
} catch (IOException ioe) {
throw new UncheckedIOException("Write JSON start array failed", ioe);
}
flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
session.transfer(flowFile, relationship);
session.getProvenanceReporter().receive(flowFile, transitUri);
seqId.getAndIncrement();
eventWriterConfiguration.incrementNumberOfEventsWritten();
// Check if it is time to finish the FlowFile
if (maxEventsPerFlowFile(eventWriterConfiguration)
&& eventWriterConfiguration.getNumberOfEventsWritten() == eventWriterConfiguration.getNumberOfEventsPerFlowFile()) {
finishAndTransferFlowFile(session, eventWriterConfiguration, transitUri, seqId, eventInfo, relationship);
}
seqId++;
}
return seqId.get();
return seqId;
}
protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {


@@ -44,7 +44,6 @@ import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.RowEventException;
import org.apache.nifi.cdc.event.TableInfo;
import org.apache.nifi.cdc.event.TableInfoCacheKey;
import org.apache.nifi.cdc.event.io.EventWriter;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
@@ -52,9 +51,12 @@ import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
@@ -78,6 +80,7 @@ import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -132,6 +135,9 @@ import static com.github.shyiko.mysql.binlog.event.EventType.PRE_GA_WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.ROTATE;
import static com.github.shyiko.mysql.binlog.event.EventType.WRITE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.XID;
import static org.apache.nifi.cdc.event.io.EventWriter.CDC_EVENT_TYPE_ATTRIBUTE;
import static org.apache.nifi.cdc.event.io.EventWriter.SEQUENCE_ID_KEY;
import static org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy.MAX_EVENTS_PER_FLOWFILE;
/**
@@ -140,15 +146,17 @@ import static com.github.shyiko.mysql.binlog.event.EventType.XID;
@TriggerSerially
@PrimaryNodeOnly
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags({"sql", "jdbc", "cdc", "mysql"})
@Tags({"sql", "jdbc", "cdc", "mysql", "transaction", "event"})
@CapabilityDescription("Retrieves Change Data Capture (CDC) events from a MySQL database. CDC Events include INSERT, UPDATE, DELETE operations. Events "
+ "are output as individual flow files ordered by the time at which the operation occurred.")
+ "are output as either a group of a specified number of events (the default is 1 so each event becomes its own flow file) or grouped as a full transaction (BEGIN to COMMIT). All events "
+ "are ordered by the time at which the operation occurred. NOTE: If the processor is stopped before the specified number of events have been written to a flow file, "
+ "the partial flow file will be output in order to maintain the consistency of the event stream.")
@Stateful(scopes = Scope.CLUSTER, description = "Information such as a 'pointer' to the current CDC event in the database is stored by this processor, such "
+ "that it can continue from the same location if restarted.")
@WritesAttributes({
@WritesAttribute(attribute = EventWriter.SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
@WritesAttribute(attribute = SEQUENCE_ID_KEY, description = "A sequence identifier (i.e. strictly increasing integer value) specifying the order "
+ "of the CDC event flow file relative to the other event flow file(s)."),
@WritesAttribute(attribute = EventWriter.CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
@WritesAttribute(attribute = CDC_EVENT_TYPE_ATTRIBUTE, description = "A string indicating the type of CDC event that occurred, including (but not limited to) "
+ "'begin', 'insert', 'update', 'delete', 'ddl' and 'commit'."),
@WritesAttribute(attribute = "mime.type", description = "The processor outputs flow file content in JSON format, and sets the mime.type attribute to "
+ "application/json")
@@ -185,6 +193,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
SSLMode.VERIFY_IDENTITY.toString(),
"Connect with TLS or fail when server support not enabled. Verify server hostname matches presented X.509 certificate names or fail when not matched");
// Properties
public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
.name("capture-change-mysql-db-name-pattern")
@@ -269,6 +278,31 @@
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor EVENTS_PER_FLOWFILE_STRATEGY = new PropertyDescriptor.Builder()
.name("events-per-flowfile-strategy")
.displayName("Event Processing Strategy")
.description("Specifies the strategy to use when writing events to FlowFile(s), such as '" + MAX_EVENTS_PER_FLOWFILE.getDisplayName() + "'")
.required(true)
.sensitive(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(FlowFileEventWriteStrategy.class)
.defaultValue(MAX_EVENTS_PER_FLOWFILE.getValue())
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor NUMBER_OF_EVENTS_PER_FLOWFILE = new PropertyDescriptor.Builder()
.name("number-of-events-per-flowfile")
.displayName("Events Per FlowFile")
.description("Specifies how many events should be written to a single FlowFile. If the processor is stopped before the specified number of events has been written,"
+ "the events will still be written as a FlowFile before stopping.")
.required(true)
.sensitive(false)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.dependsOn(EVENTS_PER_FLOWFILE_STRATEGY, MAX_EVENTS_PER_FLOWFILE.getValue())
.build();
public static final PropertyDescriptor SERVER_ID = new PropertyDescriptor.Builder()
.name("capture-change-mysql-server-id")
.displayName("Server ID")
@@ -479,6 +513,10 @@
private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
private volatile EventWriterConfiguration eventWriterConfiguration;
private volatile BinlogEventInfo currentEventInfo;
private AbstractBinlogEventWriter<? extends BinlogEventInfo> currentEventWriter;
static {
final Set<Relationship> r = new HashSet<>();
@@ -491,6 +529,8 @@
pds.add(DRIVER_LOCATION);
pds.add(USERNAME);
pds.add(PASSWORD);
pds.add(EVENTS_PER_FLOWFILE_STRATEGY);
pds.add(NUMBER_OF_EVENTS_PER_FLOWFILE);
pds.add(SERVER_ID);
pds.add(DATABASE_NAME_PATTERN);
pds.add(TABLE_NAME_PATTERN);
@@ -569,6 +609,13 @@
return;
}
// Build an event writer config object for the event writers to use
final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
eventWriterConfiguration = new EventWriterConfiguration(
flowFileEventWriteStrategy,
context.getProperty(NUMBER_OF_EVENTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger()
);
PropertyValue dbNameValue = context.getProperty(DATABASE_NAME_PATTERN);
databaseNamePattern = dbNameValue.isSet() ? Pattern.compile(dbNameValue.getValue()) : null;
@@ -624,7 +671,7 @@
}
// Get current sequence ID from state
String seqIdString = stateMap.get(EventWriter.SEQUENCE_ID_KEY);
String seqIdString = stateMap.get(SEQUENCE_ID_KEY);
if (StringUtils.isEmpty(seqIdString)) {
// Use Initial Sequence ID property if none is found in state
PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
@@ -888,7 +935,7 @@
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getPosition();
}
log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
log.debug("Got message event type: {} ", header.getEventType().toString());
switch (eventType) {
case TABLE_MAP:
// This is sent to inform which table is about to be changed by subsequent events
@@ -952,7 +999,10 @@
BeginTransactionEventInfo beginEvent = useGtid
? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
currentEventInfo = beginEvent;
currentEventWriter = beginEventWriter;
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
inTransaction = true;
//update inTransaction value to state
@@ -963,17 +1013,35 @@
+ "This could indicate that your binlog position is invalid.");
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
CommitTransactionEventInfo commitTransactionEvent = useGtid
? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
if (includeBeginCommit) {
if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
CommitTransactionEventInfo commitTransactionEvent = useGtid
? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentEventInfo = commitTransactionEvent;
currentEventWriter = commitEventWriter;
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
} else {
// If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null && currentEventWriter != null) {
// Flush the pending events to the current FlowFile before committing the session
currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
}
currentSession.commitAsync();
}
}
//update inTransaction value to state
inTransaction = false;
updateState(session);
// Commit the NiFi session
session.commitAsync();
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() == null) {
// Commit the NiFi session
session.commitAsync();
}
currentTable = null;
} else {
// Check for DDL events (e.g. ALTER TABLE). Normalize the query to do string matching on the type of change
@@ -993,7 +1061,9 @@
DDLEventInfo ddlEvent = useGtid
? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql)
: new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql);
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
currentEventInfo = ddlEvent;
currentEventWriter = ddlEventWriter;
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
// Remove all the keys from the cache that this processor added
if (cacheClient != null) {
@@ -1002,7 +1072,19 @@
// If not in a transaction, commit the session so the DDL event(s) will be transferred
if (includeDDLEvents && !inTransaction) {
updateState(session);
session.commitAsync();
if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null && currentEventWriter != null) {
// Flush the DDL event(s) to the current FlowFile before the session is committed
currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
}
}
}
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() == null) {
session.commitAsync();
}
}
}
}
@@ -1013,19 +1095,35 @@
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
}
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
CommitTransactionEventInfo commitTransactionEvent = useGtid
? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
if (includeBeginCommit) {
if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) {
CommitTransactionEventInfo commitTransactionEvent = useGtid
? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet)
: new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentEventInfo = commitTransactionEvent;
currentEventWriter = commitEventWriter;
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
} else {
// If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null && currentEventWriter != null) {
// Flush the pending events to the current FlowFile before committing the session
currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
}
currentSession.commitAsync();
}
}
// Commit the NiFi session
// update the inTransaction value and save the next position
// so that when this processor restarts, we will not read the XID event again
inTransaction = false;
currentBinlogPosition = header.getNextPosition();
updateState(session);
session.commitAsync();
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() == null) {
session.commitAsync();
}
currentTable = null;
currentDatabase = null;
break;
@@ -1060,7 +1158,9 @@
InsertRowsEventInfo eventInfo = useGtid
? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
: new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
currentEventInfo = eventInfo;
currentEventWriter = insertRowsWriter;
currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
} else if (eventType == DELETE_ROWS
|| eventType == EXT_DELETE_ROWS
@@ -1069,14 +1169,18 @@
DeleteRowsEventInfo eventInfo = useGtid
? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
: new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
currentEventInfo = eventInfo;
currentEventWriter = deleteRowsWriter;
currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
} else {
// Update event
UpdateRowsEventInfo eventInfo = useGtid
? new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData())
: new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
currentEventInfo = eventInfo;
currentEventWriter = updateRowsWriter;
currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration));
}
break;
@@ -1144,6 +1248,11 @@
}
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null && currentEventWriter != null) {
// Flush the events to the FlowFile when the processor is stopped
currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS);
}
currentSession.commitAsync();
}
@@ -1174,7 +1283,7 @@
}
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition));
newStateMap.put(EventWriter.SEQUENCE_ID_KEY, String.valueOf(sequenceId));
newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId));
//add inTransaction value into state
newStateMap.put("inTransaction", inTransaction ? "true" : "false");


@@ -35,6 +35,7 @@ import org.apache.nifi.cdc.event.ColumnDefinition
import org.apache.nifi.cdc.event.TableInfo
import org.apache.nifi.cdc.event.TableInfoCacheKey
import org.apache.nifi.cdc.event.io.EventWriter
import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
@@ -734,6 +735,238 @@ class CaptureChangeMySQLTest {
assertEquals(5, resultFiles.size())
}
@Test
void testSkipTableMultipleEventsPerFlowFile() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.NUMBER_OF_EVENTS_PER_FLOWFILE, '2')
testRunner.run(1, false, true)
// ROTATE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
[binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
))
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
// TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
// This WRITE ROWS should be skipped
def cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// TABLE MAP for table matching, all modification events (1) should be emitted
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
// WRITE ROWS for matching table
cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
{} as EventData
))
////////////////////////
// Test database filter
////////////////////////
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
// TABLE MAP for database not matching the regex
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
// This WRITE ROWS should be skipped
cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
{} as EventData
))
testRunner.run(1, true, false)
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
// Five events total, 2 max per flow file, so 3 flow files
assertEquals(3, resultFiles.size())
def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
assertTrue (json instanceof ArrayList)
assertEquals(2, json.size())
// BEGIN, INSERT, COMMIT (verifies that one of the INSERTs was skipped)
assertEquals('begin', json[0]?.type)
assertEquals('insert', json[1]?.type)
json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
assertTrue (json instanceof ArrayList)
assertEquals(2, json.size())
assertEquals('commit', json[0]?.type)
assertEquals('begin', json[1]?.type)
json = new JsonSlurper().parseText(new String(resultFiles[2].toByteArray()))
assertTrue (json instanceof ArrayList)
// One event left
assertEquals(1, json.size())
assertEquals('commit', json[0]?.type)
}
@Test
void testSkipTableOneTransactionPerFlowFile() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.EVENTS_PER_FLOWFILE_STRATEGY, FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.name())
testRunner.run(1, false, true)
// ROTATE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
[binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
))
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
// TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
// This WRITE ROWS should be skipped
def cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// TABLE MAP for table matching, all modification events (1) should be emitted
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
// WRITE ROWS for matching table
cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
{} as EventData
))
////////////////////////
// Test database filter
////////////////////////
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
// TABLE MAP for database not matching the regex
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
[tableId: 1, database: 'myDB', table: 'notMyDB', columnTypes: [4, -4] as byte[]] as TableMapEventData
))
// This WRITE ROWS should be skipped
cols = new BitSet()
cols.set(1)
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
[tableId: 1, includedColumns: cols,
rows : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
{} as EventData
))
testRunner.run(1, true, false)
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
// Five events total across two transactions, one transaction per flow file, so 2 flow files
assertEquals(2, resultFiles.size())
def json = new JsonSlurper().parseText(new String(resultFiles[0].toByteArray()))
assertTrue (json instanceof ArrayList)
assertEquals(3, json.size())
// BEGIN, INSERT, COMMIT (verifies that one of the INSERTs was skipped)
assertEquals('begin', json[0]?.type)
assertEquals('insert', json[1]?.type)
assertEquals('commit', json[2]?.type)
json = new JsonSlurper().parseText(new String(resultFiles[1].toByteArray()))
assertTrue (json instanceof ArrayList)
// Only two events left
assertEquals(2, json.size())
assertEquals('begin', json[0]?.type)
assertEquals('commit', json[1]?.type)
}
@Test
void testFilterDatabase() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)