NIFI-11380: Refactor CaptureChangeMySQL with improvements

This closes #7116

Signed-off-by: Nandor Soma Abonyi <nsabonyi@apache.org>
Matthew Burgess 2023-04-03 16:14:35 -04:00 committed by Nandor Soma Abonyi
parent 8ebecdc3ab
commit 78fd7fadcd
19 changed files with 777 additions and 677 deletions

View File

@@ -17,22 +17,14 @@
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* A POJO for holding table information related to update events.
*/
public class TableInfo {
final static String DB_TABLE_NAME_DELIMITER = "@!@";
private String databaseName;
private String tableName;
private Long tableId;
@@ -92,64 +84,4 @@ public class TableInfo {
result = 31 * result + (columns != null ? columns.hashCode() : 0);
return result;
}
public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer<TableInfo> {
@Override
public void serialize(TableInfo value, OutputStream output) throws SerializationException, IOException {
StringBuilder sb = new StringBuilder(value.getDatabaseName());
sb.append(DB_TABLE_NAME_DELIMITER);
sb.append(value.getTableName());
sb.append(DB_TABLE_NAME_DELIMITER);
sb.append(value.getTableId());
List<ColumnDefinition> columnDefinitions = value.getColumns();
if (columnDefinitions != null && !columnDefinitions.isEmpty()) {
sb.append(DB_TABLE_NAME_DELIMITER);
sb.append(columnDefinitions.stream().map((col) -> col.getName() + DB_TABLE_NAME_DELIMITER + col.getType()).collect(Collectors.joining(DB_TABLE_NAME_DELIMITER)));
}
output.write(sb.toString().getBytes());
}
}
public static class Deserializer implements org.apache.nifi.distributed.cache.client.Deserializer<TableInfo> {
@Override
public TableInfo deserialize(byte[] input) throws DeserializationException, IOException {
// Don't bother deserializing if empty, just return null. This usually happens when the key is not found in the cache
if (input == null || input.length == 0) {
return null;
}
String inputString = new String(input);
String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER);
int numTokens = tokens.length;
if (numTokens < 3) {
throw new IOException("Could not deserialize TableInfo from the following value: " + inputString);
}
String dbName = tokens[0];
String tableName = tokens[1];
Long tableId;
try {
tableId = Long.parseLong(tokens[2]);
} catch (NumberFormatException nfe) {
throw new IOException("Illegal table ID: " + tokens[2]);
}
// Parse column names and types
List<ColumnDefinition> columnDefinitions = new ArrayList<>();
for (int i = 0; i < numTokens - 3; i += 2) {
try {
int columnTypeIndex = i + 4;
int columnNameIndex = i + 3;
if (columnTypeIndex < numTokens) {
columnDefinitions.add(new ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]), tokens[columnNameIndex]));
} else {
throw new IOException("No type detected for column: " + tokens[columnNameIndex]);
}
} catch (NumberFormatException nfe) {
throw new IOException("Illegal column type value for column " + (i / 2 + 1) + ": " + tokens[i + 4]);
}
}
return new TableInfo(dbName, tableName, tableId, columnDefinitions);
}
}
}
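Note: the removed Serializer/Deserializer pair round-trips a TableInfo through a single string joined by DB_TABLE_NAME_DELIMITER ("@!@"). A minimal sketch of the format, with hypothetical values and assuming the (type, name) ColumnDefinition constructor used in deserialize() above:
// Layout: databaseName @!@ tableName @!@ tableId [ @!@ columnName @!@ columnType ... ]
// new TableInfo("mydb", "users", 42L, Collections.singletonList(new ColumnDefinition(4, "id")))
// serializes to the bytes of "mydb@!@users@!@42@!@id@!@4" (4 = java.sql.Types.INTEGER);
// deserialize() reverses this and returns null for empty input (a cache miss).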

View File

@@ -17,12 +17,6 @@
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import java.io.IOException;
import java.io.OutputStream;
import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER;
/**
* This class represents a key in a cache that contains information (e.g., column definitions) for a database table
@@ -80,19 +74,4 @@ public class TableInfoCacheKey {
public String getUuidPrefix() {
return uuidPrefix;
}
public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer<TableInfoCacheKey> {
@Override
public void serialize(TableInfoCacheKey key, OutputStream output) throws SerializationException, IOException {
StringBuilder sb = new StringBuilder(key.getUuidPrefix());
sb.append(DB_TABLE_NAME_DELIMITER);
sb.append(key.getDatabaseName());
sb.append(DB_TABLE_NAME_DELIMITER);
sb.append(key.getTableName());
sb.append(DB_TABLE_NAME_DELIMITER);
sb.append(key.getTableId());
output.write(sb.toString().getBytes());
}
}
}

View File

@@ -41,7 +41,7 @@ language governing permissions and limitations under the License. -->
<dependency>
<groupId>com.zendesk</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.27.6</version>
<version>0.28.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>

View File

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
public class DataCaptureState {
private String binlogFile = null;
private long binlogPosition = 4;
private boolean useGtid = false;
private String gtidSet = null;
private long sequenceId = 0L;
public DataCaptureState() {
}
public DataCaptureState(String binlogFile, long binlogPosition, boolean useGtid, String gtidSet, long sequenceId) {
this.binlogFile = binlogFile;
this.binlogPosition = binlogPosition;
this.useGtid = useGtid;
this.gtidSet = gtidSet;
this.sequenceId = sequenceId;
}
public String getBinlogFile() {
return binlogFile;
}
public void setBinlogFile(String binlogFile) {
this.binlogFile = binlogFile;
}
public long getBinlogPosition() {
return binlogPosition;
}
public void setBinlogPosition(long binlogPosition) {
this.binlogPosition = binlogPosition;
}
public boolean isUseGtid() {
return useGtid;
}
public void setUseGtid(boolean useGtid) {
this.useGtid = useGtid;
}
public String getGtidSet() {
return gtidSet;
}
public void setGtidSet(String gtidSet) {
this.gtidSet = gtidSet;
}
public long getSequenceId() {
return sequenceId;
}
public void setSequenceId(long sequenceId) {
this.sequenceId = sequenceId;
}
public DataCaptureState copy() {
return new DataCaptureState(binlogFile, binlogPosition, useGtid, gtidSet, sequenceId);
}
}
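Note: copy() gives callers a mutable working snapshot; outputEvents below works against currentDataCaptureState.copy() so in-flight position updates do not clobber the committed state. A minimal illustration:
DataCaptureState original = new DataCaptureState("mysql-bin.000001", 4L, false, null, 0L);
DataCaptureState working = original.copy();
working.setBinlogPosition(1234L);
// original.getBinlogPosition() is still 4: copy() is a plain field-by-field snapshot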

View File

@@ -1,47 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import java.io.Serializable;
/**
* A utility class to provide MySQL- / binlog-specific constants and methods for processing events and data
*/
public class MySQLCDCUtils {
public static Object getWritableObject(Integer type, Serializable value) {
if (value == null) {
return null;
}
if (type == null) {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else if (value instanceof Number) {
return value;
}
} else if (value instanceof Number) {
return value;
} else {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else {
return value.toString();
}
}
return null;
}
}

View File

@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
public class BeginEventHandler implements BinlogEventHandler<QueryEventData, BeginTransactionEventInfo> {
private final BeginTransactionEventWriter eventWriter = new BeginTransactionEventWriter();
@Override
public void handleEvent(final QueryEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
if (writeEvent) {
final String currentDatabase = eventData.getDatabase();
final BeginTransactionEventInfo beginEvent = dataCaptureState.isUseGtid()
? new BeginTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getGtidSet())
: new BeginTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition());
binlogEventState.setCurrentEventInfo(beginEvent);
binlogEventState.setCurrentEventWriter(eventWriter);
dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), beginEvent, dataCaptureState.getSequenceId(),
REL_SUCCESS, eventWriterConfiguration));
}
binlogResourceInfo.setInTransaction(true);
}
}

View File

@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.EventData;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.processor.ProcessSession;
public interface BinlogEventHandler<T extends EventData, S extends BinlogEventInfo> {
void handleEvent(final T eventData,
final boolean writeEvent,
final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo,
final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql,
final EventWriterConfiguration eventWriterConfiguration,
final ProcessSession session,
final long timestamp);
}
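Note: each concrete handler pairs one binlog EventData type with its writer, so dispatch in CaptureChangeMySQL reduces to invoking the matching handler field for the event type. An illustrative row-event call, mirroring the QUERY/BEGIN case shown later (the writeEvent flag and null sql argument here are assumptions, not verbatim source):
// case EXT_WRITE_ROWS:
//     insertEventHandler.handleEvent(event.getData(), !skipTable, dataCaptureState,
//             binlogResourceInfo, binlogEventState, null, eventWriterConfiguration,
//             currentSession, timestamp);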

View File

@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.EventData;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
public class CommitEventHandler implements BinlogEventHandler<EventData, CommitTransactionEventInfo> {
private final CommitTransactionEventWriter eventWriter = new CommitTransactionEventWriter();
@Override
public void handleEvent(final EventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
final String currentDatabase = binlogResourceInfo.getCurrentDatabase();
final CommitTransactionEventInfo commitEvent = dataCaptureState.isUseGtid()
? new CommitTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getGtidSet())
: new CommitTransactionEventInfo(currentDatabase, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition());
if (writeEvent) {
binlogEventState.setCurrentEventInfo(commitEvent);
binlogEventState.setCurrentEventWriter(eventWriter);
dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), commitEvent, dataCaptureState.getSequenceId(),
REL_SUCCESS, eventWriterConfiguration));
} else {
// If the COMMIT event is not to be written, the FlowFile should still be finished and the session committed.
if (session != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null) {
// Flush the events to the FlowFile when the processor is stopped
eventWriter.finishAndTransferFlowFile(session, eventWriterConfiguration, binlogResourceInfo.getTransitUri(), dataCaptureState.getSequenceId(), commitEvent, REL_SUCCESS);
}
session.commitAsync();
}
}
// Update inTransaction value to state
binlogResourceInfo.setInTransaction(false);
}
}

View File

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import org.apache.nifi.cdc.event.TableInfo;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
public class DDLEventHandler implements BinlogEventHandler<QueryEventData, DDLEventInfo> {
private final DDLEventWriter eventWriter = new DDLEventWriter();
@Override
public void handleEvent(final QueryEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
if (writeEvent) {
final TableInfo ddlTableInfo = binlogResourceInfo.getCurrentTable() != null
? binlogResourceInfo.getCurrentTable()
: new TableInfo(binlogResourceInfo.getCurrentDatabase(), null, null, null);
final DDLEventInfo ddlEvent = dataCaptureState.isUseGtid()
? new DDLEventInfo(ddlTableInfo, timestamp, dataCaptureState.getGtidSet(), sql)
: new DDLEventInfo(ddlTableInfo, timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), sql);
binlogEventState.setCurrentEventInfo(ddlEvent);
binlogEventState.setCurrentEventWriter(eventWriter);
dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), ddlEvent, dataCaptureState.getSequenceId(),
REL_SUCCESS, eventWriterConfiguration));
}
}
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
public class DeleteEventHandler implements BinlogEventHandler<DeleteRowsEventData, DeleteRowsEventInfo> {
private final DeleteRowsWriter eventWriter = new DeleteRowsWriter();
@Override
public void handleEvent(final DeleteRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
if (writeEvent) {
final DeleteRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
? new DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData)
: new DeleteRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
binlogEventState.setCurrentEventInfo(eventInfo);
binlogEventState.setCurrentEventWriter(eventWriter);
dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(),
REL_SUCCESS, eventWriterConfiguration));
}
}
}

View File

@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
public class InsertEventHandler implements BinlogEventHandler<WriteRowsEventData, InsertRowsEventInfo> {
private final InsertRowsWriter eventWriter = new InsertRowsWriter();
@Override
public void handleEvent(final WriteRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
if (writeEvent) {
final InsertRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
? new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData)
: new InsertRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
binlogEventState.setCurrentEventInfo(eventInfo);
binlogEventState.setCurrentEventWriter(eventWriter);
dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(),
REL_SUCCESS, eventWriterConfiguration));
}
}
}

View File

@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.handler;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
import org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL;
import org.apache.nifi.processor.ProcessSession;
import static org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL.REL_SUCCESS;
public class UpdateEventHandler implements BinlogEventHandler<UpdateRowsEventData, UpdateRowsEventInfo> {
private final UpdateRowsWriter eventWriter = new UpdateRowsWriter();
@Override
public void handleEvent(final UpdateRowsEventData eventData, final boolean writeEvent, final DataCaptureState dataCaptureState,
final CaptureChangeMySQL.BinlogResourceInfo binlogResourceInfo, final CaptureChangeMySQL.BinlogEventState binlogEventState,
final String sql, final EventWriterConfiguration eventWriterConfiguration, final ProcessSession session, final long timestamp) {
if (writeEvent) {
final UpdateRowsEventInfo eventInfo = dataCaptureState.isUseGtid()
? new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getGtidSet(), eventData)
: new UpdateRowsEventInfo(binlogResourceInfo.getCurrentTable(), timestamp, dataCaptureState.getBinlogFile(), dataCaptureState.getBinlogPosition(), eventData);
binlogEventState.setCurrentEventInfo(eventInfo);
binlogEventState.setCurrentEventWriter(eventWriter);
dataCaptureState.setSequenceId(eventWriter.writeEvent(session, binlogResourceInfo.getTransitUri(), eventInfo, dataCaptureState.getSequenceId(),
REL_SUCCESS, eventWriterConfiguration));
}
}
}

View File

@@ -19,12 +19,36 @@ package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
import java.io.IOException;
import java.io.Serializable;
/**
* An abstract base class for writing MySQL table-related binlog events into flow file(s).
*/
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {
protected Object getWritableObject(Integer type, Serializable value) {
if (value == null) {
return null;
}
if (type == null) {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else if (value instanceof Number) {
return value;
} else {
return null;
}
} else {
if (value instanceof byte[]) {
return new String((byte[]) value);
} else if (value instanceof Number) {
return value;
} else {
return value.toString();
}
}
}
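// For clarity, the coercion rules above with example inputs (type arguments are java.sql.Types constants):
//   getWritableObject(null, "abc".getBytes())        -> "abc"             (byte[] becomes String)
//   getWritableObject(null, 42L)                     -> 42L               (Number passes through)
//   getWritableObject(null, new java.util.Date())    -> null              (no type info, unrecognized class)
//   getWritableObject(Types.VARCHAR, "x".getBytes()) -> "x"
//   getWritableObject(Types.TIMESTAMP, someDate)     -> someDate.toString()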
protected void writeJson(T event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {

View File

@@ -17,7 +17,6 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
@@ -89,7 +88,7 @@ public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsE
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);

View File

@@ -17,7 +17,6 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
@@ -90,7 +89,7 @@ public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsE
if (row[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i]));
jsonGenerator.writeObjectField("value", getWritableObject(columnType, row[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);

View File

@@ -17,7 +17,6 @@
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
@@ -95,13 +94,13 @@ public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsE
if (oldRow[i] == null) {
jsonGenerator.writeNullField("last_value");
} else {
jsonGenerator.writeObjectField("last_value", MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
jsonGenerator.writeObjectField("last_value", getWritableObject(columnType, oldRow[i]));
}
if (newRow[i] == null) {
jsonGenerator.writeNullField("value");
} else {
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
jsonGenerator.writeObjectField("value", getWritableObject(columnType, newRow[i]));
}
jsonGenerator.writeEndObject();
i = includedColumns.nextSetBit(i + 1);

View File

@@ -22,6 +22,7 @@ import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.MySqlGtid;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
@@ -36,7 +37,6 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
import org.apache.nifi.annotation.notification.PrimaryNodeState;
@@ -45,25 +45,20 @@ import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.RowEventException;
import org.apache.nifi.cdc.event.TableInfo;
import org.apache.nifi.cdc.event.TableInfoCacheKey;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.cdc.mysql.event.BinlogEventListener;
import org.apache.nifi.cdc.mysql.event.BinlogLifecycleListener;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
import org.apache.nifi.cdc.mysql.event.DDLEventInfo;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.DataCaptureState;
import org.apache.nifi.cdc.event.io.EventWriterConfiguration;
import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.RawBinlogEvent;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.cdc.mysql.event.io.AbstractBinlogEventWriter;
import org.apache.nifi.cdc.mysql.event.io.BeginTransactionEventWriter;
import org.apache.nifi.cdc.mysql.event.io.CommitTransactionEventWriter;
import org.apache.nifi.cdc.mysql.event.io.DDLEventWriter;
import org.apache.nifi.cdc.mysql.event.io.DeleteRowsWriter;
import org.apache.nifi.cdc.mysql.event.io.InsertRowsWriter;
import org.apache.nifi.cdc.mysql.event.io.UpdateRowsWriter;
import org.apache.nifi.cdc.mysql.event.handler.BeginEventHandler;
import org.apache.nifi.cdc.mysql.event.handler.CommitEventHandler;
import org.apache.nifi.cdc.mysql.event.handler.DDLEventHandler;
import org.apache.nifi.cdc.mysql.event.handler.DeleteEventHandler;
import org.apache.nifi.cdc.mysql.event.handler.InsertEventHandler;
import org.apache.nifi.cdc.mysql.event.handler.UpdateEventHandler;
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory;
import org.apache.nifi.cdc.mysql.processors.ssl.ConnectionPropertiesProvider;
import org.apache.nifi.cdc.mysql.processors.ssl.StandardConnectionPropertiesProvider;
@@ -77,9 +72,7 @@ import org.apache.nifi.components.resource.ResourceType;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
@@ -96,7 +89,6 @@ import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.Driver;
@@ -120,8 +112,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import java.util.regex.Pattern;
@@ -166,6 +156,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Random invalid constant used as an indicator to not set the binlog position on the client (thereby using the latest available)
private static final int DO_NOT_SET = -1000;
private static final int DEFAULT_MYSQL_PORT = 3306;
// A regular expression matching multiline comments, used when parsing DDL statements
private static final Pattern MULTI_COMMENT_PATTERN = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);
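// For illustration (not part of this change): stripping comments with this pattern, e.g.
//   MULTI_COMMENT_PATTERN.matcher("CREATE /* multi\n line */ TABLE t (id INT)").replaceAll("")
// yields "CREATE  TABLE t (id INT)"; DOTALL lets the lazy ".*?" match across newlines.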
@@ -228,13 +220,14 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor HOSTS = new PropertyDescriptor.Builder()
.name("capture-change-mysql-hosts")
.displayName("MySQL Hosts")
.description("A list of hostname/port entries corresponding to nodes in a MySQL cluster. The entries should be comma separated "
+ "using a colon such as host1:port,host2:port,.... For example mysql.myhost.com:3306. This processor will attempt to connect to "
.displayName("MySQL Nodes")
.description("A list of hostname (and optional port) entries corresponding to nodes in a MySQL cluster. The entries should be comma separated "
+ "using a colon (if the port is to be specified) such as host1:port,host2:port,.... For example mysql.myhost.com:3306. The port need not be specified, "
+ "when omitted the default MySQL port value of 3306 will be used. This processor will attempt to connect to "
+ "the hosts in the list in order. If one node goes down and failover is enabled for the cluster, then the processor will connect "
+ "to the active node (assuming its host entry is specified in this property. The default port for MySQL connections is 3306.")
+ "to the active node (assuming its node entry is specified in this property).")
.required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
@@ -317,10 +310,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor DIST_CACHE_CLIENT = new PropertyDescriptor.Builder()
.name("capture-change-mysql-dist-map-cache-client")
.displayName("Distributed Map Cache Client")
.description("Identifies a Distributed Map Cache Client controller service to be used for keeping information about the various table columns, datatypes, etc. "
+ "needed by the processor. If a client is not specified, the generated events will not include column type or name information (but they will include database "
+ "and table information.")
.displayName("Distributed Map Cache Client - unused")
.description("This is a legacy property that is no longer used to store table information, the processor will handle the table information (column names, types, etc.)")
.identifiesControllerService(DistributedMapCacheClient.class)
.required(false)
.build();
@@ -464,58 +455,38 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private static final List<PropertyDescriptor> propDescriptors;
private volatile ProcessSession currentSession;
private BinaryLogClient binlogClient;
private BinlogEventListener eventListener;
private BinlogLifecycleListener lifecycleListener;
private GtidSet gtidSet;
private volatile BinaryLogClient binlogClient;
private volatile BinlogEventListener eventListener;
private volatile BinlogLifecycleListener lifecycleListener;
private volatile GtidSet gtidSet;
// Set queue capacity to avoid excessive memory consumption
private final BlockingQueue<RawBinlogEvent> queue = new LinkedBlockingQueue<>(1000);
private volatile String currentBinlogFile = null;
private volatile long currentBinlogPosition = 4;
private volatile String currentGtidSet = null;
// The following variables save the value of the binlog filename, position, (sequence id), and gtid at the beginning of a transaction. Used for rollback
private volatile String xactBinlogFile = null;
private volatile long xactBinlogPosition = 4;
private volatile long xactSequenceId = 0;
private volatile String xactGtidSet = null;
private final Map<TableInfoCacheKey, TableInfo> tableInfoCache = new HashMap<>();
private volatile ProcessSession currentSession;
private DataCaptureState currentDataCaptureState = new DataCaptureState();
private volatile BinlogResourceInfo binlogResourceInfo = new BinlogResourceInfo();
private volatile TableInfo currentTable = null;
private volatile String currentDatabase = null;
private volatile Pattern databaseNamePattern;
private volatile Pattern tableNamePattern;
private volatile boolean includeBeginCommit = false;
private volatile boolean includeDDLEvents = false;
private volatile boolean useGtid = false;
private volatile boolean inTransaction = false;
private volatile boolean skipTable = false;
private final AtomicBoolean hasRun = new AtomicBoolean(false);
private int currentHost = 0;
private String transitUri = "<unknown>";
private volatile JDBCConnectionHolder jdbcConnectionHolder = null;
private final AtomicLong currentSequenceId = new AtomicLong(0);
private final BinlogEventState binlogEventState = new BinlogEventState();
private volatile DistributedMapCacheClient cacheClient = null;
private final Serializer<TableInfoCacheKey> cacheKeySerializer = new TableInfoCacheKey.Serializer();
private final Serializer<TableInfo> cacheValueSerializer = new TableInfo.Serializer();
private final Deserializer<TableInfo> cacheValueDeserializer = new TableInfo.Deserializer();
private JDBCConnectionHolder jdbcConnectionHolder = null;
private final BeginTransactionEventWriter beginEventWriter = new BeginTransactionEventWriter();
private final CommitTransactionEventWriter commitEventWriter = new CommitTransactionEventWriter();
private final DDLEventWriter ddlEventWriter = new DDLEventWriter();
private final InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
private final DeleteRowsWriter deleteRowsWriter = new DeleteRowsWriter();
private final UpdateRowsWriter updateRowsWriter = new UpdateRowsWriter();
private final BeginEventHandler beginEventHandler = new BeginEventHandler();
private final CommitEventHandler commitEventHandler = new CommitEventHandler();
private final DDLEventHandler ddlEventHandler = new DDLEventHandler();
private final InsertEventHandler insertEventHandler = new InsertEventHandler();
private final DeleteEventHandler deleteEventHandler = new DeleteEventHandler();
private final UpdateEventHandler updateEventHandler = new UpdateEventHandler();
private volatile EventWriterConfiguration eventWriterConfiguration;
private volatile BinlogEventInfo currentEventInfo;
private AbstractBinlogEventWriter<? extends BinlogEventInfo> currentEventWriter;
static {
@@ -609,6 +580,9 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
return;
}
// Get inTransaction value from state
binlogResourceInfo.setInTransaction("true".equals(stateMap.get("inTransaction")));
// Build an event writer config object for the event writers to use
final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
eventWriterConfiguration = new EventWriterConfiguration(
@@ -624,49 +598,52 @@
boolean getAllRecords = context.getProperty(RETRIEVE_ALL_RECORDS).asBoolean();
includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
useGtid = context.getProperty(USE_BINLOG_GTID).asBoolean();
if (binlogResourceInfo == null) {
binlogResourceInfo = new BinlogResourceInfo();
}
currentDataCaptureState.setUseGtid(context.getProperty(USE_BINLOG_GTID).asBoolean());
if (useGtid) {
if (currentDataCaptureState.isUseGtid()) {
// Set current gtid to whatever is in State, falling back to the Retrieve All Records then Initial Gtid if no State variable is present
currentGtidSet = stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY);
if (currentGtidSet == null) {
currentDataCaptureState.setGtidSet(stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY));
if (currentDataCaptureState.getGtidSet() == null) {
if (!getAllRecords && context.getProperty(INIT_BINLOG_GTID).isSet()) {
currentGtidSet = context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue();
currentDataCaptureState.setGtidSet(context.getProperty(INIT_BINLOG_GTID).evaluateAttributeExpressions().getValue());
} else {
// If we're starting from the beginning of all binlogs, the binlog gtid must be the empty string (not null)
currentGtidSet = "";
currentDataCaptureState.setGtidSet("");
}
}
currentBinlogFile = "";
currentBinlogPosition = DO_NOT_SET;
currentDataCaptureState.setBinlogFile("");
currentDataCaptureState.setBinlogPosition(DO_NOT_SET);
} else {
// Set current binlog filename to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Filename if no State variable is present
currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
final String currentBinlogFile = stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY);
if (currentBinlogFile == null) {
if (!getAllRecords) {
if (context.getProperty(INIT_BINLOG_FILENAME).isSet()) {
currentBinlogFile = context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue();
currentDataCaptureState.setBinlogFile(context.getProperty(INIT_BINLOG_FILENAME).evaluateAttributeExpressions().getValue());
}
} else {
// If we're starting from the beginning of all binlogs, the binlog filename must be the empty string (not null)
currentBinlogFile = "";
currentDataCaptureState.setBinlogFile("");
}
} else {
currentDataCaptureState.setBinlogFile(currentBinlogFile);
}
// Set current binlog position to whatever is in State, falling back to the Retrieve All Records then Initial Binlog Position settings if no State variable is present
String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
final String binlogPosition = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
if (binlogPosition != null) {
currentBinlogPosition = Long.valueOf(binlogPosition);
currentDataCaptureState.setBinlogPosition(Long.parseLong(binlogPosition));
} else if (!getAllRecords) {
if (context.getProperty(INIT_BINLOG_POSITION).isSet()) {
currentBinlogPosition = context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong();
currentDataCaptureState.setBinlogPosition(context.getProperty(INIT_BINLOG_POSITION).evaluateAttributeExpressions().asLong());
} else {
currentBinlogPosition = DO_NOT_SET;
currentDataCaptureState.setBinlogPosition(DO_NOT_SET);
}
} else {
currentBinlogPosition = -1;
currentDataCaptureState.setBinlogPosition(-1);
}
}
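// Net precedence for the starting position restored above, for clarity:
//   1. a value stored in processor State (from a previous run) always wins;
//   2. otherwise, if Retrieve All Records is true, start from the beginning of the binlogs;
//   3. otherwise, use the Initial Binlog Filename/Position (or Initial Gtid) properties, if set;
//   4. otherwise DO_NOT_SET, i.e. start from the latest available position.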
@@ -676,22 +653,10 @@
// Use Initial Sequence ID property if none is found in state
PropertyValue seqIdProp = context.getProperty(INIT_SEQUENCE_ID);
if (seqIdProp.isSet()) {
currentSequenceId.set(seqIdProp.evaluateAttributeExpressions().asInteger());
currentDataCaptureState.setSequenceId(seqIdProp.evaluateAttributeExpressions().asInteger());
}
} else {
currentSequenceId.set(Long.parseLong(seqIdString));
}
//get inTransaction value from state
inTransaction = "true".equals(stateMap.get("inTransaction"));
// Get reference to Distributed Cache if one exists. If it does not, no enrichment (resolution of column names, e.g.) will be performed
boolean createEnrichmentConnection = false;
if (context.getProperty(DIST_CACHE_CLIENT).isSet()) {
cacheClient = context.getProperty(DIST_CACHE_CLIENT).asControllerService(DistributedMapCacheClient.class);
createEnrichmentConnection = true;
} else {
logger.warn("No Distributed Map Cache Client is specified, so no event enrichment (resolution of column names, e.g.) will be performed.");
cacheClient = null;
currentDataCaptureState.setSequenceId(Long.parseLong(seqIdString));
}
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE)
@@ -717,7 +682,7 @@
Long serverId = context.getProperty(SERVER_ID).evaluateAttributeExpressions().asLong();
connect(hosts, username, password, serverId, createEnrichmentConnection, driverLocation, driverName, connectTimeout, sslContextService, sslMode);
connect(hosts, username, password, serverId, driverLocation, driverName, connectTimeout, sslContextService, sslMode);
} catch (IOException | IllegalStateException e) {
if (eventListener != null) {
eventListener.stop();
@@ -735,8 +700,6 @@
@Override
public synchronized void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) throws ProcessException {
// Indicate that this processor has executed at least once, so we know whether or not the state values are valid and should be updated
hasRun.set(true);
ComponentLog log = getLogger();
// Create a client if we don't have one
@@ -773,16 +736,12 @@
}
try {
outputEvents(currentSession, log);
outputEvents(currentSession, context, log);
} catch (Exception eventException) {
getLogger().error("Exception during event processing at file={} pos={}", currentBinlogFile, currentBinlogPosition, eventException);
getLogger().error("Exception during event processing at file={} pos={}", currentDataCaptureState.getBinlogFile(), currentDataCaptureState.getBinlogPosition(), eventException);
try {
// Perform some processor-level "rollback", then rollback the session
currentBinlogFile = xactBinlogFile == null ? "" : xactBinlogFile;
currentBinlogPosition = xactBinlogPosition;
currentSequenceId.set(xactSequenceId);
currentGtidSet = xactGtidSet;
inTransaction = false;
binlogResourceInfo.setInTransaction(false);
stop();
} catch (Exception e) {
// Not much we can recover from here
@@ -795,16 +754,6 @@
}
}
@OnStopped
@OnShutdown
public void onStopped(ProcessContext context) {
try {
stop();
} catch (CDCException ioe) {
throw new ProcessException(ioe);
}
}
/**
* Get a list of hosts from a NiFi property, e.g.
*
@@ -821,16 +770,19 @@
for (String item : hostsSplit) {
String[] addresses = item.split(":");
if (addresses.length != 2) {
if (addresses.length > 2 || addresses.length == 0) {
throw new ArrayIndexOutOfBoundsException("Not in host:port format");
} else if (addresses.length > 1) {
hostsList.add(new InetSocketAddress(addresses[0].trim(), Integer.parseInt(addresses[1].trim())));
} else {
// Assume default port of 3306
hostsList.add(new InetSocketAddress(addresses[0].trim(), DEFAULT_MYSQL_PORT));
}
hostsList.add(new InetSocketAddress(addresses[0].trim(), Integer.parseInt(addresses[1].trim())));
}
return hostsList;
}
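// Illustration of the relaxed parsing above (hypothetical input): "db1.example.com:3307, db2.example.com"
// now yields [db1.example.com:3307, db2.example.com:3306], the bare host falling back to DEFAULT_MYSQL_PORT.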
protected void connect(List<InetSocketAddress> hosts, String username, String password, Long serverId, boolean createEnrichmentConnection,
protected void connect(List<InetSocketAddress> hosts, String username, String password, Long serverId,
String driverLocation, String driverName, long connectTimeout,
final SSLContextService sslContextService, final SSLMode sslMode) throws IOException {
@@ -839,17 +791,15 @@
InetSocketAddress connectedHost = null;
Exception lastConnectException = new Exception("Unknown connection error");
if (createEnrichmentConnection) {
try {
// Ensure driverLocation and driverName are correct before establishing binlog connection
// to avoid failing after binlog messages are received.
// Actual JDBC connection is created after binlog client gets started, because we need
// the connect-able host same as the binlog client.
registerDriver(driverLocation, driverName);
} catch (InitializationException e) {
throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" +
" and MySQL Driver Class Name are configured correctly. " + e, e);
}
try {
// Ensure driverLocation and driverName are correct before establishing binlog connection
// to avoid failing after binlog messages are received.
// Actual JDBC connection is created after binlog client gets started, because we need
// the connect-able host same as the binlog client.
registerDriver(driverLocation, driverName);
} catch (InitializationException e) {
throw new RuntimeException("Failed to register JDBC driver. Ensure MySQL Driver Location(s)" +
" and MySQL Driver Class Name are configured correctly. " + e, e);
}
while (connectedHost == null && connectionAttempts < numHosts) {
@@ -871,12 +821,12 @@
}
binlogClient.registerLifecycleListener(lifecycleListener);
binlogClient.setBinlogFilename(currentBinlogFile);
if (currentBinlogPosition != DO_NOT_SET) {
binlogClient.setBinlogPosition(currentBinlogPosition);
binlogClient.setBinlogFilename(currentDataCaptureState.getBinlogFile());
if (currentDataCaptureState.getBinlogPosition() != DO_NOT_SET) {
binlogClient.setBinlogPosition(currentDataCaptureState.getBinlogPosition());
}
binlogClient.setGtidSet(currentGtidSet);
binlogClient.setGtidSet(currentDataCaptureState.getGtidSet());
binlogClient.setGtidSetFallbackToPurged(true);
if (serverId != null) {
@@ -895,12 +845,12 @@
connectTimeout = Long.MAX_VALUE;
}
binlogClient.connect(connectTimeout);
transitUri = "mysql://" + connectedHost.getHostString() + ":" + connectedHost.getPort();
binlogResourceInfo.setTransitUri("mysql://" + connectedHost.getHostString() + ":" + connectedHost.getPort());
} catch (IOException | TimeoutException te) {
// Try the next host
connectedHost = null;
transitUri = "<unknown>";
binlogResourceInfo.setTransitUri("<unknown>");
currentHost = (currentHost + 1) % numHosts;
connectionAttempts++;
lastConnectException = te;
@@ -918,36 +868,37 @@
throw new IOException("Could not connect binlog client to any of the specified hosts due to: " + lastConnectException.getMessage(), lastConnectException);
}
if (createEnrichmentConnection) {
final TlsConfiguration tlsConfiguration = sslContextService == null ? null : sslContextService.createTlsConfiguration();
final ConnectionPropertiesProvider connectionPropertiesProvider = new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
final Map<String, String> jdbcConnectionProperties = connectionPropertiesProvider.getConnectionProperties();
jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, jdbcConnectionProperties, connectTimeout);
try {
// Ensure connection can be created.
getJdbcConnection();
} catch (SQLException e) {
getLogger().error("Error creating binlog enrichment JDBC connection to any of the specified hosts", e);
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.unregisterEventListener(eventListener);
}
}
final TlsConfiguration tlsConfiguration = sslContextService == null ? null : sslContextService.createTlsConfiguration();
final ConnectionPropertiesProvider connectionPropertiesProvider = new StandardConnectionPropertiesProvider(sslMode, tlsConfiguration);
final Map<String, String> jdbcConnectionProperties = connectionPropertiesProvider.getConnectionProperties();
jdbcConnectionHolder = new JDBCConnectionHolder(connectedHost, username, password, jdbcConnectionProperties, connectTimeout);
try {
// Ensure connection can be created.
getJdbcConnection();
} catch (SQLException e) {
getLogger().error("Error creating binlog enrichment JDBC connection to any of the specified hosts", e);
if (eventListener != null) {
eventListener.stop();
if (binlogClient != null) {
binlogClient.disconnect();
binlogClient = null;
binlogClient.unregisterEventListener(eventListener);
}
return;
}
if (binlogClient != null) {
binlogClient.disconnect();
binlogClient = null;
}
return;
}
gtidSet = new GtidSet(binlogClient.getGtidSet());
}
public void outputEvents(ProcessSession session, ComponentLog log) throws IOException {
public void outputEvents(ProcessSession session, ProcessContext context, ComponentLog log) throws IOException {
RawBinlogEvent rawBinlogEvent;
DataCaptureState dataCaptureState = currentDataCaptureState.copy();
final boolean includeBeginCommit = context.getProperty(INCLUDE_BEGIN_COMMIT).asBoolean();
final boolean includeDDLEvents = context.getProperty(INCLUDE_DDL_EVENTS).asBoolean();
// Drain the queue
while (isScheduled() && (rawBinlogEvent = queue.poll()) != null) {
@@ -959,10 +910,10 @@
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
// advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which
// is filled in during the ROTATE case
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !useGtid) {
currentBinlogPosition = header.getPosition();
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid()) {
dataCaptureState.setBinlogPosition(header.getPosition());
}
log.debug("Message event, type={} pos={} file={}", eventType, currentBinlogPosition, currentBinlogFile);
log.debug("Message event, type={} pos={} file={}", eventType, dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
switch (eventType) {
case TABLE_MAP:
// This is sent to inform which table is about to be changed by subsequent events
@@ -974,139 +925,92 @@
if (!skipTable) {
TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
binlogResourceInfo.setCurrentTable(tableInfoCache.get(key));
if (binlogResourceInfo.getCurrentTable() == null) {
// We don't have an entry for this table yet, so fetch the info from the database and populate the cache
try {
binlogResourceInfo.setCurrentTable(loadTableInfo(key));
tableInfoCache.put(key, binlogResourceInfo.getCurrentTable());
} catch (SQLException se) {
// Propagate the error up, so things like rollback and logging/bulletins can be handled
throw new IOException(se.getMessage(), se);
}
}
} else {
// Clear the current table, to force reload next time we get a TABLE_MAP event we care about
binlogResourceInfo.setCurrentTable(null);
}
break;
case QUERY:
QueryEventData queryEventData = event.getData();
binlogResourceInfo.setCurrentDatabase(queryEventData.getDatabase());
String sql = queryEventData.getSql();
// Is this the start of a transaction?
if ("BEGIN".equals(sql)) {
// If we're already in a transaction, something bad happened, alert the user
if (binlogResourceInfo.isInTransaction()) {
getLogger().debug("BEGIN event received at pos={} file={} while already processing a transaction. This could indicate that your binlog position is invalid "
+ "or the event stream is out of sync or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
}
if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
beginEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
}
// Whether we skip this event or not, it's still the beginning of a transaction
binlogResourceInfo.setInTransaction(true);
// Update inTransaction value to state
updateState(session, dataCaptureState);
} else if ("COMMIT".equals(sql)) {
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (!binlogResourceInfo.isInTransaction()) {
getLogger().debug("COMMIT event received at pos={} file={} while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid or the event stream is out of sync "
+ "or there was an issue with the processor state.", dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
}
if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
commitEventHandler.handleEvent(queryEventData, includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
}
// Whether we skip this event or not, it's the end of a transaction
binlogResourceInfo.setInTransaction(false);
updateState(session, dataCaptureState);
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() == null) {
session.commitAsync();
}
binlogResourceInfo.setCurrentTable(null);
binlogResourceInfo.setCurrentDatabase(null);
} else {
// Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
String normalizedQuery = normalizeQuery(sql);
if (normalizedQuery.startsWith("alter table")
|| normalizedQuery.startsWith("alter ignore table")
|| normalizedQuery.startsWith("create table")
|| normalizedQuery.startsWith("truncate table")
|| normalizedQuery.startsWith("rename table")
|| normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) {
if (isQueryDDL(normalizedQuery)) {
if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
if (queryEventData.getDatabase() == null) {
queryEventData.setDatabase(binlogResourceInfo.getCurrentDatabase());
}
ddlEventHandler.handleEvent(queryEventData, includeDDLEvents, currentDataCaptureState, binlogResourceInfo,
binlogEventState, sql, eventWriterConfiguration, currentSession, timestamp);
}
// The altered table may not be the "active" table, so clear the cache to pick up changes
tableInfoCache.clear();
// If not in a transaction, commit the session so the DDL event(s) will be transferred
if (includeDDLEvents && !binlogResourceInfo.isInTransaction()) {
updateState(session, dataCaptureState);
if (FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.equals(eventWriterConfiguration.getFlowFileEventWriteStrategy())) {
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null && binlogEventState.getCurrentEventWriter() != null) {
// Flush the events to the FlowFile when the processor is stopped
binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(currentSession, eventWriterConfiguration, binlogResourceInfo.getTransitUri(),
dataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), REL_SUCCESS);
}
}
}
@ -1120,41 +1024,26 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
break;
case XID:
if (!binlogResourceInfo.isInTransaction()) {
getLogger().debug("COMMIT (XID) event received at pos={} file={} /while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
currentBinlogPosition, currentBinlogFile);
+ "This could indicate that your binlog position is invalid or the event stream is out of sync or there was an issue with the processor state.",
dataCaptureState.getBinlogPosition(), dataCaptureState.getBinlogFile());
}
if (databaseNamePattern == null || databaseNamePattern.matcher(binlogResourceInfo.getCurrentDatabase()).matches()) {
commitEventHandler.handleEvent(event.getData(), includeBeginCommit, currentDataCaptureState, binlogResourceInfo,
binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
}
// Whether we skip this event or not, it's the end of a transaction. Save the next position
// so that when the processor restarts, it will not read this XID event again.
binlogResourceInfo.setInTransaction(false);
dataCaptureState.setBinlogPosition(header.getNextPosition());
updateState(session, dataCaptureState);
// If there is no FlowFile open, commit the session
if (eventWriterConfiguration.getCurrentFlowFile() == null) {
// Commit the NiFi session
session.commitAsync();
}
binlogResourceInfo.setCurrentTable(null);
binlogResourceInfo.setCurrentDatabase(null);
break;
case WRITE_ROWS:
@ -1170,11 +1059,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (skipTable) {
break;
}
if (!binlogResourceInfo.isInTransaction()) {
// These events should only happen inside a transaction, warn the user otherwise
log.info("Event {} occurred outside of a transaction, which is unexpected.", eventType.name());
}
if (binlogResourceInfo.getCurrentTable() == null) {
// No Table Map event was processed prior to this event, which should not happen, so throw an error
throw new RowEventException("No table information is available for this event, cannot process further.");
}
@ -1183,52 +1072,42 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|| eventType == EXT_WRITE_ROWS
|| eventType == PRE_GA_WRITE_ROWS) {
insertEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
} else if (eventType == DELETE_ROWS
|| eventType == EXT_DELETE_ROWS
|| eventType == PRE_GA_DELETE_ROWS) {
deleteEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
} else {
// Update event
updateEventHandler.handleEvent(event.getData(), true, currentDataCaptureState, binlogResourceInfo,
binlogEventState, null, eventWriterConfiguration, currentSession, timestamp);
}
break;
case ROTATE:
if (!currentDataCaptureState.isUseGtid()) {
// Update current binlog filename
RotateEventData rotateEventData = event.getData();
dataCaptureState.setBinlogFile(rotateEventData.getBinlogFilename());
dataCaptureState.setBinlogPosition(rotateEventData.getBinlogPosition());
}
updateState(session, dataCaptureState);
break;
case GTID:
if (currentDataCaptureState.isUseGtid()) {
// Update current binlog gtid
GtidEventData gtidEventData = event.getData();
MySqlGtid mySqlGtid = gtidEventData.getMySqlGtid();
if (mySqlGtid != null) {
gtidSet.add(mySqlGtid.toString());
dataCaptureState.setGtidSet(gtidSet.toString());
updateState(session, dataCaptureState);
}
}
break;
@ -1239,12 +1118,22 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
// Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only
// advance the position if it is not that type of event.
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION && !currentDataCaptureState.isUseGtid() && eventType != XID) {
dataCaptureState.setBinlogPosition(header.getNextPosition());
}
}
}
private boolean isQueryDDL(String sql) {
return sql.startsWith("alter table")
|| sql.startsWith("alter ignore table")
|| sql.startsWith("create table")
|| sql.startsWith("truncate table")
|| sql.startsWith("rename table")
|| sql.startsWith("drop table")
|| sql.startsWith("drop database");
}
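isQueryDDL() expects the output of normalizeQuery() (lower-cased, trimmed SQL). The following standalone sketch mirrors the same prefix check; the class and constant names are illustrative, not from the commit.

import java.util.List;

// Sketch mirroring the prefix matching in isQueryDDL(); input must already be normalized.
public class DdlCheckSketch {
    private static final List<String> DDL_PREFIXES = List.of(
            "alter table", "alter ignore table", "create table",
            "truncate table", "rename table", "drop table", "drop database");

    static boolean isQueryDDL(String normalizedSql) {
        return DDL_PREFIXES.stream().anyMatch(normalizedSql::startsWith);
    }

    public static void main(String[] args) {
        System.out.println(isQueryDDL("alter table users add column age int")); // true
        System.out.println(isQueryDDL("insert into users values (1)"));         // false
    }
}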
protected void clearState() throws IOException {
if (currentSession == null) {
throw new IllegalStateException("No current session");
@ -1263,7 +1152,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
return normalizedQuery;
}
@OnStopped
public void stop() throws CDCException {
try {
if (eventListener != null) {
eventListener.stop();
@ -1277,14 +1167,18 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
if (currentSession != null) {
FlowFile flowFile = eventWriterConfiguration.getCurrentFlowFile();
if (flowFile != null && binlogEventState.getCurrentEventWriter() != null) {
// Flush the events to the FlowFile when the processor is stopped
binlogEventState.getCurrentEventWriter().finishAndTransferFlowFile(
currentSession,
eventWriterConfiguration,
binlogResourceInfo.getTransitUri(),
currentDataCaptureState.getSequenceId(), binlogEventState.getCurrentEventInfo(), REL_SUCCESS);
}
currentSession.commitAsync();
}
currentDataCaptureState.setBinlogPosition(-1);
} catch (IOException e) {
throw new CDCException("Error closing CDC connection", e);
} finally {
@ -1296,29 +1190,30 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
}
private void updateState(ProcessSession session, DataCaptureState dataCaptureState) throws IOException {
updateState(session, dataCaptureState, binlogResourceInfo.isInTransaction());
}
private void updateState(ProcessSession session, DataCaptureState dataCaptureState, boolean inTransaction) throws IOException {
// Update state with latest values
final Map<String, String> newStateMap = new HashMap<>(session.getState(Scope.CLUSTER).toMap());
// Save current binlog filename, position and GTID to the state map
if (dataCaptureState.getBinlogFile() != null) {
newStateMap.put(BinlogEventInfo.BINLOG_FILENAME_KEY, dataCaptureState.getBinlogFile());
}
newStateMap.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(dataCaptureState.getBinlogPosition()));
newStateMap.put(SEQUENCE_ID_KEY, String.valueOf(dataCaptureState.getSequenceId()));
// Add the inTransaction value to the state map
newStateMap.put("inTransaction", inTransaction ? "true" : "false");
if (dataCaptureState.getGtidSet() != null) {
newStateMap.put(BinlogEventInfo.BINLOG_GTIDSET_KEY, dataCaptureState.getGtidSet());
}
session.setState(newStateMap, Scope.CLUSTER);
currentDataCaptureState = dataCaptureState;
}
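For symmetry, here is a sketch of the inverse operation — restoring the persisted values on a restart. This method is not part of the commit; it assumes DataCaptureState setters matching the getters used above (setSequenceId in particular is an assumption) and reads the same state keys, including the literal "inTransaction" key.

// Hypothetical restore method (assumption: not in the commit), shown for symmetry with updateState().
private DataCaptureState restoreState(ProcessSession session) throws IOException {
    final StateMap stateMap = session.getState(Scope.CLUSTER);
    final DataCaptureState restored = new DataCaptureState();
    restored.setBinlogFile(stateMap.get(BinlogEventInfo.BINLOG_FILENAME_KEY));
    final String position = stateMap.get(BinlogEventInfo.BINLOG_POSITION_KEY);
    restored.setBinlogPosition(position == null ? -1 : Long.parseLong(position));
    restored.setGtidSet(stateMap.get(BinlogEventInfo.BINLOG_GTIDSET_KEY));
    final String sequenceId = stateMap.get(SEQUENCE_ID_KEY);
    restored.setSequenceId(sequenceId == null ? 0 : Long.parseLong(sequenceId));
    binlogResourceInfo.setInTransaction("true".equals(stateMap.get("inTransaction")));
    return restored;
}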
@ -1391,8 +1286,13 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private JDBCConnectionHolder(InetSocketAddress host, String username, String password, Map<String, String> customProperties, long connectionTimeoutMillis) {
this.connectionUrl = "jdbc:mysql://" + host.getHostString() + ":" + host.getPort();
connectionProps.putAll(customProperties);
connectionProps.put("user", username);
connectionProps.put("password", password);
if (username != null) {
connectionProps.put("user", username);
if (password != null) {
connectionProps.put("password", password);
}
}
this.connectionTimeoutMillis = connectionTimeoutMillis;
}
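The null guards above matter because connectionProps is a java.util.Properties, which extends Hashtable and throws NullPointerException for null values. A tiny demonstration (illustrative only, not part of the commit):

import java.util.Properties;

// Demo: Properties rejects null values, hence the guards in the constructor above.
public class NullPropsDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("user", "root"); // fine
        try {
            props.put("password", null); // throws NullPointerException
        } catch (NullPointerException e) {
            System.out.println("Null values are rejected by Properties/Hashtable");
        }
    }
}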
@ -1486,7 +1386,71 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public Logger getParentLogger() throws SQLFeatureNotSupportedException {
return driver.getParentLogger();
}
}
public static class BinlogEventState {
private BinlogEventInfo currentEventInfo;
private AbstractBinlogEventWriter<? extends BinlogEventInfo> currentEventWriter;
public BinlogEventInfo getCurrentEventInfo() {
return currentEventInfo;
}
public void setCurrentEventInfo(BinlogEventInfo currentEventInfo) {
this.currentEventInfo = currentEventInfo;
}
public AbstractBinlogEventWriter<? extends BinlogEventInfo> getCurrentEventWriter() {
return currentEventWriter;
}
public void setCurrentEventWriter(AbstractBinlogEventWriter<? extends BinlogEventInfo> currentEventWriter) {
this.currentEventWriter = currentEventWriter;
}
}
public static class BinlogResourceInfo {
private TableInfo currentTable = null;
private String currentDatabase = null;
private boolean inTransaction = false;
private String transitUri = "<unknown>";
public BinlogResourceInfo() {
}
public TableInfo getCurrentTable() {
return currentTable;
}
public void setCurrentTable(TableInfo currentTable) {
this.currentTable = currentTable;
}
public String getCurrentDatabase() {
return currentDatabase;
}
public void setCurrentDatabase(String currentDatabase) {
this.currentDatabase = currentDatabase;
}
public boolean isInTransaction() {
return inTransaction;
}
public void setInTransaction(boolean inTransaction) {
this.inTransaction = inTransaction;
}
public String getTransitUri() {
return transitUri;
}
public void setTransitUri(String transitUri) {
this.transitUri = transitUri;
}
}
}
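DataCaptureState itself does not appear in this diff. A minimal version consistent with its usage above (copy(), binlog file/position, GTID set, sequence id, useGtid flag) might look like the following — a sketch, not the committed class:

public class DataCaptureState {
    private String binlogFile;
    private long binlogPosition = -1;
    private String gtidSet;
    private long sequenceId = 0;
    private boolean useGtid = false;

    public String getBinlogFile() { return binlogFile; }
    public void setBinlogFile(String binlogFile) { this.binlogFile = binlogFile; }
    public long getBinlogPosition() { return binlogPosition; }
    public void setBinlogPosition(long binlogPosition) { this.binlogPosition = binlogPosition; }
    public String getGtidSet() { return gtidSet; }
    public void setGtidSet(String gtidSet) { this.gtidSet = gtidSet; }
    public long getSequenceId() { return sequenceId; }
    public void setSequenceId(long sequenceId) { this.sequenceId = sequenceId; }
    public boolean isUseGtid() { return useGtid; }
    public void setUseGtid(boolean useGtid) { this.useGtid = useGtid; }

    // Defensive copy: outputEvents() mutates a scratch copy and only publishes it
    // back via updateState() once events have been written.
    public DataCaptureState copy() {
        final DataCaptureState state = new DataCaptureState();
        state.binlogFile = binlogFile;
        state.binlogPosition = binlogPosition;
        state.gtidSet = gtidSet;
        state.sequenceId = sequenceId;
        state.useGtid = useGtid;
        return state;
    }
}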
View File
@ -23,6 +23,7 @@ import com.github.shyiko.mysql.binlog.event.EventData
import com.github.shyiko.mysql.binlog.event.EventHeaderV4
import com.github.shyiko.mysql.binlog.event.EventType
import com.github.shyiko.mysql.binlog.event.GtidEventData
import com.github.shyiko.mysql.binlog.event.MySqlGtid
import com.github.shyiko.mysql.binlog.event.QueryEventData
import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
@ -30,7 +31,6 @@ import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
import com.github.shyiko.mysql.binlog.network.SSLMode
import groovy.json.JsonSlurper
import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading
import org.apache.nifi.cdc.event.ColumnDefinition
import org.apache.nifi.cdc.event.TableInfo
@ -40,22 +40,12 @@ import org.apache.nifi.cdc.event.io.FlowFileEventWriteStrategy
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
import org.apache.nifi.cdc.mysql.processors.ssl.BinaryLogSSLSocketFactory
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.state.Scope
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.distributed.cache.client.Deserializer
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.exception.ProcessException
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.ssl.SSLContextService
import org.apache.nifi.state.MockStateManager
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockControllerServiceInitializationContext
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.jupiter.api.BeforeEach
@ -67,8 +57,6 @@ import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
import java.util.concurrent.TimeoutException
import java.util.regex.Matcher
import java.util.regex.Pattern
import static org.junit.jupiter.api.Assertions.assertEquals
import static org.junit.jupiter.api.Assertions.assertNotNull
@ -378,13 +366,6 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
testRunner.addControllerService('client', cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
@ -521,7 +502,7 @@ class CaptureChangeMySQLTest {
@Test
void testExcludeSchemaChanges() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Don't include port here, should default to 3306
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
@ -530,13 +511,6 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'false')
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
testRunner.addControllerService('client', cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
@ -595,16 +569,10 @@ class CaptureChangeMySQLTest {
@Test
void testNoTableInformationAvailable() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, DRIVER_LOCATION)
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost') // Port should default to 3306
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
testRunner.addControllerService('client', cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
@ -1106,12 +1074,7 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
testRunner.addControllerService('client', cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// ROTATE
@ -1174,19 +1137,13 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
def clientProperties = [:]
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
testRunner.addControllerService('client', cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
[gtid: MySqlGtid.fromString('aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')] as GtidEventData
))
// BEGIN
@ -1206,7 +1163,7 @@ class CaptureChangeMySQLTest {
// Stop the processor and verify the state is set
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '6', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1', Scope.CLUSTER)
((CaptureChangeMySQL) testRunner.getProcessor()).clearState()
testRunner.stateManager.clear(Scope.CLUSTER)
@ -1218,7 +1175,7 @@ class CaptureChangeMySQLTest {
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 8] as EventHeaderV4,
[gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2'] as GtidEventData
))
// BEGIN
@ -1239,12 +1196,12 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '12', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-2', Scope.CLUSTER)
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 14] as EventHeaderV4,
[gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as GtidEventData
))
// BEGIN
@ -1263,7 +1220,7 @@ class CaptureChangeMySQLTest {
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, '', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, '18', Scope.CLUSTER)
testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3', Scope.CLUSTER)
}
@Test
@ -1340,12 +1297,12 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')
testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')
testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.getStateManager().setState([
("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx:2',
("${BinlogEventInfo.BINLOG_GTIDSET_KEY}".toString()): 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2',
("${EventWriter.SEQUENCE_ID_KEY}".toString()): '1'
], Scope.CLUSTER)
@ -1354,7 +1311,7 @@ class CaptureChangeMySQLTest {
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
[gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as GtidEventData
))
// BEGIN
@ -1375,7 +1332,7 @@ class CaptureChangeMySQLTest {
assertEquals(2, resultFiles.size())
assertEquals(
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:2-3',
resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
)
}
@ -1388,7 +1345,7 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.USE_BINLOG_GTID, 'true')
testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_GTID, 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1')
testRunner.setProperty(CaptureChangeMySQL.RETRIEVE_ALL_RECORDS, 'false')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
@ -1397,7 +1354,7 @@ class CaptureChangeMySQLTest {
// GTID
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.GTID, nextPosition: 2] as EventHeaderV4,
[gtid: 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:3'] as GtidEventData
))
// BEGIN
@ -1418,7 +1375,7 @@ class CaptureChangeMySQLTest {
assertEquals(2, resultFiles.size())
assertEquals(
'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa:1-1:3-3',
resultFiles.last().getAttribute(BinlogEventInfo.BINLOG_GTIDSET_KEY)
)
}
@ -1430,12 +1387,6 @@ class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.USERNAME, "root")
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, "2 seconds")
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, "true")
final DistributedMapCacheClientImpl cacheClient = createCacheClient()
Map<String, String> clientProperties = new HashMap<>()
clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), "localhost")
testRunner.addControllerService("client", cacheClient, clientProperties)
testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, "client")
testRunner.enableControllerService(cacheClient)
testRunner.run(1, false, true)
// COMMIT
@ -1496,131 +1447,5 @@ class CaptureChangeMySQLTest {
when(mockStatement.executeQuery(anyString())).thenReturn(mockResultSet)
return mockConnection
}
}
static DistributedMapCacheClientImpl createCacheClient() throws InitializationException {
final DistributedMapCacheClientImpl client = new DistributedMapCacheClientImpl()
final ComponentLog logger = new MockComponentLog("client", client)
final MockControllerServiceInitializationContext clientInitContext = new MockControllerServiceInitializationContext(client, "client", logger, new MockStateManager(client))
client.initialize(clientInitContext)
return client
}
static final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {
private Map<String, String> cacheMap = new HashMap<>()
@Override
void close() throws IOException {
}
@Override
void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return [DistributedMapCacheClientService.HOSTNAME,
DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
DistributedMapCacheClientService.PORT,
DistributedMapCacheClientService.SSL_CONTEXT_SERVICE]
}
@Override
<K, V> boolean putIfAbsent(
final K key,
final V value,
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
StringWriter keyWriter = new StringWriter()
keySerializer.serialize(key, new WriterOutputStream(keyWriter))
String keyString = keyWriter.toString()
if (cacheMap.containsKey(keyString)) return false
StringWriter valueWriter = new StringWriter()
valueSerializer.serialize(value, new WriterOutputStream(valueWriter))
return true
}
@Override
@SuppressWarnings("unchecked")
<K, V> V getAndPutIfAbsent(
final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
final Deserializer<V> valueDeserializer) throws IOException {
StringWriter keyWriter = new StringWriter()
keySerializer.serialize(key, new WriterOutputStream(keyWriter))
String keyString = keyWriter.toString()
if (cacheMap.containsKey(keyString)) return valueDeserializer.deserialize(cacheMap.get(keyString).bytes)
StringWriter valueWriter = new StringWriter()
valueSerializer.serialize(value, new WriterOutputStream(valueWriter))
return null
}
@Override
<K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
StringWriter keyWriter = new StringWriter()
keySerializer.serialize(key, new WriterOutputStream(keyWriter))
String keyString = keyWriter.toString()
return cacheMap.containsKey(keyString)
}
@Override
<K, V> V get(
final K key,
final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
StringWriter keyWriter = new StringWriter()
keySerializer.serialize(key, new WriterOutputStream(keyWriter))
String keyString = keyWriter.toString()
return (cacheMap.containsKey(keyString)) ? valueDeserializer.deserialize(cacheMap.get(keyString).bytes) : null
}
@Override
<K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
StringWriter keyWriter = new StringWriter()
serializer.serialize(key, new WriterOutputStream(keyWriter))
String keyString = keyWriter.toString()
boolean removed = (cacheMap.containsKey(keyString))
cacheMap.remove(keyString)
return removed
}
@Override
long removeByPattern(String regex) throws IOException {
final List<String> removedRecords = new ArrayList<>()
Pattern p = Pattern.compile(regex)
for (String key : cacheMap.keySet()) {
// Key must be backed by something that can be converted into a String
Matcher m = p.matcher(key)
if (m.matches()) {
removedRecords.add(cacheMap.get(key))
}
}
final long numRemoved = removedRecords.size()
removedRecords.each {cacheMap.remove(it)}
return numRemoved
}
@Override
<K, V> void put(
final K key,
final V value,
final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
StringWriter keyWriter = new StringWriter()
keySerializer.serialize(key, new WriterOutputStream(keyWriter))
StringWriter valueWriter = new StringWriter()
valueSerializer.serialize(value, new WriterOutputStream(valueWriter))
}
}
}
View File
@ -14,7 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.junit.jupiter.api.Test;
@ -23,17 +24,15 @@ import java.sql.Types;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
class TestInsertRowsWriter {
@Test
public void testGetWritableObject() {
InsertRowsWriter insertRowsWriter = new InsertRowsWriter();
assertNull(insertRowsWriter.getWritableObject(null, null));
assertNull(insertRowsWriter.getWritableObject(Types.INTEGER, null));
assertEquals((byte) 1, insertRowsWriter.getWritableObject(Types.INTEGER, (byte) 1));
assertEquals("Hello", insertRowsWriter.getWritableObject(Types.VARCHAR, "Hello".getBytes()));
}
}