diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java index d59306b537..ada962a175 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfo.java @@ -17,22 +17,14 @@ package org.apache.nifi.cdc.event; import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.nifi.distributed.cache.client.exception.DeserializationException; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; -import java.io.IOException; -import java.io.OutputStream; -import java.util.ArrayList; import java.util.List; -import java.util.stream.Collectors; /** * A POJO for holding table information related to update events. */ public class TableInfo { - final static String DB_TABLE_NAME_DELIMITER = "@!@"; - private String databaseName; private String tableName; private Long tableId; @@ -92,64 +84,4 @@ public class TableInfo { result = 31 * result + (columns != null ? columns.hashCode() : 0); return result; } - - public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer { - - @Override - public void serialize(TableInfo value, OutputStream output) throws SerializationException, IOException { - StringBuilder sb = new StringBuilder(value.getDatabaseName()); - sb.append(DB_TABLE_NAME_DELIMITER); - sb.append(value.getTableName()); - sb.append(DB_TABLE_NAME_DELIMITER); - sb.append(value.getTableId()); - List columnDefinitions = value.getColumns(); - if (columnDefinitions != null && !columnDefinitions.isEmpty()) { - sb.append(DB_TABLE_NAME_DELIMITER); - sb.append(columnDefinitions.stream().map((col) -> col.getName() + DB_TABLE_NAME_DELIMITER + col.getType()).collect(Collectors.joining(DB_TABLE_NAME_DELIMITER))); - } - output.write(sb.toString().getBytes()); - } - } - - public static class Deserializer implements org.apache.nifi.distributed.cache.client.Deserializer { - - @Override - public TableInfo deserialize(byte[] input) throws DeserializationException, IOException { - // Don't bother deserializing if empty, just return null. This usually happens when the key is not found in the cache - if (input == null || input.length == 0) { - return null; - } - String inputString = new String(input); - String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER); - int numTokens = tokens.length; - if (numTokens < 3) { - throw new IOException("Could not deserialize TableInfo from the following value: " + inputString); - } - String dbName = tokens[0]; - String tableName = tokens[1]; - Long tableId; - try { - tableId = Long.parseLong(tokens[2]); - } catch (NumberFormatException nfe) { - throw new IOException("Illegal table ID: " + tokens[2]); - } - // Parse column names and types - List columnDefinitions = new ArrayList<>(); - for (int i = 0; i < numTokens - 3; i += 2) { - try { - int columnTypeIndex = i + 4; - int columnNameIndex = i + 3; - if (columnTypeIndex < numTokens) { - columnDefinitions.add(new ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]), tokens[columnNameIndex])); - } else { - throw new IOException("No type detected for column: " + tokens[columnNameIndex]); - } - } catch (NumberFormatException nfe) { - throw new IOException("Illegal column type value for column " + (i / 2 + 1) + ": " + tokens[i + 4]); - } - } - - return new TableInfo(dbName, tableName, tableId, columnDefinitions); - } - } } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java index 2f185e9b7e..81392642bf 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-api/src/main/java/org/apache/nifi/cdc/event/TableInfoCacheKey.java @@ -17,12 +17,6 @@ package org.apache.nifi.cdc.event; import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.nifi.distributed.cache.client.exception.SerializationException; - -import java.io.IOException; -import java.io.OutputStream; - -import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER; /** * This class represents a key in a cache that contains information (column definitions, e.g.) for a database table @@ -80,19 +74,4 @@ public class TableInfoCacheKey { public String getUuidPrefix() { return uuidPrefix; } - - public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer { - - @Override - public void serialize(TableInfoCacheKey key, OutputStream output) throws SerializationException, IOException { - StringBuilder sb = new StringBuilder(key.getUuidPrefix()); - sb.append(DB_TABLE_NAME_DELIMITER); - sb.append(key.getDatabaseName()); - sb.append(DB_TABLE_NAME_DELIMITER); - sb.append(key.getTableName()); - sb.append(DB_TABLE_NAME_DELIMITER); - sb.append(key.getTableId()); - output.write(sb.toString().getBytes()); - } - } } \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml index 9dddc26a0c..d8cc4801e7 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/pom.xml @@ -41,7 +41,7 @@ language governing permissions and limitations under the License. --> com.zendesk mysql-binlog-connector-java - 0.27.6 + 0.28.0 org.apache.nifi diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java new file mode 100644 index 0000000000..08ad96c8c8 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/DataCaptureState.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event; + +public class DataCaptureState { + + private String binlogFile = null; + private long binlogPosition = 4; + + private boolean useGtid = false; + private String gtidSet = null; + private long sequenceId = 0L; + + public DataCaptureState() { + } + + public DataCaptureState(String binlogFile, long binlogPosition, boolean useGtid, String gtidSet, long sequenceId) { + this.binlogFile = binlogFile; + this.binlogPosition = binlogPosition; + this.useGtid = useGtid; + this.gtidSet = gtidSet; + this.sequenceId = sequenceId; + } + + public String getBinlogFile() { + return binlogFile; + } + + public void setBinlogFile(String binlogFile) { + this.binlogFile = binlogFile; + } + + public long getBinlogPosition() { + return binlogPosition; + } + + public void setBinlogPosition(long binlogPosition) { + this.binlogPosition = binlogPosition; + } + + public boolean isUseGtid() { + return useGtid; + } + + public void setUseGtid(boolean useGtid) { + this.useGtid = useGtid; + } + + public String getGtidSet() { + return gtidSet; + } + + public void setGtidSet(String gtidSet) { + this.gtidSet = gtidSet; + } + + public long getSequenceId() { + return sequenceId; + } + + public void setSequenceId(long sequenceId) { + this.sequenceId = sequenceId; + } + + public DataCaptureState copy() { + return new DataCaptureState(binlogFile, binlogPosition, useGtid, gtidSet, sequenceId); + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java deleted file mode 100644 index 313bfce763..0000000000 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtils.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.cdc.mysql.event; - -import java.io.Serializable; - -/** - * A utility class to provide MySQL- / binlog-specific constants and methods for processing events and data - */ -public class MySQLCDCUtils { - - public static Object getWritableObject(Integer type, Serializable value) { - if (value == null) { - return null; - } - if (type == null) { - if (value instanceof byte[]) { - return new String((byte[]) value); - } else if (value instanceof Number) { - return value; - } - } else if (value instanceof Number) { - return value; - } else { - if (value instanceof byte[]) { - return new String((byte[]) value); - } else { - return value.toString(); - } - } - return null; - } -} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java new file mode 100644 index 0000000000..a58319d20f --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BeginEventHandler.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class BeginEventHandler implements BinlogEventHandler { + + private final BeginTransactionEventWriter eventWriter = new BeginTransactionEventWriter(); + + @Override + public void handleEvent(final QueryEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) { + if (writeEvent) { + final String currentDatabase = eventData.getDatabase(); + final BeginTransactionEventInfo beginEvent = dataCaptureState.isUseGtid() + ? new BeginTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getGtidSet()) + : new BeginTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition()); + + binlogEventState.setCurrentEventInfo(beginEvent); + binlogEventState.setCurrentEventWriter(eventWriter); + dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), beginEvent, dataCaptureState.getSequenceId(), + REL_SUCCESS, eventWriterConfiguration)); + } + binlogResourceInfo.setInTransaction(true); + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java new file mode 100644 index 0000000000..44b8d505e7 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/BinlogEventHandler.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.EventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.BinlogEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +public interface BinlogEventHandler { + + void handleEvent(final T eventData, + final boolean writeEvent, + final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, + final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, + final EventWriterConfiguration eventWriterConfiguration, + final ProcessSession session, + final long timestamp); +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java new file mode 100644 index 0000000000..14027204ab --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/CommitEventHandler.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.EventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class CommitEventHandler implements BinlogEventHandler { + + private final CommitTransactionEventWriter eventWriter = new CommitTransactionEventWriter(); + + @Override + public void handleEvent(final EventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) { + final String currentDatabase = binlogResourceInfo.getCurrentDatabase(); + final CommitTransactionEventInfo commitEvent = dataCaptureState.isUseGtid() + ? new CommitTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getGtidSet()) + : new CommitTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition()); + + if (writeEvent) { + binlogEventState.setCurrentEventInfo(commitEvent); + binlogEventState.setCurrentEventWriter(eventWriter); + dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), commitEvent, dataCaptureState.getSequenceId(), + REL_SUCCESS, eventWriterConfiguration)); + } else { + // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed. + if (session != null) { + FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); + if (flowFile != null) { + // Flush the events to the FlowFile when the processor is stopped + eventWriter.finishAndTransferFlowFile(session, eventWriterConfiguration, binlogResourceInfo.getTransitUri(), dataCaptureState.getSequenceId(), commitEvent, REL_SUCCESS); + } + session.commitAsync(); + } + } + + // Update inTransaction value to state + binlogResourceInfo.setInTransaction(false); + } +} \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java new file mode 100644 index 0000000000..698d7fe011 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DDLEventHandler.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.QueryEventData; +import org.apache.nifi.cdc.event.TableInfo; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DDLEventInfo; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class DDLEventHandler implements BinlogEventHandler { + + private final DDLEventWriter eventWriter = new DDLEventWriter(); + + @Override + public void handleEvent(final QueryEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) { + if (writeEvent) { + final TableInfo ddlTableInfo = binlogResourceInfo.getCurrentTable() != null + ? binlogResourceInfo.getCurrentTable() + : new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, null, null); + final DDLEventInfo ddlEvent = dataCaptureState.isUseGtid() + ? new DDLEventInfo(ddlTableInfo, timestamp, dataCaptureState.getGtidSet(), sql) + : new DDLEventInfo(ddlTableInfo, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), sql); + + binlogEventState.setCurrentEventInfo(ddlEvent); + binlogEventState.setCurrentEventWriter(eventWriter); + dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), ddlEvent, dataCaptureState.getSequenceId(), + REL_SUCCESS, eventWriterConfiguration)); + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java new file mode 100644 index 0000000000..453a779aaf --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/DeleteEventHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class DeleteEventHandler implements BinlogEventHandler { + + private final DeleteRowsWriter eventWriter = new DeleteRowsWriter(); + + @Override + public void handleEvent(final DeleteRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) { + if (writeEvent) { + final DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid() + ? new DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData) + : new DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData); + + binlogEventState.setCurrentEventInfo(eventInfo); + binlogEventState.setCurrentEventWriter(eventWriter); + dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(), + REL_SUCCESS, eventWriterConfiguration)); + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java new file mode 100644 index 0000000000..0feae1f277 --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/InsertEventHandler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.WriteRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class InsertEventHandler implements BinlogEventHandler { + + private final InsertRowsWriter eventWriter = new InsertRowsWriter(); + @Override + public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) { + if (writeEvent) { + final InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid() + ? new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData) + : new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData); + + binlogEventState.setCurrentEventInfo(eventInfo); + binlogEventState.setCurrentEventWriter(eventWriter); + dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(), + REL_SUCCESS, eventWriterConfiguration)); + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java new file mode 100644 index 0000000000..2d538af06e --- /dev/null +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/handler/UpdateEventHandler.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.cdc.mysql.event.handler; + +import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData; +import org.apache.nifi.cdc.event.io.EventWriterConfiguration; +import org.apache.nifi.cdc.mysql.event.DataCaptureState; +import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo; +import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter; +import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL; +import org.apache.nifi.processor.ProcessSession; + +import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS; + +public class UpdateEventHandler implements BinlogEventHandler { + + private final UpdateRowsWriter eventWriter = new UpdateRowsWriter(); + + @Override + public void handleEvent(final UpdateRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState, + final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState, + final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) { + if (writeEvent) { + final UpdateRowsEventInfo eventInfo = dataCaptureState.isUseGtid() + ? new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData) + : new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData); + + binlogEventState.setCurrentEventInfo(eventInfo); + binlogEventState.setCurrentEventWriter(eventWriter); + dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(), + REL_SUCCESS, eventWriterConfiguration)); + } + } +} diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java index f618a76a6f..013b63edf9 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java @@ -19,12 +19,36 @@ package org.apache.nifi.cdc.mysql.event.io; import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo; import java.io.IOException; +import java.io.Serializable; /** * An abstract base class for writing MYSQL table-related binlog events into flow file(s), e.g. */ public abstract class AbstractBinlogTableEventWriter extends AbstractBinlogEventWriter { + protected Object getWritableObject(Integer type, Serializable value) { + if (value == null) { + return null; + } + if (type == null) { + if (value instanceof byte[]) { + return new String((byte[]) value); + } else if (value instanceof Number) { + return value; + } else { + return null; + } + } else { + if (value instanceof byte[]) { + return new String((byte[]) value); + } else if (value instanceof Number) { + return value; + } else { + return value.toString(); + } + } + } + protected void writeJson(T event) throws IOException { super.writeJson(event); if (event.getDatabaseName() != null) { diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java index 4828ce8662..8bf4b0220b 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java @@ -17,7 +17,6 @@ package org.apache.nifi.cdc.mysql.event.io; import org.apache.nifi.cdc.event.io.EventWriterConfiguration; -import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.cdc.event.ColumnDefinition; import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo; @@ -89,7 +88,7 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter propDescriptors; - private volatile ProcessSession currentSession; - private BinaryLogClient binlogClient; - private BinlogEventListener eventListener; - private BinlogLifecycleListener lifecycleListener; - private GtidSet gtidSet; + private volatile BinaryLogClient binlogClient; + private volatile BinlogEventListener eventListener; + private volatile BinlogLifecycleListener lifecycleListener; + private volatile GtidSet gtidSet; // Set queue capacity to avoid excessive memory consumption private final BlockingQueue queue = new LinkedBlockingQueue<>(1000); - private volatile String currentBinlogFile = null; - private volatile long currentBinlogPosition = 4; - private volatile String currentGtidSet = null; - // The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback - private volatile String xactBinlogFile = null; - private volatile long xactBinlogPosition = 4; - private volatile long xactSequenceId = 0; - private volatile String xactGtidSet = null; + private final Map tableInfoCache = new HashMap<>(); + + private volatile ProcessSession currentSession; + private DataCaptureState currentDataCaptureState = new DataCaptureState(); + + private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo(); - private volatile TableInfo currentTable = null; - private volatile String currentDatabase = null; private volatile Pattern databaseNamePattern; private volatile Pattern tableNamePattern; - private volatile boolean includeBeginCommit = false; - private volatile boolean includeDDLEvents = false; - private volatile boolean useGtid = false; - - private volatile boolean inTransaction = false; private volatile boolean skipTable = false; - private final AtomicBoolean hasRun = new AtomicBoolean(false); - private int currentHost = 0; - private String transitUri = ""; + private volatile JDBCConnectionHolder jdbcConnectionHolder = null; - private final AtomicLong currentSequenceId = new AtomicLong(0); + private final BinlogEventState binlogEventState = new BinlogEventState(); - private volatile DistributedMapCacheClient cacheClient = null; - private final Serializer cacheKeySerializer = new TableInfoCacheKey.Serializer(); - private final Serializer cacheValueSerializer = new TableInfo.Serializer(); - private final Deserializer cacheValueDeserializer = new TableInfo.Deserializer(); - - private JDBCConnectionHolder jdbcConnectionHolder = null; - - private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter(); - private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter(); - private final DDLEventWriter ddlEventWriter = new DDLEventWriter(); - private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter(); - private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter(); - private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter(); + private final BeginEventHandler beginEventHandler = new BeginEventHandler(); + private final CommitEventHandler commitEventHandler = new CommitEventHandler(); + private final DDLEventHandler ddlEventHandler = new DDLEventHandler(); + private final InsertEventHandler insertEventHandler = new InsertEventHandler(); + private final DeleteEventHandler deleteEventHandler = new DeleteEventHandler(); + private final UpdateEventHandler updateEventHandler = new UpdateEventHandler(); private volatile EventWriterConfiguration eventWriterConfiguration; - private volatile BinlogEventInfo currentEventInfo; - private AbstractBinlogEventWriter currentEventWriter; + static { @@ -609,6 +580,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { return; } + // Get inTransaction value from state + binlogResourceInfo.setInTransaction("true".equals(stateMap.get("inTransaction"))); + // Build a event writer config object for the event writers to use final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue()); eventWriterConfiguration = new EventWriterConfiguration( @@ -624,49 +598,52 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean(); - includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean(); - includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean(); - useGtid = context.getProperty(USE_BINLOG_GTID).asBoolean(); + if (binlogResourceInfo == null) { + binlogResourceInfo = new BinlogResourceInfo(); + } + currentDataCaptureState.setUseGtid(context.getProperty(USE_BINLOG_GTID).asBoolean()); - if (useGtid) { + if (currentDataCaptureState.isUseGtid()) { // Set current gtid to whatever is in State, falling back to the Retrieve All Records then Initial Gtid if no State variable is present - currentGtidSet = stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY); - if (currentGtidSet == null) { + currentDataCaptureState.setGtidSet(stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY)); + if (currentDataCaptureState.getGtidSet() == null) { if (!getAllRecords && context.getProperty(INIT_BINLOG_GTID).isSet()) { - currentGtidSet = context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue(); + currentDataCaptureState.setGtidSet(context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue()); } else { // If we're starting from the beginning of all binlogs, the binlog gtid must be the empty string (not null) - currentGtidSet = ""; + currentDataCaptureState.setGtidSet(""); } } - currentBinlogFile = ""; - currentBinlogPosition = DO_NOT_SET; + currentDataCaptureState.setBinlogFile(""); + currentDataCaptureState.setBinlogPosition(DO_NOT_SET); } else { // Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present - currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY); + final String currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY); if (currentBinlogFile == null) { if (!getAllRecords) { if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) { - currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue(); + currentDataCaptureState.setBinlogFile(context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue()); } } else { // If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null) - currentBinlogFile = ""; + currentDataCaptureState.setBinlogFile(""); } + } else { + currentDataCaptureState.setBinlogFile(currentBinlogFile); } // Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present - String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY); + final String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY); if (binlogPosition != null) { - currentBinlogPosition = Long.valueOf(binlogPosition); + currentDataCaptureState.setBinlogPosition(Long.parseLong(binlogPosition)); } else if (!getAllRecords) { if (context.getProperty(INIT_BINLOG_POSITION).isSet()) { - currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong(); + currentDataCaptureState.setBinlogPosition(context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong()); } else { - currentBinlogPosition = DO_NOT_SET; + currentDataCaptureState.setBinlogPosition(DO_NOT_SET); } } else { - currentBinlogPosition = -1; + currentDataCaptureState.setBinlogPosition(-1); } } @@ -676,22 +653,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { // Use Initial Sequence ID property if none is found in state PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID); if (seqIdProp.isSet()) { - currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger()); + currentDataCaptureState.setSequenceId(seqIdProp.evaluateAttributeExpressions().asInteger()); } } else { - currentSequenceId.set(Long.parseLong(seqIdString)); - } - //get inTransaction value from state - inTransaction = "true".equals(stateMap.get("inTransaction")); - - // Get reference to Distributed Cache if one exists. If it does not, no enrichment (resolution of column names, e.g.) will be performed - boolean createEnrichmentConnection = false; - if (context.getProperty(DIST_CACHE_CLIENT).isSet()) { - cacheClient = context.getProperty(DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class); - createEnrichmentConnection = true; - } else { - logger.warn("No Distributed Map Cache Client is specified, so no event enrichment (resolution of column names, e.g.) will be performed."); - cacheClient = null; + currentDataCaptureState.setSequenceId(Long.parseLong(seqIdString)); } final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE) @@ -717,7 +682,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { Long serverId = context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong(); - connect(hosts, username, password, serverId, createEnrichmentConnection, driverLocation, driverName, connectTimeout, sslContextService, sslMode); + connect(hosts, username, password, serverId, driverLocation, driverName, connectTimeout, sslContextService, sslMode); } catch (IOException | IllegalStateException e) { if (eventListener != null) { eventListener.stop(); @@ -735,8 +700,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { @Override public synchronized void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException { - // Indicate that this processor has executed at least once, so we know whether or not the state values are valid and should be updated - hasRun.set(true); ComponentLog log = getLogger(); // Create a client if we don't have one @@ -773,16 +736,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } try { - outputEvents(currentSession, log); + outputEvents(currentSession, context, log); } catch (Exception eventException) { - getLogger().error("Exception during event processing at file={} pos={}", currentBinlogFile, currentBinlogPosition, eventException); + getLogger().error("Exception during event processing at file={} pos={}", currentDataCaptureState.getBinlogFile(), currentDataCaptureState.getBinlogPosition(), eventException); try { // Perform some processor-level "rollback", then rollback the session - currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile; - currentBinlogPosition = xactBinlogPosition; - currentSequenceId.set(xactSequenceId); - currentGtidSet = xactGtidSet; - inTransaction = false; + binlogResourceInfo.setInTransaction(false); stop(); } catch (Exception e) { // Not much we can recover from here @@ -795,16 +754,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } } - @OnStopped - @OnShutdown - public void onStopped(ProcessContext context) { - try { - stop(); - } catch (CDCException ioe) { - throw new ProcessException(ioe); - } - } - /** * Get a list of hosts from a NiFi property, e.g. * @@ -821,16 +770,19 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { for (String item : hostsSplit) { String[] addresses = item.split(":"); - if (addresses.length != 2) { + if (addresses.length > 2 || addresses.length == 0) { throw new ArrayIndexOutOfBoundsException("Not in host:port format"); + } else if (addresses.length > 1) { + hostsList.add(new InetSocketAddress(addresses[0].trim(), Integer.parseInt(addresses[1].trim()))); + } else { + // Assume default port of 3306 + hostsList.add(new InetSocketAddress(addresses[0].trim(), DEFAULT_MYSQL_PORT)); } - - hostsList.add(new InetSocketAddress(addresses[0].trim(), Integer.parseInt(addresses[1].trim()))); } return hostsList; } - protected void connect(List hosts, String username, String password, Long serverId, boolean createEnrichmentConnection, + protected void connect(List hosts, String username, String password, Long serverId, String driverLocation, String driverName, long connectTimeout, final SSLContextService sslContextService, final SSLMode sslMode) throws IOException { @@ -839,17 +791,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { InetSocketAddress connectedHost = null; Exception lastConnectException = new Exception("Unknown connection error"); - if (createEnrichmentConnection) { - try { - // Ensure driverLocation and driverName are correct before establishing binlog connection - // to avoid failing after binlog messages are received. - // Actual JDBC connection is created after binlog client gets started, because we need - // the connect-able host same as the binlog client. - registerDriver(driverLocation, driverName); - } catch (InitializationException e) { - throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" + - " and MySQL Driver Class Name are configured correctly. " + e, e); - } + try { + // Ensure driverLocation and driverName are correct before establishing binlog connection + // to avoid failing after binlog messages are received. + // Actual JDBC connection is created after binlog client gets started, because we need + // the connect-able host same as the binlog client. + registerDriver(driverLocation, driverName); + } catch (InitializationException e) { + throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" + + " and MySQL Driver Class Name are configured correctly. " + e, e); } while (connectedHost == null && connectionAttempts < numHosts) { @@ -871,12 +821,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } binlogClient.registerLifecycleListener(lifecycleListener); - binlogClient.setBinlogFilename(currentBinlogFile); - if (currentBinlogPosition != DO_NOT_SET) { - binlogClient.setBinlogPosition(currentBinlogPosition); + binlogClient.setBinlogFilename(currentDataCaptureState.getBinlogFile()); + if (currentDataCaptureState.getBinlogPosition() != DO_NOT_SET) { + binlogClient.setBinlogPosition(currentDataCaptureState.getBinlogPosition()); } - binlogClient.setGtidSet(currentGtidSet); + binlogClient.setGtidSet(currentDataCaptureState.getGtidSet()); binlogClient.setGtidSetFallbackToPurged(true); if (serverId != null) { @@ -895,12 +845,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { connectTimeout = Long.MAX_VALUE; } binlogClient.connect(connectTimeout); - transitUri = "mysql://" + connectedHost.getHostString() + ":" + connectedHost.getPort(); + binlogResourceInfo.setTransitUri("mysql://" + connectedHost.getHostString() + ":" + connectedHost.getPort()); } catch (IOException | TimeoutException te) { // Try the next host connectedHost = null; - transitUri = ""; + binlogResourceInfo.setTransitUri(""); currentHost = (currentHost + 1) % numHosts; connectionAttempts++; lastConnectException = te; @@ -918,36 +868,37 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { throw new IOException("Could not connect binlog client to any of the specified hosts due to: " + lastConnectException.getMessage(), lastConnectException); } - if (createEnrichmentConnection) { - final TlsConfiguration tlsConfiguration = sslContextService == null ? null : sslContextService.createTlsConfiguration(); - final ConnectionPropertiesProvider connectionPropertiesProvider = new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration); - final Map jdbcConnectionProperties = connectionPropertiesProvider.getConnectionProperties(); - jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, jdbcConnectionProperties, connectTimeout); - try { - // Ensure connection can be created. - getJdbcConnection(); - } catch (SQLException e) { - getLogger().error("Error creating binlog enrichment JDBC connection to any of the specified hosts", e); - if (eventListener != null) { - eventListener.stop(); - if (binlogClient != null) { - binlogClient.unregisterEventListener(eventListener); - } - } + final TlsConfiguration tlsConfiguration = sslContextService == null ? null : sslContextService.createTlsConfiguration(); + final ConnectionPropertiesProvider connectionPropertiesProvider = new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration); + final Map jdbcConnectionProperties = connectionPropertiesProvider.getConnectionProperties(); + jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, jdbcConnectionProperties, connectTimeout); + try { + // Ensure connection can be created. + getJdbcConnection(); + } catch (SQLException e) { + getLogger().error("Error creating binlog enrichment JDBC connection to any of the specified hosts", e); + if (eventListener != null) { + eventListener.stop(); if (binlogClient != null) { - binlogClient.disconnect(); - binlogClient = null; + binlogClient.unregisterEventListener(eventListener); } - return; } + if (binlogClient != null) { + binlogClient.disconnect(); + binlogClient = null; + } + return; } gtidSet = new GtidSet(binlogClient.getGtidSet()); } - public void outputEvents(ProcessSession session, ComponentLog log) throws IOException { + public void outputEvents(ProcessSession session, ProcessContext context, ComponentLog log) throws IOException { RawBinlogEvent rawBinlogEvent; + DataCaptureState dataCaptureState = currentDataCaptureState.copy(); + final boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean(); + final boolean includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean(); // Drain the queue while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) { @@ -959,10 +910,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only // advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which // is filled in during the ROTATE case - if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) { - currentBinlogPosition = header.getPosition(); + if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid()) { + dataCaptureState.setBinlogPosition(header.getPosition()); } - log.debug("Message event, type={} pos={} file={}", eventType, currentBinlogPosition, currentBinlogFile); + log.debug("Message event, type={} pos={} file={}", eventType, dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); switch (eventType) { case TABLE_MAP: // This is sent to inform which table is about to be changed by subsequent events @@ -974,139 +925,92 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { if (!skipTable) { TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId()); - if (cacheClient != null) { + binlogResourceInfo.setCurrentTable(tableInfoCache.get(key)); + if (binlogResourceInfo.getCurrentTable() == null) { + // We don't have an entry for this table yet, so fetch the info from the database and populate the cache try { - currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer); - } catch (ConnectException ce) { - throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce); + binlogResourceInfo.setCurrentTable(loadTableInfo(key)); + tableInfoCache.put(key, binlogResourceInfo.getCurrentTable()); + } catch (SQLException se) { + // Propagate the error up, so things like rollback and logging/bulletins can be handled + throw new IOException(se.getMessage(), se); } - - if (currentTable == null) { - // We don't have an entry for this table yet, so fetch the info from the database and populate the cache - try { - currentTable = loadTableInfo(key); - try { - cacheClient.put(key, currentTable, cacheKeySerializer, cacheValueSerializer); - } catch (ConnectException ce) { - throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce); - } - } catch (SQLException se) { - // Propagate the error up, so things like rollback and logging/bulletins can be handled - throw new IOException(se.getMessage(), se); - } - } - } else { - // Populate a limited version of TableInfo without column information - currentTable = new TableInfo(key.getDatabaseName(), key.getTableName(), key.getTableId(), Collections.emptyList()); } } else { - // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about - currentTable = null; + // Clear the current table, to force reload next time we get a TABLE_MAP event we care about + binlogResourceInfo.setCurrentTable(null); } break; case QUERY: QueryEventData queryEventData = event.getData(); - currentDatabase = queryEventData.getDatabase(); + binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase()); String sql = queryEventData.getSql(); // Is this the start of a transaction? if ("BEGIN".equals(sql)) { // If we're already in a transaction, something bad happened, alert the user - if (inTransaction) { + if (binlogResourceInfo.isInTransaction()) { getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid " - + "or the event stream is out of sync or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile); + + "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); } - // Mark the current binlog position and GTID in case we have to rollback the transaction (if the processor is stopped, e.g.) - xactBinlogFile = currentBinlogFile; - xactBinlogPosition = currentBinlogPosition; - xactSequenceId = currentSequenceId.get(); - xactGtidSet = currentGtidSet; - - if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { - BeginTransactionEventInfo beginEvent = useGtid - ? new BeginTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentEventInfo = beginEvent; - currentEventWriter = beginEventWriter; - currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) { + beginEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo, + binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp); } - inTransaction = true; - //update inTransaction value to state - updateState(session); + // Whether we skip this event or not, it's still the beginning of a transaction + binlogResourceInfo.setInTransaction(true); + + // Update inTransaction value to state + updateState(session, dataCaptureState); } else if ("COMMIT".equals(sql)) { - if (!inTransaction) { + // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here + if (!binlogResourceInfo.isInTransaction()) { getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). " + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state " - + "or there was an issue with the processor state.", currentBinlogPosition, currentBinlogFile); + + "or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); } - // InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here - if (includeBeginCommit) { - if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) { - CommitTransactionEventInfo commitTransactionEvent = useGtid - ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentEventInfo = commitTransactionEvent; - currentEventWriter = commitEventWriter; - currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); - } - } else { - // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed. - if (currentSession != null) { - FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); - if (flowFile != null && currentEventWriter != null) { - // Flush the events to the FlowFile when the processor is stopped - currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS); - } - currentSession.commitAsync(); - } + if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) { + commitEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo, + binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp); } - - //update inTransaction value to state - inTransaction = false; - updateState(session); + // Whether we skip this event or not, it's the end of a transaction + binlogResourceInfo.setInTransaction(false); + updateState(session, dataCaptureState); // If there is no FlowFile open, commit the session if (eventWriterConfiguration.getCurrentFlowFile() == null) { // Commit the NiFi session session.commitAsync(); } - currentTable = null; + binlogResourceInfo.setCurrentTable(null); + binlogResourceInfo.setCurrentDatabase(null); } else { // Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change String normalizedQuery = normalizeQuery(sql); - if (normalizedQuery.startsWith("alter table") - || normalizedQuery.startsWith("alter ignore table") - || normalizedQuery.startsWith("create table") - || normalizedQuery.startsWith("truncate table") - || normalizedQuery.startsWith("rename table") - || normalizedQuery.startsWith("drop table") - || normalizedQuery.startsWith("drop database")) { + if (isQueryDDL(normalizedQuery)) { + if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) { + if (queryEventData.getDatabase() == null) { + queryEventData.setDatabase(binlogResourceInfo.getCurrentDatabase()); + } + ddlEventHandler.handleEvent(queryEventData, includeDDLEvents, currentDataCaptureState, binlogResourceInfo, + binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp); - if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { - // If we don't have table information, we can still use the database name - TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null); - DDLEventInfo ddlEvent = useGtid - ? new DDLEventInfo(ddlTableInfo, timestamp, currentGtidSet, sql) - : new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql); - currentEventInfo = ddlEvent; - currentEventWriter = ddlEventWriter; - currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); - } - // Remove all the keys from the cache that this processor added - if (cacheClient != null) { - cacheClient.removeByPattern(this.getIdentifier() + ".*"); + // The altered table may not be the "active" table, so clear the cache to pick up changes + tableInfoCache.clear(); } + // If not in a transaction, commit the session so the DDL event(s) will be transferred - if (includeDDLEvents && !inTransaction) { - updateState(session); + if (includeDDLEvents && !binlogResourceInfo.isInTransaction()) { + updateState(session, dataCaptureState); if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) { if (currentSession != null) { FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); - if (flowFile != null && currentEventWriter != null) { + if (flowFile != null && binlogEventState.getCurrentEventWriter() != null) { // Flush the events to the FlowFile when the processor is stopped - currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS); + binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(currentSession, eventWriterConfiguration, binlogResourceInfo.getTransitUri(), + dataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), REL_SUCCESS); } } } @@ -1120,41 +1024,26 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { break; case XID: - if (!inTransaction) { + if (!binlogResourceInfo.isInTransaction()) { getLogger().debug("COMMIT (XID) event received at pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN event). " - + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.", - currentBinlogPosition, currentBinlogFile); + + "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.", + dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile()); } - if (includeBeginCommit) { - if (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches()) { - CommitTransactionEventInfo commitTransactionEvent = useGtid - ? new CommitTransactionEventInfo(currentDatabase, timestamp, currentGtidSet) - : new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); - currentEventInfo = commitTransactionEvent; - currentEventWriter = commitEventWriter; - currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); - } - } else { - // If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed. - if (currentSession != null) { - FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); - if (flowFile != null && currentEventWriter != null) { - // Flush the events to the FlowFile when the processor is stopped - currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS); - } - } + if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) { + commitEventHandler.handleEvent(event.getData(), includeBeginCommit, currentDataCaptureState, binlogResourceInfo, + binlogEventState, null, eventWriterConfiguration, currentSession, timestamp); } - // update inTransaction value and save next position - // so when restart this processor,we will not read xid event again - inTransaction = false; - currentBinlogPosition = header.getNextPosition(); - updateState(session); + // Whether we skip this event or not, it's the end of a transaction + binlogResourceInfo.setInTransaction(false); + dataCaptureState.setBinlogPosition(header.getNextPosition()); + updateState(session, dataCaptureState); // If there is no FlowFile open, commit the session if (eventWriterConfiguration.getCurrentFlowFile() == null) { + // Commit the NiFi session session.commitAsync(); } - currentTable = null; - currentDatabase = null; + binlogResourceInfo.setCurrentTable(null); + binlogResourceInfo.setCurrentDatabase(null); break; case WRITE_ROWS: @@ -1170,11 +1059,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { if (skipTable) { break; } - if (!inTransaction) { + if (!binlogResourceInfo.isInTransaction()) { // These events should only happen inside a transaction, warn the user otherwise log.info("Event {} occurred outside of a transaction, which is unexpected.", eventType.name()); } - if (currentTable == null && cacheClient != null) { + if (binlogResourceInfo.getCurrentTable() == null) { // No Table Map event was processed prior to this event, which should not happen, so throw an error throw new RowEventException("No table information is available for this event, cannot process further."); } @@ -1183,52 +1072,42 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { || eventType == EXT_WRITE_ROWS || eventType == PRE_GA_WRITE_ROWS) { - InsertRowsEventInfo eventInfo = useGtid - ? new InsertRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData()) - : new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData()); - currentEventInfo = eventInfo; - currentEventWriter = insertRowsWriter; - currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + insertEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo, + binlogEventState, null, eventWriterConfiguration, currentSession, timestamp); } else if (eventType == DELETE_ROWS || eventType == EXT_DELETE_ROWS || eventType == PRE_GA_DELETE_ROWS) { - DeleteRowsEventInfo eventInfo = useGtid - ? new DeleteRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData()) - : new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData()); - currentEventInfo = eventInfo; - currentEventWriter = deleteRowsWriter; - currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); - + deleteEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo, + binlogEventState, null, eventWriterConfiguration, currentSession, timestamp); } else { // Update event - UpdateRowsEventInfo eventInfo = useGtid - ? new UpdateRowsEventInfo(currentTable, timestamp, currentGtidSet, event.getData()) - : new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData()); - currentEventInfo = eventInfo; - currentEventWriter = updateRowsWriter; - currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS, eventWriterConfiguration)); + updateEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo, + binlogEventState, null, eventWriterConfiguration, currentSession, timestamp); } break; case ROTATE: - if (!useGtid) { + if (!currentDataCaptureState.isUseGtid()) { // Update current binlog filename RotateEventData rotateEventData = event.getData(); - currentBinlogFile = rotateEventData.getBinlogFilename(); - currentBinlogPosition = rotateEventData.getBinlogPosition(); + dataCaptureState.setBinlogFile(rotateEventData.getBinlogFilename()); + dataCaptureState.setBinlogPosition(rotateEventData.getBinlogPosition()); } - updateState(session); + updateState(session, dataCaptureState); break; case GTID: - if (useGtid) { + if (currentDataCaptureState.isUseGtid()) { // Update current binlog gtid GtidEventData gtidEventData = event.getData(); - gtidSet.add(gtidEventData.getGtid()); - currentGtidSet = gtidSet.toString(); - updateState(session); + MySqlGtid mySqlGtid = gtidEventData.getMySqlGtid(); + if (mySqlGtid != null) { + gtidSet.add(mySqlGtid.toString()); + dataCaptureState.setGtidSet(gtidSet.toString()); + updateState(session, dataCaptureState); + } } break; @@ -1239,12 +1118,22 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { // Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed. // We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only // advance the position if it is not that type of event. - if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid && eventType != XID) { - currentBinlogPosition = header.getNextPosition(); + if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid() && eventType != XID) { + dataCaptureState.setBinlogPosition(header.getNextPosition()); } } } + private boolean isQueryDDL(String sql) { + return sql.startsWith("alter table") + || sql.startsWith("alter ignore table") + || sql.startsWith("create table") + || sql.startsWith("truncate table") + || sql.startsWith("rename table") + || sql.startsWith("drop table") + || sql.startsWith("drop database"); + } + protected void clearState() throws IOException { if (currentSession == null) { throw new IllegalStateException("No current session"); @@ -1263,7 +1152,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { return normalizedQuery; } - protected void stop() throws CDCException { + @OnStopped + public void stop() throws CDCException { try { if (eventListener != null) { eventListener.stop(); @@ -1277,14 +1167,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { if (currentSession != null) { FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile(); - if (flowFile != null && currentEventWriter != null) { + if (flowFile != null && binlogEventState.getCurrentEventWriter() != null) { // Flush the events to the FlowFile when the processor is stopped - currentEventWriter.finishAndTransferFlowFile(currentSession, eventWriterConfiguration, transitUri, currentSequenceId.get(), currentEventInfo, REL_SUCCESS); + binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile( + currentSession, + eventWriterConfiguration, + binlogResourceInfo.getTransitUri(), + currentDataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), REL_SUCCESS); } currentSession.commitAsync(); } - currentBinlogPosition = -1; + currentDataCaptureState.setBinlogPosition(-1); } catch (IOException e) { throw new CDCException("Error closing CDC connection", e); } finally { @@ -1296,29 +1190,30 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { } } - private void updateState(ProcessSession session) throws IOException { - updateState(session, currentBinlogFile, currentBinlogPosition, currentSequenceId.get(), currentGtidSet, inTransaction); + private void updateState(ProcessSession session, DataCaptureState dataCaptureState) throws IOException { + updateState(session, dataCaptureState, binlogResourceInfo.isInTransaction()); } - private void updateState(ProcessSession session, String binlogFile, long binlogPosition, long sequenceId, String gtidSet, boolean inTransaction) throws IOException { + private void updateState(ProcessSession session, DataCaptureState dataCaptureState, boolean inTransaction) throws IOException { // Update state with latest values final Map newStateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap()); // Save current binlog filename, position and GTID to the state map - if (binlogFile != null) { - newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, binlogFile); + if (dataCaptureState.getBinlogFile() != null) { + newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, dataCaptureState.getBinlogFile()); } - newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(binlogPosition)); - newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(sequenceId)); + newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(dataCaptureState.getBinlogPosition())); + newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(dataCaptureState.getSequenceId())); //add inTransaction value into state newStateMap.put("inTransaction", inTransaction ? "true" : "false"); - if (gtidSet != null) { - newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, gtidSet); + if (dataCaptureState.getGtidSet() != null) { + newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, dataCaptureState.getGtidSet()); } session.setState(newStateMap, Scope.CLUSTER); + currentDataCaptureState = dataCaptureState; } @@ -1391,8 +1286,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map customProperties, long connectionTimeoutMillis) { this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort(); connectionProps.putAll(customProperties); - connectionProps.put("user", username); - connectionProps.put("password", password); + if (username != null) { + connectionProps.put("user", username); + if (password != null) { + connectionProps.put("password", password); + } + } + this.connectionTimeoutMillis = connectionTimeoutMillis; } @@ -1486,7 +1386,71 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor { public Logger getParentLogger() throws SQLFeatureNotSupportedException { return driver.getParentLogger(); } - } + public static class BinlogEventState { + private BinlogEventInfo currentEventInfo; + private AbstractBinlogEventWriter currentEventWriter; + + public BinlogEventInfo getCurrentEventInfo() { + return currentEventInfo; + } + + public void setCurrentEventInfo(BinlogEventInfo currentEventInfo) { + this.currentEventInfo = currentEventInfo; + } + + public AbstractBinlogEventWriter getCurrentEventWriter() { + return currentEventWriter; + } + + public void setCurrentEventWriter(AbstractBinlogEventWriter currentEventWriter) { + this.currentEventWriter = currentEventWriter; + } + } + + public static class BinlogResourceInfo { + private TableInfo currentTable = null; + private String currentDatabase = null; + + private boolean inTransaction = false; + + private String transitUri = ""; + + public BinlogResourceInfo() { + + } + + public TableInfo getCurrentTable() { + return currentTable; + } + + public void setCurrentTable(TableInfo currentTable) { + this.currentTable = currentTable; + } + + public String getCurrentDatabase() { + return currentDatabase; + } + + public void setCurrentDatabase(String currentDatabase) { + this.currentDatabase = currentDatabase; + } + + public boolean isInTransaction() { + return inTransaction; + } + + public void setInTransaction(boolean inTransaction) { + this.inTransaction = inTransaction; + } + + public String getTransitUri() { + return transitUri; + } + + public void setTransitUri(String transitUri) { + this.transitUri = transitUri; + } + } } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy index 83c3a66f69..20523cbc06 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/groovy/org/apache/nifi/cdc/mysql/processors/CaptureChangeMySQLTest.groovy @@ -23,6 +23,7 @@ import com.github.shyiko.mysql.binlog.event.EventData import com.github.shyiko.mysql.binlog.event.EventHeaderV4 import com.github.shyiko.mysql.binlog.event.EventType import com.github.shyiko.mysql.binlog.event.GtidEventData +import com.github.shyiko.mysql.binlog.event.MySqlGtid import com.github.shyiko.mysql.binlog.event.QueryEventData import com.github.shyiko.mysql.binlog.event.RotateEventData import com.github.shyiko.mysql.binlog.event.TableMapEventData @@ -30,7 +31,6 @@ import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData import com.github.shyiko.mysql.binlog.event.WriteRowsEventData import com.github.shyiko.mysql.binlog.network.SSLMode import groovy.json.JsonSlurper -import org.apache.commons.io.output.WriterOutputStream import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading import org.apache.nifi.cdc.event.ColumnDefinition import org.apache.nifi.cdc.event.TableInfo @@ -40,22 +40,12 @@ import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy import org.apache.nifi.cdc.mysql.MockBinlogClient import org.apache.nifi.cdc.mysql.event.BinlogEventInfo import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory -import org.apache.nifi.components.PropertyDescriptor import org.apache.nifi.components.state.Scope -import org.apache.nifi.controller.AbstractControllerService -import org.apache.nifi.distributed.cache.client.Deserializer -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient -import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService -import org.apache.nifi.distributed.cache.client.Serializer import org.apache.nifi.flowfile.attributes.CoreAttributes -import org.apache.nifi.logging.ComponentLog import org.apache.nifi.processor.exception.ProcessException import org.apache.nifi.provenance.ProvenanceEventType import org.apache.nifi.reporting.InitializationException import org.apache.nifi.ssl.SSLContextService -import org.apache.nifi.state.MockStateManager -import org.apache.nifi.util.MockComponentLog -import org.apache.nifi.util.MockControllerServiceInitializationContext import org.apache.nifi.util.TestRunner import org.apache.nifi.util.TestRunners import org.junit.jupiter.api.BeforeEach @@ -67,8 +57,6 @@ import java.sql.ResultSet import java.sql.SQLException import java.sql.Statement import java.util.concurrent.TimeoutException -import java.util.regex.Matcher -import java.util.regex.Pattern import static org.junit.jupiter.api.Assertions.assertEquals import static org.junit.jupiter.api.Assertions.assertNotNull @@ -378,13 +366,6 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true') - final DistributedMapCacheClientImpl cacheClient = createCacheClient() - def clientProperties = [:] - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost') - testRunner.addControllerService('client', cacheClient, clientProperties) - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client') - testRunner.enableControllerService(cacheClient) - testRunner.run(1, false, true) @@ -521,7 +502,7 @@ class CaptureChangeMySQLTest { @Test void testExcludeSchemaChanges() throws Exception { testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION) - testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Don't include port here, should default to 3306 testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1') @@ -530,13 +511,6 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'false') - final DistributedMapCacheClientImpl cacheClient = createCacheClient() - def clientProperties = [:] - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost') - testRunner.addControllerService('client', cacheClient, clientProperties) - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client') - testRunner.enableControllerService(cacheClient) - testRunner.run(1, false, true) @@ -595,16 +569,10 @@ class CaptureChangeMySQLTest { @Test void testNoTableInformationAvailable() throws Exception { testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION) - testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306') + testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Port should default to 3306 testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') - final DistributedMapCacheClientImpl cacheClient = createCacheClient() - def clientProperties = [:] - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost') - testRunner.addControllerService('client', cacheClient, clientProperties) - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client') - testRunner.enableControllerService(cacheClient) testRunner.run(1, false, true) @@ -1106,12 +1074,7 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') - final DistributedMapCacheClientImpl cacheClient = createCacheClient() - def clientProperties = [:] - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost') - testRunner.addControllerService('client', cacheClient, clientProperties) - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client') - testRunner.enableControllerService(cacheClient) + testRunner.run(1, false, true) // ROTATE @@ -1174,19 +1137,13 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true') - final DistributedMapCacheClientImpl cacheClient = createCacheClient() - def clientProperties = [:] - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost') - testRunner.addControllerService('client', cacheClient, clientProperties) - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client') - testRunner.enableControllerService(cacheClient) testRunner.run(1, false, true) // GTID client.sendEvent(new Event( [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4, - [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1'] as GtidEventData + [gtid: MySqlGtid.fromString( 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')] as GtidEventData )) // BEGIN @@ -1206,7 +1163,7 @@ class CaptureChangeMySQLTest { // Stop the processor and verify the state is set testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER) testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER) - testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1', Scope.CLUSTER) + testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1', Scope.CLUSTER) ((CaptureChangeMySQL) testRunner.getProcessor()).clearState() testRunner.stateManager.clear(Scope.CLUSTER) @@ -1218,7 +1175,7 @@ class CaptureChangeMySQLTest { // GTID client.sendEvent(new Event( [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 8] as EventHeaderV4, - [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2'] as GtidEventData + [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2'] as GtidEventData )) // BEGIN @@ -1239,12 +1196,12 @@ class CaptureChangeMySQLTest { testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER) testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '12', Scope.CLUSTER) - testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-2', Scope.CLUSTER) + testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-2', Scope.CLUSTER) // GTID client.sendEvent(new Event( [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 14] as EventHeaderV4, - [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData + [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as GtidEventData )) // BEGIN @@ -1263,7 +1220,7 @@ class CaptureChangeMySQLTest { testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER) testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '18', Scope.CLUSTER) - testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', Scope.CLUSTER) + testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3', Scope.CLUSTER) } @Test @@ -1340,12 +1297,12 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root') testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true') - testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1') + testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1') testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10') testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') testRunner.getStateManager().setState([ - ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2', + ("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2', ("${EventWriter.SEQUENCE_ID_KEY}".toString()): '1' ], Scope.CLUSTER) @@ -1354,7 +1311,7 @@ class CaptureChangeMySQLTest { // GTID client.sendEvent(new Event( [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4, - [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData + [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as GtidEventData )) // BEGIN @@ -1375,7 +1332,7 @@ class CaptureChangeMySQLTest { assertEquals(2, resultFiles.size()) assertEquals( - 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2-3', + 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3', resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY) ) } @@ -1388,7 +1345,7 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password') testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds') testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true') - testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1') + testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1') testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false') testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true') @@ -1397,7 +1354,7 @@ class CaptureChangeMySQLTest { // GTID client.sendEvent(new Event( [timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4, - [gtid: 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:3'] as GtidEventData + [gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as GtidEventData )) // BEGIN @@ -1418,7 +1375,7 @@ class CaptureChangeMySQLTest { assertEquals(2, resultFiles.size()) assertEquals( - 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:1-1:3-3', + 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1:3-3', resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY) ) } @@ -1430,12 +1387,6 @@ class CaptureChangeMySQLTest { testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root") testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds") testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true") - final DistributedMapCacheClientImpl cacheClient = createCacheClient() - Map clientProperties = new HashMap<>() - clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost") - testRunner.addControllerService("client", cacheClient, clientProperties) - testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client") - testRunner.enableControllerService(cacheClient) testRunner.run(1, false, true) // COMMIT @@ -1496,131 +1447,5 @@ class CaptureChangeMySQLTest { when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet) return mockConnection } - - } - - static DistributedMapCacheClientImpl createCacheClient() throws InitializationException { - - final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl() - final ComponentLog logger = new MockComponentLog("client", client) - final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client)) - - client.initialize(clientInitContext) - - return client - } - - static - final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient { - - private Map cacheMap = new HashMap<>() - - @Override - void close() throws IOException { - } - - @Override - void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { - } - - @Override - protected List getSupportedPropertyDescriptors() { - - return [DistributedMapCacheClientService.HOSTNAME, - DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT, - DistributedMapCacheClientService.PORT, - DistributedMapCacheClientService.SSL_CONTEXT_SERVICE] - } - - @Override - boolean putIfAbsent( - final K key, - final V value, - final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - - StringWriter keyWriter = new StringWriter() - keySerializer.serialize(key, new WriterOutputStream(keyWriter)) - String keyString = keyWriter.toString() - - if (cacheMap.containsKey(keyString)) return false - - StringWriter valueWriter = new StringWriter() - valueSerializer.serialize(value, new WriterOutputStream(valueWriter)) - return true - } - - @Override - @SuppressWarnings("unchecked") - V getAndPutIfAbsent( - final K key, final V value, final Serializer keySerializer, final Serializer valueSerializer, - final Deserializer valueDeserializer) throws IOException { - StringWriter keyWriter = new StringWriter() - keySerializer.serialize(key, new WriterOutputStream(keyWriter)) - String keyString = keyWriter.toString() - - if (cacheMap.containsKey(keyString)) return valueDeserializer.deserialize(cacheMap.get(keyString).bytes) - - StringWriter valueWriter = new StringWriter() - valueSerializer.serialize(value, new WriterOutputStream(valueWriter)) - return null - } - - @Override - boolean containsKey(final K key, final Serializer keySerializer) throws IOException { - StringWriter keyWriter = new StringWriter() - keySerializer.serialize(key, new WriterOutputStream(keyWriter)) - String keyString = keyWriter.toString() - - return cacheMap.containsKey(keyString) - } - - @Override - V get( - final K key, - final Serializer keySerializer, final Deserializer valueDeserializer) throws IOException { - StringWriter keyWriter = new StringWriter() - keySerializer.serialize(key, new WriterOutputStream(keyWriter)) - String keyString = keyWriter.toString() - - return (cacheMap.containsKey(keyString)) ? valueDeserializer.deserialize(cacheMap.get(keyString).bytes) : null - } - - @Override - boolean remove(final K key, final Serializer serializer) throws IOException { - StringWriter keyWriter = new StringWriter() - serializer.serialize(key, new WriterOutputStream(keyWriter)) - String keyString = keyWriter.toString() - - boolean removed = (cacheMap.containsKey(keyString)) - cacheMap.remove(keyString) - return removed - } - - @Override - long removeByPattern(String regex) throws IOException { - final List removedRecords = new ArrayList<>() - Pattern p = Pattern.compile(regex) - for (String key : cacheMap.keySet()) { - // Key must be backed by something that can be converted into a String - Matcher m = p.matcher(key) - if (m.matches()) { - removedRecords.add(cacheMap.get(key)) - } - } - final long numRemoved = removedRecords.size() - removedRecords.each {cacheMap.remove(it)} - return numRemoved - } - - @Override - void put( - final K key, - final V value, - final Serializer keySerializer, final Serializer valueSerializer) throws IOException { - StringWriter keyWriter = new StringWriter() - keySerializer.serialize(key, new WriterOutputStream(keyWriter)) - StringWriter valueWriter = new StringWriter() - valueSerializer.serialize(value, new WriterOutputStream(valueWriter)) - } } } diff --git a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java similarity index 65% rename from nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java rename to nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java index 73573a2bd5..a34565ddf6 100644 --- a/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/MySQLCDCUtilsTest.java +++ b/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/test/java/org/apache/nifi/cdc/mysql/event/io/TestInsertRowsWriter.java @@ -14,7 +14,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.nifi.cdc.mysql.event; +package org.apache.nifi.cdc.mysql.event.io; + import org.junit.jupiter.api.Test; @@ -23,17 +24,15 @@ import java.sql.Types; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +class TestInsertRowsWriter { -/** - * Unit Tests for MySQLCDCUtils utility class - */ -public class MySQLCDCUtilsTest { @Test - public void testGetWritableObject() throws Exception { - assertNull(MySQLCDCUtils.getWritableObject(null, null)); - assertNull(MySQLCDCUtils.getWritableObject(Types.INTEGER, null)); - assertEquals((byte) 1, MySQLCDCUtils.getWritableObject(Types.INTEGER, (byte) 1)); - assertEquals("Hello", MySQLCDCUtils.getWritableObject(Types.VARCHAR, "Hello".getBytes())); + public void testGetWritableObject() { + InsertRowsWriter insertRowsWriter = new InsertRowsWriter(); + assertNull(insertRowsWriter.getWritableObject(null, null)); + assertNull(insertRowsWriter.getWritableObject(Types.INTEGER, null)); + assertEquals((byte) 1, insertRowsWriter.getWritableObject(Types.INTEGER, (byte) 1)); + assertEquals("Hello", insertRowsWriter.getWritableObject(Types.VARCHAR, "Hello".getBytes())); } } \ No newline at end of file