NIFI-3413: Add GetChangeDataCaptureMySQL processor

NIFI-3413: Incorporated review comments
NIFI-3413: Changed GetChangeDataCaptureMySQL to CaptureChangeMySQL, fixed some bugs
NIFI-3413: Refactored setup() for better error handling, more review comments incorporated
NIFI-3413: Refactored CDC into its own module(s), updated assembly and top-level POMs
NIFI-3413: Added RECEIVE prov event and Server ID property

Signed-off-by: ijokarumawak <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-03-23 18:43:04 -04:00 committed by ijokarumawak
parent 3386839ebc
commit 8f37ad4512
50 changed files with 4111 additions and 0 deletions

View File

@ -458,6 +458,11 @@ language governing permissions and limitations under the License. -->
<artifactId>nifi-stateful-analysis-nar</artifactId> <artifactId>nifi-stateful-analysis-nar</artifactId>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc-mysql-nar</artifactId>
<type>nar</type>
</dependency>
</dependencies> </dependencies>
<profiles> <profiles>
<profile> <profile>

View File

@ -0,0 +1,43 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Builds the nifi-cdc-api jar. No groupId/version is declared for this module itself, so Maven inherits both from the nifi-cdc parent below. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-cdc-api</artifactId>
<packaging>jar</packaging>
<!-- None of the dependencies below declares a version or scope here; presumably both are supplied by dependencyManagement in an ancestor POM (TODO confirm). -->
<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc;
/**
 * Checked exception signalling a failure during Change Data Capture (CDC) processing.
 * <p>
 * Every constructor delegates directly to the matching {@link Exception} constructor.
 */
public class CDCException extends Exception {

    /**
     * Creates an exception with no detail message and no cause.
     */
    public CDCException() {
        super();
    }

    /**
     * Creates an exception with the given detail message.
     *
     * @param message the detail message
     */
    public CDCException(String message) {
        super(message);
    }

    /**
     * Creates an exception with the given detail message and cause.
     *
     * @param message the detail message
     * @param cause   the underlying cause
     */
    public CDCException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception wrapping the given cause.
     *
     * @param cause the underlying cause
     */
    public CDCException(Throwable cause) {
        super(cause);
    }

    /**
     * Creates an exception with full control over suppression and stack trace capture.
     *
     * @param message            the detail message
     * @param cause              the underlying cause
     * @param enableSuppression  whether suppressed exceptions may be recorded
     * @param writableStackTrace whether the stack trace should be writable
     */
    public CDCException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
        super(message, cause, enableSuppression, writableStackTrace);
    }
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
/**
 * A base implementation of {@link EventInfo} that stores the event type and timestamp
 * supplied at construction time in final fields.
 */
public class BaseEventInfo implements EventInfo {

    private final String eventType;
    private final Long timestamp;

    /**
     * @param eventType the type of the event (may be null; it is stored as given)
     * @param timestamp the timestamp associated with the event (may be null; it is stored as given)
     */
    public BaseEventInfo(final String eventType, final Long timestamp) {
        this.timestamp = timestamp;
        this.eventType = eventType;
    }

    /**
     * @return the event type passed to the constructor
     */
    public String getEventType() {
        return this.eventType;
    }

    /**
     * @return the timestamp passed to the constructor
     */
    public Long getTimestamp() {
        return this.timestamp;
    }
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import java.util.List;
/**
 * A base class for row mutation events: a table event that additionally carries the
 * list of affected rows.
 *
 * @param <RowEventDataType> the type used to represent a single row's data
 */
public class BaseRowEventInfo<RowEventDataType> extends BaseTableEventInfo implements RowEventInfo<RowEventDataType> {

    /** The rows carried by this event, exactly as handed to the constructor. */
    protected List<RowEventDataType> rows;

    /**
     * @param tableInfo information about the affected table, forwarded to the superclass
     * @param eventType the type of the event, forwarded to the superclass
     * @param timestamp the timestamp of the event, forwarded to the superclass
     * @param rows      the rows carried by this event (stored by reference, not copied)
     */
    public BaseRowEventInfo(final TableInfo tableInfo, final String eventType, final Long timestamp,
                            final List<RowEventDataType> rows) {
        super(tableInfo, eventType, timestamp);
        this.rows = rows;
    }

    /**
     * @return the list of rows held by this event
     */
    public List<RowEventDataType> getRows() {
        return this.rows;
    }
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import java.util.List;
/**
 * A base class for events affecting a table. The table details (database name, table name,
 * table ID and column definitions) are copied from the {@link TableInfo} given at construction
 * time; when no {@link TableInfo} is supplied, all of them are {@code null}.
 */
public class BaseTableEventInfo extends BaseEventInfo implements TableEventInfo {

    private final String databaseName;
    private final String tableName;
    private final Long tableId;
    private final List<ColumnDefinition> columns;

    /**
     * @param tableInfo the table the event applies to; may be {@code null}, in which case every
     *                  table-related getter returns {@code null}
     * @param eventType the type of the event
     * @param timestamp the timestamp of the event
     */
    public BaseTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp) {
        super(eventType, timestamp);
        if (tableInfo != null) {
            this.databaseName = tableInfo.getDatabaseName();
            this.tableName = tableInfo.getTableName();
            this.tableId = tableInfo.getTableId();
            this.columns = tableInfo.getColumns();
        } else {
            this.databaseName = null;
            this.tableName = null;
            this.tableId = null;
            this.columns = null;
        }
    }

    /**
     * @return the database name, or {@code null} if no table information was supplied
     */
    public String getDatabaseName() {
        return databaseName;
    }

    /**
     * @return the table name, or {@code null} if no table information was supplied
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * @return the table ID, or {@code null} if no table information was supplied
     */
    public Long getTableId() {
        return tableId;
    }

    /**
     * @return the column definitions (the same list instance obtained from the {@link TableInfo}),
     * or {@code null} if none are available
     */
    public List<ColumnDefinition> getColumns() {
        return columns;
    }

    /**
     * Looks up a column definition by position.
     *
     * @param i the zero-based column index
     * @return the column definition at that index, or {@code null} if there are no columns or the
     * index is out of range
     */
    public ColumnDefinition getColumnByIndex(int i) {
        // Explicit checks instead of catching IndexOutOfBoundsException/NullPointerException,
        // so that genuine programming errors are not silently masked as "no such column".
        if (columns == null || i < 0 || i >= columns.size()) {
            return null;
        }
        return columns.get(i);
    }
}

View File

@ -0,0 +1,73 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
/**
 * Describes a single column of a relational table by its integer type code and its name.
 * Two definitions are equal when both the type and the name match.
 */
public class ColumnDefinition {

    private int type;
    // Defaults to the empty string when only a type is supplied
    private String name = "";

    /**
     * Creates a definition with the given type and an empty name.
     *
     * @param type the column's type code
     */
    public ColumnDefinition(int type) {
        this.type = type;
    }

    /**
     * Creates a definition with the given type and name.
     *
     * @param type the column's type code
     * @param name the column's name
     */
    public ColumnDefinition(int type, String name) {
        this.type = type;
        this.name = name;
    }

    /**
     * @return the column's type code
     */
    public int getType() {
        return this.type;
    }

    /**
     * @param type the new type code
     */
    public void setType(int type) {
        this.type = type;
    }

    /**
     * @return the column's name
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the new column name
     */
    public void setName(String name) {
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o == null || o.getClass() != getClass()) {
            return false;
        }

        final ColumnDefinition other = (ColumnDefinition) o;
        final EqualsBuilder comparison = new EqualsBuilder();
        comparison.append(type, other.type);
        comparison.append(name, other.name);
        return comparison.isEquals();
    }

    @Override
    public int hashCode() {
        final int nameHash = (name == null) ? 0 : name.hashCode();
        return 31 * type + nameHash;
    }
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
/**
 * An interface representing a data structure containing event information: the type of the
 * event and the timestamp associated with it. It also defines the event type constants.
 */
public interface EventInfo {

    // Event type constants

    /** Event type value for a "begin" event. */
    String BEGIN_EVENT = "begin";

    /** Event type value for a "commit" event. */
    String COMMIT_EVENT = "commit";

    /** Event type value for a "write" event. */
    String WRITE_EVENT = "write";

    /** Event type value for a "delete" event. */
    String DELETE_EVENT = "delete";

    /** Event type value for an "update" event. */
    String UPDATE_EVENT = "update";

    /** Event type value for a "schema_change" event. */
    String SCHEMA_CHANGE = "schema_change";

    /**
     * @return the type of this event
     */
    String getEventType();

    /**
     * @return the timestamp associated with this event
     */
    Long getTimestamp();
}

View File

@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import java.io.IOException;
/**
 * Marker exception for failures that occur while handling CDC row modification events.
 * It is an {@link IOException} and only forwards its arguments to the superclass.
 */
public class RowEventException extends IOException {

    /**
     * Creates an exception with no detail message and no cause.
     */
    public RowEventException() {
        super();
    }

    /**
     * Creates an exception with the given detail message.
     *
     * @param message the detail message
     */
    public RowEventException(String message) {
        super(message);
    }

    /**
     * Creates an exception with the given detail message and cause.
     *
     * @param message the detail message
     * @param cause   the underlying cause
     */
    public RowEventException(String message, Throwable cause) {
        super(message, cause);
    }

    /**
     * Creates an exception wrapping the given cause.
     *
     * @param cause the underlying cause
     */
    public RowEventException(Throwable cause) {
        super(cause);
    }
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import java.util.List;
/**
 * A table event that also exposes row-level data.
 *
 * @param <RowEventDataType> the type used to represent a single row's data
 */
public interface RowEventInfo<RowEventDataType> extends TableEventInfo {

    /**
     * @return the rows associated with this event
     */
    List<RowEventDataType> getRows();
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import java.util.List;
/**
 * An event that occurs on a tabular data structure; in addition to the basic event
 * information it identifies the table and describes its columns.
 */
public interface TableEventInfo extends EventInfo {

    /**
     * @return the name of the database containing the table
     */
    String getDatabaseName();

    /**
     * @return the name of the table
     */
    String getTableName();

    /**
     * @return the identifier of the table
     */
    Long getTableId();

    /**
     * @return the column definitions of the table
     */
    List<ColumnDefinition> getColumns();

    /**
     * @param i the index of the requested column
     * @return the definition of the column at the given index
     */
    ColumnDefinition getColumnByIndex(int i);
}

View File

@ -0,0 +1,155 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
 * A POJO for holding table information related to update events: database name, table name,
 * table ID and column definitions. Nested {@link Serializer} and {@link Deserializer} classes
 * convert instances to and from a delimited text form.
 */
public class TableInfo {

    /** Separator between the fields of the serialized form (also statically imported by TableInfoCacheKey). */
    static final String DB_TABLE_NAME_DELIMITER = "@!@";

    // Fixed charset so the serialized bytes do not depend on the JVM's platform default encoding.
    private static final java.nio.charset.Charset SERIALIZED_FORM_CHARSET = java.nio.charset.StandardCharsets.UTF_8;

    private final String databaseName;
    private String tableName;
    private final Long tableId;
    private List<ColumnDefinition> columns;

    /**
     * @param databaseName the name of the database containing the table
     * @param tableName    the name of the table
     * @param tableId      the identifier of the table
     * @param columns      the table's column definitions (stored by reference; may be null)
     */
    public TableInfo(String databaseName, String tableName, Long tableId, List<ColumnDefinition> columns) {
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.tableId = tableId;
        this.columns = columns;
    }

    public String getDatabaseName() {
        return databaseName;
    }

    public String getTableName() {
        return tableName;
    }

    public void setTableName(String tableName) {
        this.tableName = tableName;
    }

    public Long getTableId() {
        return tableId;
    }

    public List<ColumnDefinition> getColumns() {
        return columns;
    }

    public void setColumns(List<ColumnDefinition> columns) {
        this.columns = columns;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TableInfo that = (TableInfo) o;

        return new EqualsBuilder()
                .append(databaseName, that.databaseName)
                .append(tableName, that.tableName)
                .append(tableId, that.tableId)
                .append(columns, that.columns)
                .isEquals();
    }

    @Override
    public int hashCode() {
        // Null-safe for every field, matching equals(), which also tolerates null fields.
        // For non-null fields the result is the same as the previous implementation.
        int result = databaseName != null ? databaseName.hashCode() : 0;
        result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
        result = 31 * result + (tableId != null ? tableId.hashCode() : 0);
        result = 31 * result + (columns != null ? columns.hashCode() : 0);
        return result;
    }

    /**
     * Writes a {@link TableInfo} as UTF-8 text of the form
     * {@code database@!@table@!@tableId[@!@columnName@!@columnType]...}.
     */
    public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer<TableInfo> {

        @Override
        public void serialize(TableInfo value, OutputStream output) throws SerializationException, IOException {
            StringBuilder sb = new StringBuilder(value.getDatabaseName());
            sb.append(DB_TABLE_NAME_DELIMITER);
            sb.append(value.getTableName());
            sb.append(DB_TABLE_NAME_DELIMITER);
            sb.append(value.getTableId());
            List<ColumnDefinition> columnDefinitions = value.getColumns();
            if (columnDefinitions != null && !columnDefinitions.isEmpty()) {
                sb.append(DB_TABLE_NAME_DELIMITER);
                // Each column contributes a name token followed by a type token
                sb.append(columnDefinitions.stream()
                        .map((col) -> col.getName() + DB_TABLE_NAME_DELIMITER + col.getType())
                        .collect(Collectors.joining(DB_TABLE_NAME_DELIMITER)));
            }
            output.write(sb.toString().getBytes(SERIALIZED_FORM_CHARSET));
        }
    }

    /**
     * Reads a {@link TableInfo} from the text form produced by {@link Serializer}.
     */
    public static class Deserializer implements org.apache.nifi.distributed.cache.client.Deserializer<TableInfo> {

        /**
         * @param input the serialized bytes
         * @return the deserialized table information, or {@code null} if the input is null or empty
         * @throws IOException if fewer than three tokens are present, the table ID or a column type
         *                     is not a number, or a column name has no accompanying type
         */
        @Override
        public TableInfo deserialize(byte[] input) throws DeserializationException, IOException {
            // Don't bother deserializing if empty, just return null. This usually happens when the key is not found in the cache
            if (input == null || input.length == 0) {
                return null;
            }
            String inputString = new String(input, SERIALIZED_FORM_CHARSET);
            // split() interprets its argument as a regex; the delimiter contains no regex metacharacters, so it matches literally.
            // NOTE(review): a database/table/column name containing the delimiter would presumably corrupt parsing - confirm upstream.
            String[] tokens = inputString.split(DB_TABLE_NAME_DELIMITER);
            int numTokens = tokens.length;
            if (numTokens < 3) {
                throw new IOException("Could not deserialize TableInfo from the following value: " + inputString);
            }
            String dbName = tokens[0];
            String tableName = tokens[1];
            Long tableId;
            try {
                tableId = Long.parseLong(tokens[2]);
            } catch (NumberFormatException nfe) {
                throw new IOException("Illegal table ID: " + tokens[2], nfe);
            }

            // Parse column names and types: tokens from index 3 onward come in (name, type) pairs
            List<ColumnDefinition> columnDefinitions = new ArrayList<>();
            for (int columnNameIndex = 3; columnNameIndex < numTokens; columnNameIndex += 2) {
                int columnTypeIndex = columnNameIndex + 1;
                if (columnTypeIndex >= numTokens) {
                    throw new IOException("No type detected for column: " + tokens[columnNameIndex]);
                }
                try {
                    columnDefinitions.add(new ColumnDefinition(Integer.parseInt(tokens[columnTypeIndex]), tokens[columnNameIndex]));
                } catch (NumberFormatException nfe) {
                    int columnNumber = (columnNameIndex - 3) / 2 + 1;
                    throw new IOException("Illegal column type value for column " + columnNumber + ": " + tokens[columnTypeIndex], nfe);
                }
            }

            return new TableInfo(dbName, tableName, tableId, columnDefinitions);
        }
    }
}

View File

@ -0,0 +1,98 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.nifi.distributed.cache.client.exception.SerializationException;
import java.io.IOException;
import java.io.OutputStream;
import static org.apache.nifi.cdc.event.TableInfo.DB_TABLE_NAME_DELIMITER;
/**
 * This class represents a key in a cache that contains information (column definitions, e.g.) for a database table.
 * The key consists of a UUID prefix, the database name, the table name and the table ID.
 */
public class TableInfoCacheKey {

    private final String databaseName;
    private final String tableName;
    private final long tableId;
    private final String uuidPrefix;

    /**
     * @param uuidPrefix   a prefix placed first in the serialized key
     * @param databaseName the name of the database containing the table
     * @param tableName    the name of the table
     * @param tableId      the identifier of the table
     */
    public TableInfoCacheKey(String uuidPrefix, String databaseName, String tableName, long tableId) {
        this.uuidPrefix = uuidPrefix;
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.tableId = tableId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TableInfoCacheKey that = (TableInfoCacheKey) o;

        return new EqualsBuilder()
                .append(tableId, that.tableId)
                .append(databaseName, that.databaseName)
                .append(tableName, that.tableName)
                .append(uuidPrefix, that.uuidPrefix)
                .isEquals();
    }

    @Override
    public int hashCode() {
        int result = databaseName != null ? databaseName.hashCode() : 0;
        result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
        // Long.hashCode yields the same value as the manual (int) (v ^ (v >>> 32)) fold
        result = 31 * result + Long.hashCode(tableId);
        result = 31 * result + (uuidPrefix != null ? uuidPrefix.hashCode() : 0);
        return result;
    }

    public String getDatabaseName() {
        return databaseName;
    }

    public String getTableName() {
        return tableName;
    }

    public long getTableId() {
        return tableId;
    }

    public String getUuidPrefix() {
        return uuidPrefix;
    }

    /**
     * Writes a key as UTF-8 text of the form {@code uuidPrefix@!@database@!@table@!@tableId}.
     */
    public static class Serializer implements org.apache.nifi.distributed.cache.client.Serializer<TableInfoCacheKey> {

        @Override
        public void serialize(TableInfoCacheKey key, OutputStream output) throws SerializationException, IOException {
            StringBuilder sb = new StringBuilder(key.getUuidPrefix());
            sb.append(DB_TABLE_NAME_DELIMITER);
            sb.append(key.getDatabaseName());
            sb.append(DB_TABLE_NAME_DELIMITER);
            sb.append(key.getTableName());
            sb.append(DB_TABLE_NAME_DELIMITER);
            sb.append(key.getTableId());
            // Explicit charset: with the platform default, the same key could serialize to different
            // bytes on JVMs configured with different default encodings.
            output.write(sb.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event.io;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.nifi.cdc.event.EventInfo;
import java.io.IOException;
import java.io.OutputStream;
/**
 * An abstract class that outputs common information (event type, timestamp, e.g.) about CDC events
 * as the leading fields of a JSON object.
 *
 * @param <T> the type of event written by this writer
 */
public abstract class AbstractEventWriter<T extends EventInfo> implements EventWriter<T> {

    // A single factory shared by all writers; Jackson documents JsonFactory as thread-safe and reusable once configured.
    private static final JsonFactory JSON_FACTORY = new JsonFactory();

    // NOTE(review): the active generator is kept in an instance field between startJson() and endJson(),
    // so a writer instance presumably must not be used for two events concurrently - confirm against callers.
    protected JsonGenerator jsonGenerator;

    /**
     * Common method to create a JSON generator and start the root object. Should be called by sub-classes
     * unless they need their own generator and such. Writes the "type" and "timestamp" fields, using JSON
     * null for either value that is null.
     *
     * @param outputStream the stream the JSON is written to
     * @param event        the event whose type and timestamp are written
     * @throws IOException if the generator cannot be created or written to
     */
    protected void startJson(OutputStream outputStream, T event) throws IOException {
        jsonGenerator = createJsonGenerator(outputStream);
        jsonGenerator.writeStartObject();

        String eventType = event.getEventType();
        if (eventType == null) {
            jsonGenerator.writeNullField("type");
        } else {
            jsonGenerator.writeStringField("type", eventType);
        }

        Long timestamp = event.getTimestamp();
        if (timestamp == null) {
            jsonGenerator.writeNullField("timestamp");
        } else {
            // Use the value that was just null-checked rather than calling getTimestamp() a second time
            jsonGenerator.writeNumberField("timestamp", timestamp.longValue());
        }
    }

    /**
     * Closes the root object started by {@link #startJson(OutputStream, EventInfo)}, then flushes and
     * closes the generator.
     *
     * @throws IOException if no generator has been created, or if writing, flushing or closing fails
     */
    protected void endJson() throws IOException {
        if (jsonGenerator == null) {
            throw new IOException("endJson called without a JsonGenerator");
        }
        jsonGenerator.writeEndObject();
        jsonGenerator.flush();
        jsonGenerator.close();
    }

    // Creates a generator writing to the given stream from the shared factory
    private JsonGenerator createJsonGenerator(OutputStream out) throws IOException {
        return JSON_FACTORY.createGenerator(out);
    }
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.event.io;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.EventInfo;
import org.apache.nifi.processor.Relationship;
/**
 * Writes a CDC event to a process session. Note that a single event may produce multiple flow files.
 *
 * @param <T> the type of event handled by the writer
 */
public interface EventWriter<T extends EventInfo> {

    /** The "application/json" MIME type string. */
    String APPLICATION_JSON = "application/json";

    /** Key string for the CDC sequence ID. */
    String SEQUENCE_ID_KEY = "cdc.sequence.id";

    /** Attribute name string for the CDC event type. */
    String CDC_EVENT_TYPE_ATTRIBUTE = "cdc.event.type";

    /**
     * Writes the given event to the process session, possibly transferring the resulting flow file(s)
     * to the specified relationship (usually used for success).
     *
     * @param session           the session to write the event to
     * @param transitUri        the URI indicating the source system from which the specified event originates
     * @param eventInfo         the event data
     * @param currentSequenceId the current sequence ID
     * @param relationship      a relationship to transfer any flow file(s) to
     * @return a sequence ID, usually the specified current sequence ID incremented by the number of flow
     * files transferred and/or committed
     */
    long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship);
}

View File

@ -0,0 +1,40 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Packages the MySQL CDC processors module as a NAR (see <packaging> below). -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc-mysql-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-cdc-mysql-nar</artifactId>
<packaging>nar</packaging>
<description>NiFi MySQL Change Data Capture (CDC) NAR</description>
<!-- Presumably turns off javadoc and source-artifact generation, since this module only packages other artifacts (TODO confirm which plugin honors source.skip). -->
<properties>
<maven.javadoc.skip>true</maven.javadoc.skip>
<source.skip>true</source.skip>
</properties>
<dependencies>
<!-- NOTE(review): a nar-typed dependency; presumably this makes the standard services API NAR the parent of this NAR - confirm. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-standard-services-api-nar</artifactId>
<type>nar</type>
</dependency>
<!-- The processors jar bundled into this NAR; no version is given here, so it is presumably managed by an ancestor POM (TODO confirm). -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc-mysql-processors</artifactId>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,5 @@
nifi-cdc-mysql-nar
Copyright 2014-2017 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

View File

@ -0,0 +1,67 @@
<?xml version="1.0"?>
<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
license agreements. See the NOTICE file distributed with this work for additional
information regarding copyright ownership. The ASF licenses this file to
You under the Apache License, Version 2.0 (the "License"); you may not use
this file except in compliance with the License. You may obtain a copy of
the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
by applicable law or agreed to in writing, software distributed under the
License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
OF ANY KIND, either express or implied. See the License for the specific
language governing permissions and limitations under the License. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Builds the jar containing the MySQL CDC processors. -->
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc-mysql-bundle</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-cdc-mysql-processors</artifactId>
<packaging>jar</packaging>
<dependencies>
<!-- provided scope: compiled against but not bundled with this artifact -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<scope>provided</scope>
</dependency>
<!-- NOTE(review): explicit version repeats the parent version above; it must be kept in step when the project version changes. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc-api</artifactId>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
</dependency>
<!-- Third-party MySQL binlog client library, pinned to an explicit version here -->
<dependency>
<groupId>com.github.shyiko</groupId>
<artifactId>mysql-binlog-connector-java</artifactId>
<version>0.11.0</version>
</dependency>
<!-- Test-only dependencies -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-distributed-cache-client-service</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service-api</artifactId>
<scope>test</scope>
</dependency>
<!-- NOTE(review): explicit version repeats the parent version above; the other test dependencies declare none. -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-ssl-context-service</artifactId>
<version>1.2.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.apache.nifi.cdc.event.BaseEventInfo;
/**
* A base class for all MYSQL binlog events
*/
public class BaseBinlogEventInfo extends BaseEventInfo implements BinlogEventInfo {
private String binlogFilename;
private Long binlogPosition;
public BaseBinlogEventInfo(String eventType, Long timestamp, String binlogFilename, Long binlogPosition) {
super(eventType, timestamp);
this.binlogFilename = binlogFilename;
this.binlogPosition = binlogPosition;
}
public String getBinlogFilename() {
return binlogFilename;
}
public Long getBinlogPosition() {
return binlogPosition;
}
}

View File

@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.apache.nifi.cdc.event.BaseRowEventInfo;
import org.apache.nifi.cdc.event.RowEventInfo;
import org.apache.nifi.cdc.event.TableInfo;
import java.util.BitSet;
import java.util.List;
/**
 * Base type for binlog events that carry row data (UPDATE, DELETE, etc.). Table and binlog details
 * are handled by {@link BaseBinlogTableEventInfo}; the rows themselves are held by an internal
 * {@link BaseRowEventInfo} delegate.
 *
 * @param <RowEventDataType> the type used to represent one row in {@link #getRows()}
 */
public class BaseBinlogRowEventInfo<RowEventDataType> extends BaseBinlogTableEventInfo implements RowEventInfo<RowEventDataType> {

    private final RowEventInfo<RowEventDataType> delegate;
    private final BitSet includedColumns;

    /**
     * @param tableInfo       the table the rows belong to
     * @param type            the event type identifier
     * @param timestamp       the event timestamp
     * @param binlogFilename  the binlog file associated with the event
     * @param binlogPosition  the binlog position associated with the event
     * @param includedColumns bit set describing which columns are included in the row data
     * @param rows            the row data carried by the event
     */
    public BaseBinlogRowEventInfo(TableInfo tableInfo, String type, Long timestamp, String binlogFilename, Long binlogPosition, BitSet includedColumns, List<RowEventDataType> rows) {
        super(tableInfo, type, timestamp, binlogFilename, binlogPosition);
        this.delegate = new BaseRowEventInfo<>(tableInfo, type, timestamp, rows);
        this.includedColumns = includedColumns;
    }

    /**
     * @return the included-columns bit set given to the constructor (the same instance, not a copy)
     */
    public BitSet getIncludedColumns() {
        return this.includedColumns;
    }

    /**
     * @return the rows as reported by the internal row-event delegate
     */
    @Override
    public List<RowEventDataType> getRows() {
        return this.delegate.getRows();
    }
}

View File

@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.apache.nifi.cdc.event.BaseTableEventInfo;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.event.TableEventInfo;
import org.apache.nifi.cdc.event.TableInfo;
import java.util.List;
/**
 * A base class to handle data common to binlog table events, such as database name, table name, etc.
 * Binlog file/position handling comes from {@link BaseBinlogEventInfo}; all table-related getters are
 * answered by an internal {@link BaseTableEventInfo} delegate built from the supplied {@link TableInfo}.
 */
public class BaseBinlogTableEventInfo extends BaseBinlogEventInfo implements BinlogTableEventInfo {

    private final TableEventInfo delegate;

    /**
     * @param tableInfo      the table this event refers to
     * @param eventType      the event type identifier
     * @param timestamp      the event timestamp
     * @param binlogFilename the binlog file associated with the event
     * @param binlogPosition the binlog position associated with the event
     */
    public BaseBinlogTableEventInfo(TableInfo tableInfo, String eventType, Long timestamp, String binlogFilename, Long binlogPosition) {
        super(eventType, timestamp, binlogFilename, binlogPosition);
        // Give the delegate the caller's event type. It was previously hard-coded to SCHEMA_CHANGE,
        // which mislabelled the delegate for every row (insert/update/delete) event.
        this.delegate = new BaseTableEventInfo(tableInfo, eventType, timestamp);
    }

    /**
     * @return the database name reported by the table-event delegate
     */
    @Override
    public String getDatabaseName() {
        return delegate.getDatabaseName();
    }

    /**
     * @return the table name reported by the table-event delegate
     */
    @Override
    public String getTableName() {
        return delegate.getTableName();
    }

    /**
     * @return the table id reported by the table-event delegate
     */
    @Override
    public Long getTableId() {
        return delegate.getTableId();
    }

    /**
     * @return the column definitions reported by the table-event delegate
     */
    @Override
    public List<ColumnDefinition> getColumns() {
        return delegate.getColumns();
    }

    /**
     * @param i the column index to look up
     * @return the column definition the delegate returns for that index
     */
    @Override
    public ColumnDefinition getColumnByIndex(int i) {
        return delegate.getColumnByIndex(i);
    }
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
/**
 * An event implementation corresponding to the beginning of a MySQL transaction (update rows, e.g.).
 * It adds no state of its own; it only fixes the event type to BEGIN_EVENT.
 */
public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
/**
 * @param timestamp      the event timestamp
 * @param binlogFilename the binlog file associated with the event
 * @param binlogPosition the binlog position associated with the event (boxed when handed to the superclass)
 */
public BeginTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
super(BEGIN_EVENT, timestamp, binlogFilename, binlogPosition);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.apache.nifi.cdc.event.EventInfo;
/**
 * An interface for MySQL binlog-specific events. It extends the generic {@link EventInfo} with the
 * binlog file name and position, and defines the keys under which those two values are published.
 */
public interface BinlogEventInfo extends EventInfo {
// Key under which the binlog file name is published (used as a flow file attribute name by the event writers)
String BINLOG_FILENAME_KEY = "binlog.filename";
// Key under which the binlog position is published (used as a flow file attribute name by the event writers)
String BINLOG_POSITION_KEY = "binlog.position";
/**
 * @return the name of the binlog file associated with this event
 */
String getBinlogFilename();
/**
 * @return the position in the binlog file associated with this event
 */
Long getBinlogPosition();
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.Event;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
 * An event listener wrapper for MySQL binlog events generated from the mysql-binlog-connector.
 * Each received event is paired with the client's current binlog file name and offered to a
 * {@link BlockingQueue}. Events that arrive while the listener is stopped are discarded.
 */
public class BinlogEventListener implements BinaryLogClient.EventListener {

    protected final AtomicBoolean stopNow = new AtomicBoolean(false);
    // Maximum time to wait for space in the queue before giving up on an event
    private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;

    private final BlockingQueue<RawBinlogEvent> queue;
    private final BinaryLogClient client;

    /**
     * @param client the binlog client whose current binlog file name is attached to each queued event
     * @param q      the queue that receives the wrapped events
     */
    public BinlogEventListener(BinaryLogClient client, BlockingQueue<RawBinlogEvent> q) {
        this.client = client;
        this.queue = q;
    }

    /**
     * Allows incoming events to be queued again.
     */
    public void start() {
        stopNow.set(false);
    }

    /**
     * Makes {@link #onEvent(Event)} discard any further events until {@link #start()} is called.
     */
    public void stop() {
        stopNow.set(true);
    }

    /**
     * Wraps the event and offers it to the queue, waiting up to {@code QUEUE_OFFER_TIMEOUT_MSEC} milliseconds.
     *
     * @param event the binlog event delivered by the client
     * @throws RuntimeException if the queue does not accept the event within the timeout, or if the
     *                          calling thread is interrupted while waiting
     */
    @Override
    public void onEvent(Event event) {
        if (stopNow.get()) {
            // Listener has been stopped: drop the event without queueing it
            return;
        }
        final RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
        final boolean accepted;
        try {
            accepted = queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it, and keep the cause
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while adding event to the queue", e);
        }
        if (!accepted) {
            throw new RuntimeException("Unable to add event to the queue");
        }
    }
}

View File

@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import java.util.concurrent.atomic.AtomicReference;
/**
 * A listener wrapper for mysql-binlog-connector lifecycle events. It remembers the client passed to
 * the most recent callback and the exception from the most recent failure callback; a successful
 * connect clears the remembered exception, while a disconnect leaves it unchanged.
 */
public class BinlogLifecycleListener implements BinaryLogClient.LifecycleListener {

    AtomicReference<BinaryLogClient> client = new AtomicReference<>(null);
    AtomicReference<Exception> exception = new AtomicReference<>(null);

    /**
     * Remembers the client and clears any previously recorded exception.
     */
    @Override
    public void onConnect(BinaryLogClient binaryLogClient) {
        record(binaryLogClient, null);
    }

    /**
     * Remembers the client and the communication failure.
     */
    @Override
    public void onCommunicationFailure(BinaryLogClient binaryLogClient, Exception e) {
        record(binaryLogClient, e);
    }

    /**
     * Remembers the client and the deserialization failure.
     */
    @Override
    public void onEventDeserializationFailure(BinaryLogClient binaryLogClient, Exception e) {
        record(binaryLogClient, e);
    }

    /**
     * Remembers the client only; a previously recorded exception is kept.
     */
    @Override
    public void onDisconnect(BinaryLogClient binaryLogClient) {
        client.set(binaryLogClient);
    }

    /**
     * @return the client from the most recent callback, or null if no callback has fired yet
     */
    public BinaryLogClient getClient() {
        return client.get();
    }

    /**
     * @return the most recently recorded failure, or null if there is none
     */
    public Exception getException() {
        return exception.get();
    }

    // Stores the callback's client first, then the failure (null clears it)
    private void record(BinaryLogClient source, Exception failure) {
        client.set(source);
        exception.set(failure);
    }
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.apache.nifi.cdc.event.TableEventInfo;
/**
 * A marker interface for classes that expose both binlog-specific data (file name, position) and
 * table-generic data (database name, table name, columns, e.g.). It declares no members of its own;
 * everything is inherited from {@link BinlogEventInfo} and {@link TableEventInfo}.
 */
public interface BinlogTableEventInfo extends BinlogEventInfo, TableEventInfo {
}

View File

@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
/**
 * An event implementation corresponding to the end (i.e. commit) of a MySQL transaction.
 * It adds no state of its own; it only fixes the event type to COMMIT_EVENT.
 */
public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
/**
 * @param timestamp      the event timestamp
 * @param binlogFilename the binlog file associated with the event
 * @param binlogPosition the binlog position associated with the event (boxed when handed to the superclass)
 */
public CommitTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
super(COMMIT_EVENT, timestamp, binlogFilename, binlogPosition);
}
}

View File

@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import org.apache.nifi.cdc.event.TableInfo;
import java.io.Serializable;
/**
 * This class represents information about rows deleted from a MySQL table. Each row is a
 * Serializable[] taken from the connector's DeleteRowsEventData.
 */
public class DeleteRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]> {
/**
 * @param tableInfo      the table the rows were deleted from
 * @param timestamp      the event timestamp
 * @param binlogFilename the binlog file associated with the event
 * @param binlogPosition the binlog position associated with the event
 * @param data           the connector's delete event payload; its included columns and rows are
 *                       extracted here and passed to the superclass (the payload itself is not retained)
 */
public DeleteRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, DeleteRowsEventData data) {
super(tableInfo, DELETE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
}
}

View File

@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import org.apache.nifi.cdc.event.TableInfo;
import java.io.Serializable;
/**
 * This class represents information about rows written/added to a MySQL table. Each row is a
 * {@code Serializable[]} taken from the connector's {@link WriteRowsEventData}.
 */
public class InsertRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]> {

    /**
     * @param tableInfo      the table the rows were written to
     * @param timestamp      the event timestamp
     * @param binlogFilename the binlog file associated with the event
     * @param binlogPosition the binlog position associated with the event
     * @param data           the connector's write event payload; only its included columns and rows
     *                       are extracted and passed to the superclass
     */
    public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, WriteRowsEventData data) {
        // The payload is no longer kept in a field: it was never read, and DeleteRowsEventInfo
        // does not retain its payload either.
        super(tableInfo, WRITE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
    }
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import java.io.Serializable;
/**
 * A utility class to provide MySQL- / binlog-specific constants and methods for processing events and data
 */
public class MySQLCDCUtils {

    /**
     * Converts a raw column value into an object suitable for writing to the output.
     * <ul>
     *   <li>{@code null} stays {@code null}</li>
     *   <li>{@code byte[]} is decoded into a {@link String}</li>
     *   <li>any {@link Number} is returned unchanged</li>
     *   <li>every other value is converted with {@code toString()}</li>
     * </ul>
     * The conversion no longer depends on whether the column type is known. Previously a value
     * that was neither {@code byte[]} nor {@link Number} was turned into {@code null} when
     * {@code type} was {@code null}, silently dropping the data.
     *
     * @param type  the column type code, or {@code null} if unknown; kept for interface
     *              compatibility and currently not used by the conversion
     * @param value the raw column value, may be {@code null}
     * @return the writable representation of {@code value}, or {@code null} if {@code value} is {@code null}
     */
    public static Object getWritableObject(Integer type, Serializable value) {
        if (value == null) {
            return null;
        }
        if (value instanceof byte[]) {
            // NOTE(review): decodes with the platform default charset, as before; confirm whether
            // the column's character set should be used instead.
            return new String((byte[]) value);
        }
        if (value instanceof Number) {
            return value;
        }
        return value.toString();
    }
}

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.event.Event;
/**
 * A simple holder that pairs a raw binlog {@link Event} with the name of the binlog file it was
 * associated with when the holder was created. The event can be replaced later; the file name cannot.
 */
public class RawBinlogEvent {

    private final String binlogFilename;
    private Event event;

    /**
     * @param event          the raw event from the binlog connector
     * @param binlogFilename the binlog file name to associate with the event
     */
    public RawBinlogEvent(Event event, String binlogFilename) {
        this.binlogFilename = binlogFilename;
        this.event = event;
    }

    /**
     * @return the currently held event
     */
    public Event getEvent() {
        return this.event;
    }

    /**
     * @param event the event to hold from now on
     */
    public void setEvent(Event event) {
        this.event = event;
    }

    /**
     * @return the binlog file name given to the constructor
     */
    public String getBinlogFilename() {
        return this.binlogFilename;
    }
}

View File

@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.apache.nifi.cdc.event.TableEventInfo;
import org.apache.nifi.cdc.event.TableInfo;
/**
 * Describes a change to a table's schema (add/drop column, add/drop table, etc.) together with the
 * query text that was supplied for it. The event type is always SCHEMA_CHANGE.
 */
public class SchemaChangeEventInfo extends BaseBinlogTableEventInfo implements TableEventInfo {

    private final String query;

    /**
     * @param tableInfo      the table affected by the schema change
     * @param timestamp      the event timestamp
     * @param binlogFilename the binlog file associated with the event
     * @param binlogPosition the binlog position associated with the event
     * @param query          the query text associated with the schema change
     */
    public SchemaChangeEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, long binlogPosition, String query) {
        super(tableInfo, SCHEMA_CHANGE, timestamp, binlogFilename, binlogPosition);
        this.query = query;
    }

    /**
     * @return the query text given to the constructor
     */
    public String getQuery() {
        return this.query;
    }
}

View File

@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import org.apache.nifi.cdc.event.TableInfo;
import java.io.Serializable;
import java.util.BitSet;
import java.util.Map;
/**
 * This class represents information about rows updated in a MySQL table. Each row is a
 * {@code Map.Entry} of two {@code Serializable[]} values taken from the connector's
 * {@link UpdateRowsEventData}; presumably the key is the row before the update and the value the
 * row after it (confirm against the connector documentation).
 */
public class UpdateRowsEventInfo extends BaseBinlogRowEventInfo<Map.Entry<Serializable[], Serializable[]>> {

    private final BitSet includedColumnsBeforeUpdate;

    /**
     * @param tableInfo      the table the rows were updated in
     * @param timestamp      the event timestamp
     * @param binlogFilename the binlog file associated with the event
     * @param binlogPosition the binlog position associated with the event
     * @param data           the connector's update event payload; its included columns, rows and
     *                       "before update" included columns are extracted here
     */
    public UpdateRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, UpdateRowsEventData data) {
        super(tableInfo, UPDATE_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
        this.includedColumnsBeforeUpdate = data.getIncludedColumnsBeforeUpdate();
    }

    /**
     * @return the "included columns before update" bit set taken from the event payload
     */
    public BitSet getIncludedColumnsBeforeUpdate() {
        return this.includedColumnsBeforeUpdate;
    }
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.io.AbstractEventWriter;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
 * An abstract base class for writing MySQL binlog events into flow file(s). It contributes the
 * binlog file name and position to the JSON content, builds the flow file attributes shared by all
 * binlog events, and provides a default {@code writeEvent} that emits one flow file per event.
 */
public abstract class AbstractBinlogEventWriter<T extends BinlogEventInfo> extends AbstractEventWriter<T> {

    /**
     * Writes the binlog-specific JSON fields for the event. Subclasses may override this to
     * append further fields.
     *
     * @param event the event being serialized
     * @throws IOException if the JSON generator fails to write
     */
    protected void writeJson(T event) throws IOException {
        jsonGenerator.writeStringField("binlog_filename", event.getBinlogFilename());
        jsonGenerator.writeNumberField("binlog_position", event.getBinlogPosition());
    }

    /**
     * Builds the attributes attached to every flow file produced for a binlog event: sequence ID,
     * event type, binlog file name, binlog position and the JSON MIME type.
     *
     * @param sequenceId the sequence ID to record for the flow file
     * @param eventInfo  the event supplying the type and binlog coordinates
     * @return a new mutable map of attribute names to values
     */
    protected Map<String, String> getCommonAttributes(final long sequenceId, BinlogEventInfo eventInfo) {
        // A plain HashMap replaces the former double-brace initialisation, which created an anonymous
        // HashMap subclass holding references to this writer and the event.
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(SEQUENCE_ID_KEY, Long.toString(sequenceId));
        attributes.put(CDC_EVENT_TYPE_ATTRIBUTE, eventInfo.getEventType());
        attributes.put(BinlogEventInfo.BINLOG_FILENAME_KEY, eventInfo.getBinlogFilename());
        attributes.put(BinlogEventInfo.BINLOG_POSITION_KEY, Long.toString(eventInfo.getBinlogPosition()));
        attributes.put(CoreAttributes.MIME_TYPE.key(), APPLICATION_JSON);
        return attributes;
    }

    /**
     * Default implementation for binlog events: creates one flow file containing the JSON for the
     * event, sets the common attributes, transfers it and reports a provenance RECEIVE event.
     *
     * @param session           the session used to create, write and transfer the flow file
     * @param transitUri        the URI reported in the provenance RECEIVE event
     * @param eventInfo         the event to serialize
     * @param currentSequenceId the sequence ID to assign to the flow file
     * @param relationship      the relationship the flow file is transferred to
     * @return {@code currentSequenceId + 1}
     */
    @Override
    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, (outputStream) -> {
            super.startJson(outputStream, eventInfo);
            writeJson(eventInfo);
            // Nothing in the body
            super.endJson();
        });
        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(flowFile, relationship);
        session.getProvenanceReporter().receive(flowFile, transitUri);
        return currentSequenceId + 1;
    }
}

View File

@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.cdc.mysql.event.BinlogTableEventInfo;
import java.io.IOException;
/**
 * An abstract base class for writing MySQL table-related binlog events into flow file(s). On top of
 * the binlog fields written by the parent class it adds the database name, table name and table ID,
 * emitting an explicit JSON null for any of them that is absent.
 */
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> {

    /**
     * Writes the parent's binlog fields followed by "database", "table_name" and "table_id".
     *
     * @param event the event being serialized
     * @throws IOException if the JSON generator fails to write
     */
    @Override
    protected void writeJson(T event) throws IOException {
        super.writeJson(event);

        final String database = event.getDatabaseName();
        if (database == null) {
            jsonGenerator.writeNullField("database");
        } else {
            jsonGenerator.writeStringField("database", database);
        }

        final String table = event.getTableName();
        if (table == null) {
            jsonGenerator.writeNullField("table_name");
        } else {
            jsonGenerator.writeStringField("table_name", table);
        }

        final Long tableId = event.getTableId();
        if (tableId == null) {
            jsonGenerator.writeNullField("table_id");
        } else {
            jsonGenerator.writeNumberField("table_id", tableId);
        }
    }

    /**
     * Default implementation for table-related binlog events: one flow file per event, with the
     * common attributes set and a provenance RECEIVE event reported after the transfer.
     *
     * @param session           the session used to create, write and transfer the flow file
     * @param transitUri        the URI reported in the provenance RECEIVE event
     * @param eventInfo         the event to serialize
     * @param currentSequenceId the sequence ID to assign to the flow file
     * @param relationship      the relationship the flow file is transferred to
     * @return {@code currentSequenceId + 1}
     */
    @Override
    public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) {
        final FlowFile created = session.create();
        final FlowFile written = session.write(created, (outputStream) -> {
            super.startJson(outputStream, eventInfo);
            writeJson(eventInfo);
            // Nothing in the body
            super.endJson();
        });
        final FlowFile attributed = session.putAllAttributes(written, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(attributed, relationship);
        session.getProvenanceReporter().receive(attributed, transitUri);
        return currentSequenceId + 1;
    }
}

View File

@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
/**
 * A writer for events corresponding to the beginning of a MySQL transaction. It declares nothing
 * of its own and relies entirely on the behavior inherited from AbstractBinlogEventWriter.
 */
public class BeginTransactionEventWriter extends AbstractBinlogEventWriter<BeginTransactionEventInfo> {
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
/**
 * A writer for events corresponding to the end (i.e. commit) of a MySQL transaction. It declares
 * nothing of its own and relies entirely on the behavior inherited from AbstractBinlogEventWriter.
 */
public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.DeleteRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A writer class to output MySQL binlog "delete rows" events to flow file(s). One flow file is
 * produced per deleted row.
 */
public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsEventInfo> {

    /**
     * Creates and transfers one flow file per row in the event. Each flow file contains the
     * JSON-serialized event with that row's columns and carries its own sequence ID attribute;
     * a provenance RECEIVE event is reported for every flow file.
     *
     * @param session           A reference to a ProcessSession from which the flow file(s) will be created and transferred
     * @param transitUri        The URI reported in the provenance RECEIVE events
     * @param eventInfo         An event whose rows will become the contents of the flow file(s)
     * @param currentSequenceId The sequence ID assigned to the first flow file
     * @param relationship      The relationship the flow file(s) are transferred to
     * @return The next available CDC sequence ID for use by the CDC processor
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong nextId = new AtomicLong(currentSequenceId);
        for (final Serializable[] deletedRow : eventInfo.getRows()) {
            final FlowFile created = session.create();
            final FlowFile written = session.write(created, outputStream -> {
                super.startJson(outputStream, eventInfo);
                super.writeJson(eventInfo);
                writeRow(eventInfo, deletedRow, eventInfo.getIncludedColumns());
                super.endJson();
            });
            // Each row consumes one sequence ID
            final FlowFile attributed = session.putAllAttributes(written, getCommonAttributes(nextId.getAndIncrement(), eventInfo));
            session.transfer(attributed, relationship);
            session.getProvenanceReporter().receive(attributed, transitUri);
        }
        return nextId.get();
    }

    /**
     * Writes a "columns" JSON array with one object per bit set in {@code includedColumns}. Each
     * object holds a 1-based "id", the column "name" and "column_type" when a column definition is
     * available, and the column "value".
     *
     * @param event           the event supplying column definitions
     * @param row             the values of the row being written
     * @param includedColumns the columns to write
     * @throws IOException if the JSON generator fails to write
     */
    protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");
        for (int col = includedColumns.nextSetBit(0); col != -1; col = includedColumns.nextSetBit(col + 1)) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", col + 1);

            Integer columnType = null;
            final ColumnDefinition definition = event.getColumnByIndex(col);
            if (definition != null) {
                jsonGenerator.writeStringField("name", definition.getName());
                columnType = definition.getType();
                jsonGenerator.writeNumberField("column_type", columnType);
            }

            // NOTE(review): the row is indexed by the column's bit position, which assumes the row
            // array has one slot per table column; confirm against the connector's row layout when
            // not every column is included.
            final Serializable cell = row[col];
            if (cell != null) {
                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, cell));
            } else {
                jsonGenerator.writeNullField("value");
            }
            jsonGenerator.writeEndObject();
        }
        jsonGenerator.writeEndArray();
    }
}

View File

@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.InsertRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Writes MySQL binlog "write rows" (aka INSERT) events out as flow files, producing one flow file per inserted row.
 */
public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> {

    /**
     * Emits one flow file for every row carried by the given event. Each flow file contains the JSON form of the
     * event followed by that row's columns, receives the common attributes for its sequence ID, is transferred to
     * the given relationship, and has a provenance "receive" reported against the transit URI.
     *
     * @param session           the ProcessSession used to create, write and transfer the flow file(s)
     * @param transitUri        the URI handed to the provenance reporter for each flow file
     * @param eventInfo         the event whose rows become the contents of the flow file(s)
     * @param currentSequenceId the sequence ID used for the first flow file written
     * @param relationship      the relationship each flow file is transferred to
     * @return the next available CDC sequence ID, i.e. {@code currentSequenceId} plus the number of rows written
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong nextSequenceId = new AtomicLong(currentSequenceId);

        for (final Serializable[] insertedRow : eventInfo.getRows()) {
            final FlowFile created = session.create();
            final FlowFile written = session.write(created, out -> {
                super.startJson(out, eventInfo);
                super.writeJson(eventInfo);
                writeRow(eventInfo, insertedRow, eventInfo.getIncludedColumns());
                super.endJson();
            });

            final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(nextSequenceId.get(), eventInfo));
            session.transfer(tagged, relationship);
            session.getProvenanceReporter().receive(tagged, transitUri);

            // One sequence ID is consumed per flow file
            nextSequenceId.incrementAndGet();
        }

        return nextSequenceId.get();
    }

    /**
     * Writes a "columns" JSON array holding one object per bit set in {@code includedColumns}. Each object carries a
     * 1-based "id", the column "name" and "column_type" when a column definition is available, and the column "value"
     * read from {@code row} at the same index as the set bit.
     *
     * @param event           the event used to look up column definitions by index
     * @param row             the values of the inserted row
     * @param includedColumns the bit set selecting which column indexes are written
     * @throws IOException if writing to the JSON generator fails
     */
    protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");

        for (int col = includedColumns.nextSetBit(0); col != -1; col = includedColumns.nextSetBit(col + 1)) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", col + 1);

            // The type stays null when no definition is known for this column
            Integer columnType = null;
            final ColumnDefinition definition = event.getColumnByIndex(col);
            if (definition != null) {
                jsonGenerator.writeStringField("name", definition.getName());
                columnType = definition.getType();
                jsonGenerator.writeNumberField("column_type", columnType);
            }

            final Serializable cell = row[col];
            if (cell != null) {
                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, cell));
            } else {
                jsonGenerator.writeNullField("value");
            }

            jsonGenerator.writeEndObject();
        }

        jsonGenerator.writeEndArray();
    }
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.cdc.mysql.event.SchemaChangeEventInfo;
/**
 * Writes MySQL binlog "schema change" events (ALTER TABLE, e.g.) out as flow files.
 */
public class SchemaChangeEventWriter extends AbstractBinlogTableEventWriter<SchemaChangeEventInfo> {

    /**
     * Emits a single flow file containing the JSON form of the event plus a "query" field holding the event's query.
     * The flow file receives the common attributes for the current sequence ID, is transferred to the given
     * relationship, and has a provenance "receive" reported against the transit URI.
     *
     * @param session           the ProcessSession used to create, write and transfer the flow file
     * @param transitUri        the URI handed to the provenance reporter
     * @param eventInfo         the schema change event to serialize
     * @param currentSequenceId the sequence ID used for the flow file
     * @param relationship      the relationship the flow file is transferred to
     * @return {@code currentSequenceId + 1}
     */
    @Override
    public long writeEvent(ProcessSession session, String transitUri, SchemaChangeEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
        final FlowFile created = session.create();
        final FlowFile written = session.write(created, out -> {
            super.startJson(out, eventInfo);
            super.writeJson(eventInfo);
            jsonGenerator.writeStringField("query", eventInfo.getQuery());
            super.endJson();
        });

        final FlowFile tagged = session.putAllAttributes(written, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(tagged, relationship);
        session.getProvenanceReporter().receive(tagged, transitUri);

        // Exactly one flow file is written, so exactly one sequence ID is consumed
        return currentSequenceId + 1;
    }
}

View File

@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.MySQLCDCUtils;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.cdc.event.ColumnDefinition;
import org.apache.nifi.cdc.mysql.event.UpdateRowsEventInfo;
import org.apache.nifi.processor.Relationship;
import java.io.IOException;
import java.io.Serializable;
import java.util.BitSet;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
/**
 * A writer class to output MySQL binlog "update rows" (aka UPDATE) events to flow file(s).
 */
public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsEventInfo> {

    /**
     * Creates and transfers one new flow file per updated row. The contents of each flow file are the JSON-serialized
     * value of the specified event followed by that row's columns (previous and new values), and the common attributes
     * for the row's sequence ID are set on it. A provenance "receive" is reported for each flow file.
     *
     * @param session           A reference to a ProcessSession from which the flow file(s) will be created and transferred
     * @param transitUri        The URI handed to the provenance reporter for each flow file
     * @param eventInfo         An event whose value will become the contents of the flow file
     * @param currentSequenceId The sequence ID used for the first flow file written
     * @param relationship      The relationship each flow file is transferred to
     * @return The next available CDC sequence ID for use by the CDC processor
     */
    @Override
    public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) {
        final AtomicLong seqId = new AtomicLong(currentSequenceId);
        for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) {

            FlowFile flowFile = session.create();
            flowFile = session.write(flowFile, outputStream -> {

                super.startJson(outputStream, eventInfo);
                super.writeJson(eventInfo);

                final BitSet bitSet = eventInfo.getIncludedColumns();
                writeRow(eventInfo, row, bitSet);

                super.endJson();
            });

            flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo));
            session.transfer(flowFile, relationship);
            session.getProvenanceReporter().receive(flowFile, transitUri);

            // One sequence ID is consumed per flow file
            seqId.getAndIncrement();
        }
        return seqId.get();
    }

    /**
     * Writes a "columns" JSON array holding one object per bit set in {@code includedColumns}. Each object carries a
     * 1-based "id", the column "name" and "column_type" when a column definition is available, the previous value as
     * "last_value" and the new value as "value". Both values are read at the same index as the set bit.
     *
     * @param event           The event used to look up column definitions by index
     * @param row             An entry whose key is the row before the update and whose value is the row after it
     * @param includedColumns The bit set selecting which column indexes are written
     * @throws IOException If writing to the JSON generator fails
     */
    protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException {
        jsonGenerator.writeArrayFieldStart("columns");

        // The before/after images do not change per column, so resolve them once rather than on every iteration
        final Serializable[] oldRow = row.getKey();
        final Serializable[] newRow = row.getValue();

        int i = includedColumns.nextSetBit(0);
        while (i != -1) {
            jsonGenerator.writeStartObject();
            jsonGenerator.writeNumberField("id", i + 1);
            ColumnDefinition columnDefinition = event.getColumnByIndex(i);
            // The type stays null when no definition is known for this column
            Integer columnType = null;
            if (columnDefinition != null) {
                jsonGenerator.writeStringField("name", columnDefinition.getName());
                columnType = columnDefinition.getType();
                jsonGenerator.writeNumberField("column_type", columnType);
            }

            if (oldRow[i] == null) {
                jsonGenerator.writeNullField("last_value");
            } else {
                jsonGenerator.writeObjectField("last_value", MySQLCDCUtils.getWritableObject(columnType, oldRow[i]));
            }

            if (newRow[i] == null) {
                jsonGenerator.writeNullField("value");
            } else {
                jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, newRow[i]));
            }
            jsonGenerator.writeEndObject();
            i = includedColumns.nextSetBit(i + 1);
        }
        jsonGenerator.writeEndArray();
    }
}

View File

@ -0,0 +1,15 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql
import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.event.Event
import java.util.concurrent.TimeoutException
/**
 * Test double for BinaryLogClient. It never opens a real connection: connect() only flips a flag (or throws,
 * depending on the configured failure switches), and registered listeners are kept in plain lists so a test can
 * push events to them directly via sendEvent().
 */
class MockBinlogClient extends BinaryLogClient {

    // Connection settings as passed to the constructor
    String hostname
    int port
    String username
    String password

    // Set by connect(), cleared by disconnect()
    boolean connected

    // Failure switches a test can set to make connect() throw
    boolean connectionTimeout = false
    boolean connectionError = false

    List<BinaryLogClient.EventListener> eventListeners = []
    List<BinaryLogClient.LifecycleListener> lifecycleListeners = []

    MockBinlogClient(String hostname, int port, String username, String password) {
        super(hostname, port, username, password)
        this.hostname = hostname
        this.port = port
        this.username = username
        this.password = password
    }

    /**
     * Simulates a connection attempt. The failure checks run in a fixed order: timeout, then I/O error, then
     * missing password. Only when none applies is the client marked as connected.
     */
    @Override
    void connect(long timeoutInMilliseconds) throws IOException, TimeoutException {
        if (connectionTimeout) {
            throw new TimeoutException('Connection timed out')
        }
        if (connectionError) {
            throw new IOException('Error during connect')
        }
        if (password == null) {
            throw new NullPointerException("Password can't be null")
        }
        this.connected = true
    }

    @Override
    void disconnect() throws IOException {
        this.connected = false
    }

    /** Adds the listener unless it is already registered. */
    @Override
    void registerEventListener(BinaryLogClient.EventListener eventListener) {
        if (eventListeners.contains(eventListener)) {
            return
        }
        eventListeners << eventListener
    }

    @Override
    void unregisterEventListener(BinaryLogClient.EventListener eventListener) {
        eventListeners.remove(eventListener)
    }

    /** Adds the listener unless it is already registered. */
    @Override
    void registerLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
        if (lifecycleListeners.contains(lifecycleListener)) {
            return
        }
        lifecycleListeners << lifecycleListener
    }

    @Override
    void unregisterLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
        lifecycleListeners.remove(lifecycleListener)
    }

    /** Delivers the event to every registered event listener, in registration order. */
    def sendEvent(Event event) {
        eventListeners.each { listener -> listener.onEvent(event) }
    }
}

View File

@ -0,0 +1,795 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.processors
import com.github.shyiko.mysql.binlog.BinaryLogClient
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData
import com.github.shyiko.mysql.binlog.event.Event
import com.github.shyiko.mysql.binlog.event.EventData
import com.github.shyiko.mysql.binlog.event.EventHeaderV4
import com.github.shyiko.mysql.binlog.event.EventType
import com.github.shyiko.mysql.binlog.event.QueryEventData
import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.components.state.Scope
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.distributed.cache.client.Deserializer
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient
import org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService
import org.apache.nifi.distributed.cache.client.Serializer
import org.apache.nifi.flowfile.attributes.CoreAttributes
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.cdc.event.ColumnDefinition
import org.apache.nifi.cdc.event.TableInfo
import org.apache.nifi.cdc.event.TableInfoCacheKey
import org.apache.nifi.cdc.event.io.EventWriter
import org.apache.nifi.provenance.ProvenanceEventType
import org.apache.nifi.reporting.InitializationException
import org.apache.nifi.state.MockStateManager
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockControllerServiceInitializationContext
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.After
import org.junit.Before
import org.junit.Test
import java.sql.Connection
import java.sql.ResultSet
import java.sql.SQLException
import java.sql.Statement
import java.util.regex.Matcher
import java.util.regex.Pattern
import static org.junit.Assert.assertEquals
import static org.junit.Assert.assertTrue
import static org.mockito.Matchers.anyString
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
/**
* Unit test(s) for MySQL CDC
*/
class CaptureChangeMySQLTest {
CaptureChangeMySQL processor
TestRunner testRunner
MockBinlogClient client
@Before
void setUp() throws Exception {
    // Fresh processor, runner and mock binlog client for every test
    this.processor = new MockCaptureChangeMySQL()
    this.testRunner = TestRunners.newTestRunner(this.processor)
    this.client = new MockBinlogClient('localhost', 3306, 'root', 'password')
}
@After
void tearDown() throws Exception {
    // Nothing to clean up: setUp() rebuilds the processor, runner and client before each test
}
/**
 * Smoke test: pushes ROTATE, BEGIN and COMMIT (XID) events through the mock client and runs the processor.
 * No PASSWORD property is set and nothing is asserted about the output.
 */
@Test
void testBeginCommitTransaction() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')

    // First pass: initialize and trigger once, leaving the processor running
    testRunner.run(1, false, true)

    // ROTATE
    EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
    RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
    client.sendEvent(new Event(rotateHeader, rotateData))

    // BEGIN
    EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
    QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    client.sendEvent(new Event(beginHeader, beginData))

    // COMMIT
    EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
    EventData commitData = {} as EventData
    client.sendEvent(new Event(commitHeader, commitData))

    // Second pass: trigger once more without re-initializing, then stop
    testRunner.run(1, true, false)
}
/**
 * With a sequence ID of 1 already stored in cluster state, the configured Initial Sequence ID (10) must be
 * ignored: emitted flow files are expected to be numbered starting from 1.
 */
@Test
void testInitialSequenceIdIgnoredWhenStatePresent() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
    testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')

    // Seed the cluster-scoped state before the processor is initialized
    testRunner.stateManager.setState([(EventWriter.SEQUENCE_ID_KEY): '1'], Scope.CLUSTER)

    // First pass: initialize and trigger once, leaving the processor running
    testRunner.run(1, false, true)

    // ROTATE
    EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
    RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
    client.sendEvent(new Event(rotateHeader, rotateData))

    // BEGIN
    EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
    QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    client.sendEvent(new Event(beginHeader, beginData))

    // COMMIT
    EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
    EventData commitData = {} as EventData
    client.sendEvent(new Event(commitHeader, commitData))

    // Second pass: trigger once more without re-initializing, then stop
    testRunner.run(1, true, false)

    def emitted = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
    emitted.eachWithIndex { flowFile, index ->
        // Sequence ID should start from 1 (as was put into the state map), showing that the
        // Initial Sequence ID value was ignored
        assertEquals(index + 1, Long.valueOf(flowFile.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
    }
}
/**
 * With no sequence ID in state, the configured Initial Sequence ID (10) is expected to be used: emitted flow
 * files are expected to be numbered starting from 10.
 */
@Test
void testInitialSequenceIdNoStatePresent() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
    testRunner.setProperty(CaptureChangeMySQL.INIT_SEQUENCE_ID, '10')

    // First pass: initialize and trigger once, leaving the processor running
    testRunner.run(1, false, true)

    // ROTATE
    EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
    RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
    client.sendEvent(new Event(rotateHeader, rotateData))

    // BEGIN
    EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
    QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    client.sendEvent(new Event(beginHeader, beginData))

    // COMMIT
    EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
    EventData commitData = {} as EventData
    client.sendEvent(new Event(commitHeader, commitData))

    // Second pass: trigger once more without re-initializing, then stop
    testRunner.run(1, true, false)

    def emitted = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
    emitted.eachWithIndex { flowFile, index ->
        // Numbering starts at the configured Initial Sequence ID
        assertEquals(index + 10, Long.valueOf(flowFile.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
    }
}
/**
 * Sends a COMMIT (XID) event with no preceding BEGIN and expects the run to fail with an AssertionError.
 */
@Test(expected = AssertionError.class)
void testCommitWithoutBegin() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')

    // First pass: initialize and trigger once, leaving the processor running
    testRunner.run(1, false, true)

    // COMMIT
    EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
    EventData commitData = {} as EventData
    client.sendEvent(new Event(commitHeader, commitData))

    // Second pass: trigger once more without re-initializing, then stop
    testRunner.run(1, true, false)
}
/**
 * End-to-end scenario across three transactions (INSERT of three rows, UPDATE of one row, then a schema change
 * plus DELETE of two rows after a binlog rotation). Verifies the number, ordering, sequence IDs, MIME type,
 * binlog filename/position attributes and event types of the emitted flow files, and that one RECEIVE
 * provenance event is recorded per flow file.
 */
@Test
void testExtendedTransaction() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
    testRunner.setProperty(CaptureChangeMySQL.SERVER_ID, '1')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
    testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_FILENAME, 'master.000001')
    testRunner.setProperty(CaptureChangeMySQL.INIT_BINLOG_POSITION, '4')

    // Register and enable the cache client controller service used by the processor
    final DistributedMapCacheClientImpl cacheClient = createCacheClient()
    def clientProperties = [:]
    clientProperties.put(DistributedMapCacheClientService.HOSTNAME.getName(), 'localhost')
    testRunner.addControllerService('client', cacheClient, clientProperties)
    testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
    testRunner.enableControllerService(cacheClient)

    // First pass: initialize and trigger once, leaving the processor running
    testRunner.run(1, false, true)

    // ROTATE scenario
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
            [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
    ))

    // INSERT scenario
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
            [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
            [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
    ))

    def cols = new BitSet()
    cols.set(1)
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
            [tableId: 1, includedColumns: cols,
             rows   : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[], [10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4,
            {} as EventData
    ))

    // UPDATE scenario
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 16] as EventHeaderV4,
            [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 18] as EventHeaderV4,
            [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
    ))

    def colsBefore = new BitSet()
    colsBefore.set(0)
    colsBefore.set(1)
    def colsAfter = new BitSet()
    colsAfter.set(1)
    // Key is the row before the update, value is the row after it
    Map.Entry<Serializable[], Serializable[]> updateMapEntry = new Map.Entry<Serializable[], Serializable[]>() {
        @Override
        Serializable[] getKey() {
            return [2, 'Smith'] as Serializable[]
        }

        @Override
        Serializable[] getValue() {
            return [3, 'Jones'] as Serializable[]
        }

        @Override
        Serializable[] setValue(Serializable[] value) {
            return [3, 'Jones'] as Serializable[]
        }
    }

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.UPDATE_ROWS, nextPosition: 20] as EventHeaderV4,
            [tableId: 1, includedColumnsBeforeUpdate: colsBefore, includedColumns: colsAfter, rows: [updateMapEntry]
                    as List<Map.Entry<Serializable[], Serializable[]>>] as UpdateRowsEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 24] as EventHeaderV4,
            {} as EventData
    ))

    // ROTATE scenario
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 26] as EventHeaderV4,
            [binlogFilename: 'master.000002', binlogPosition: 4L] as RotateEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 28] as EventHeaderV4,
            [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 30] as EventHeaderV4,
            [tableId: 1, database: 'myDB', table: 'myTable', columnTypes: [4, -4] as byte[]] as TableMapEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
            [database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
    ))

    // DELETE scenario
    cols = new BitSet()
    cols.set(0)
    cols.set(1)
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.DELETE_ROWS, nextPosition: 36] as EventHeaderV4,
            [tableId: 1, includedColumns: cols, rows: [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[]] as List<Serializable[]>] as DeleteRowsEventData
    ))

    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 40] as EventHeaderV4,
            {} as EventData
    ))

    // Second pass: trigger once more without re-initializing, then stop
    testRunner.run(1, true, false)

    def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
    List<String> expectedEventTypes = ([] + 'begin' + Collections.nCopies(3, 'write') + 'commit' + 'begin' + 'update' + 'commit'
            + 'begin' + 'schema_change' + Collections.nCopies(2, 'delete') + 'commit')

    // Check the count first so a missing or extra flow file is reported as such, rather than as a
    // per-element mismatch (or not at all) in the loop below
    assertEquals(13, resultFiles.size())

    resultFiles.eachWithIndex { e, i ->
        assertEquals(i, Long.valueOf(e.getAttribute(EventWriter.SEQUENCE_ID_KEY)))
        assertEquals(EventWriter.APPLICATION_JSON, e.getAttribute(CoreAttributes.MIME_TYPE.key()))
        // The first 8 flow files precede the rotation to master.000002
        assertEquals((i < 8) ? 'master.000001' : 'master.000002', e.getAttribute(BinlogEventInfo.BINLOG_FILENAME_KEY))
        assertTrue(Long.valueOf(e.getAttribute(BinlogEventInfo.BINLOG_POSITION_KEY)) % 4 == 0L)
        // Expected value goes first so a failure message reads correctly
        assertEquals(expectedEventTypes[i], e.getAttribute('cdc.event.type'))
    }

    assertEquals(13, testRunner.provenanceEvents.size())
    testRunner.provenanceEvents.each { assertEquals(ProvenanceEventType.RECEIVE, it.eventType) }
}
/**
 * Sends a WRITE ROWS event for a table ID that was never introduced by a TABLE MAP event and expects the run
 * to fail with an AssertionError.
 */
@Test(expected = AssertionError.class)
void testNoTableInformationAvailable() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')

    // Register and enable the cache client controller service used by the processor
    final DistributedMapCacheClientImpl cacheClient = createCacheClient()
    def clientProperties = [(DistributedMapCacheClientService.HOSTNAME.getName()): 'localhost']
    testRunner.addControllerService('client', cacheClient, clientProperties)
    testRunner.setProperty(CaptureChangeMySQL.DIST_CACHE_CLIENT, 'client')
    testRunner.enableControllerService(cacheClient)

    // First pass: initialize and trigger once, leaving the processor running
    testRunner.run(1, false, true)

    // ROTATE scenario
    EventHeaderV4 rotateHeader = [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4
    RotateEventData rotateData = [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
    client.sendEvent(new Event(rotateHeader, rotateData))

    // INSERT scenario
    EventHeaderV4 beginHeader = [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4
    QueryEventData beginData = [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    client.sendEvent(new Event(beginHeader, beginData))

    // No TABLE MAP event is sent before the rows event
    def cols = new BitSet()
    cols.set(1)
    EventHeaderV4 writeHeader = [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4
    WriteRowsEventData writeData = [tableId: 1, includedColumns: cols,
                                    rows   : [[2, 'Smith'] as Serializable[], [3, 'Jones'] as Serializable[], [10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
    client.sendEvent(new Event(writeHeader, writeData))

    EventHeaderV4 commitHeader = [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 12] as EventHeaderV4
    EventData commitData = {} as EventData
    client.sendEvent(new Event(commitHeader, commitData))

    // Second pass: trigger once more without re-initializing, then stop
    testRunner.run(1, true, false)
}
// Verifies that row events are dropped when either the Table Name Pattern or the Database Name
// Pattern excludes the table announced by the preceding TABLE MAP event.
@Test
void testSkipTable() throws Exception {
    testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
    testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
    testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
    testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
    testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
    testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
    testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, "user")

    // Initialize the processor without stopping it so it can receive the events sent below
    testRunner.run(1, false, true)

    ////////////////////////
    // Test table filter
    ////////////////////////

    // ROTATE
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
            [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
    ))

    // BEGIN
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
            [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    ))

    // TABLE MAP for table not matching the regex (note the s on the end of users vs the regex of 'user')
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
            [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData
    ))

    // This WRITE ROWS should be skipped
    def cols = new BitSet()
    cols.set(1)
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
            [tableId: 1, includedColumns: cols,
             rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
    ))

    // TABLE MAP for table matching, all modification events (1) should be emitted
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 10] as EventHeaderV4,
            [tableId: 1, database: 'myDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
    ))

    // WRITE ROWS for matching table
    cols = new BitSet()
    cols.set(1)
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 12] as EventHeaderV4,
            [tableId: 1, includedColumns: cols,
             rows   : [[10, 'Cruz'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
    ))

    // COMMIT
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
            {} as EventData
    ))

    ////////////////////////
    // Test database filter
    ////////////////////////

    // BEGIN
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
            [database: 'myDB', sql: 'BEGIN'] as QueryEventData
    ))

    // TABLE MAP for database not matching the regex. The table name DOES match the table regex,
    // so the database pattern is the only reason the following row event can be skipped.
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.TABLE_MAP, nextPosition: 6] as EventHeaderV4,
            [tableId: 1, database: 'notMyDB', table: 'user', columnTypes: [4, -4] as byte[]] as TableMapEventData
    ))

    // This WRITE ROWS should be skipped
    cols = new BitSet()
    cols.set(1)
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.EXT_WRITE_ROWS, nextPosition: 8] as EventHeaderV4,
            [tableId: 1, includedColumns: cols,
             rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData
    ))

    // COMMIT
    client.sendEvent(new Event(
            [timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
            {} as EventData
    ))

    // Run once more and stop the processor
    testRunner.run(1, true, false)

    def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
    // BEGIN + WRITE + COMMIT from the table filter section, BEGIN + COMMIT from the database filter section
    assertEquals(5, resultFiles.size())
}
// Verifies that a transaction which starts in one processor execution can be finished after the
// processor has been stopped and initialized again.
@Test
void testTransactionAcrossMultipleProcessorExecutions() throws Exception {
    [
            (CaptureChangeMySQL.DRIVER_LOCATION): 'file:///path/to/mysql-connector-java-5.1.38-bin.jar',
            (CaptureChangeMySQL.HOSTS)          : 'localhost:3306',
            (CaptureChangeMySQL.USERNAME)       : 'root',
            (CaptureChangeMySQL.PASSWORD)       : 'password',
            (CaptureChangeMySQL.CONNECT_TIMEOUT): '2 seconds'
    ].each { PropertyDescriptor descriptor, String value ->
        testRunner.setProperty(descriptor, value)
    }

    // Wraps a payload in an Event with a freshly timestamped header and hands it to the mock client
    def emit = { EventType type, nextPos, EventData payload ->
        client.sendEvent(new Event(
                [timestamp: new Date().time, eventType: type, nextPosition: nextPos] as EventHeaderV4,
                payload))
    }

    // Initialize only, leaving the processor running
    testRunner.run(1, false, true)

    // ROTATE, then BEGIN, then TABLE MAP
    emit(EventType.ROTATE, 2, [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData)
    emit(EventType.QUERY, 4, [database: 'myDB', sql: 'BEGIN'] as QueryEventData)
    emit(EventType.TABLE_MAP, 6,
            [tableId: 1, database: 'myDB', table: 'users', columnTypes: [4, -4] as byte[]] as TableMapEventData)

    // First execution: run and stop; a single flow file is expected so far
    testRunner.run(1, true, false)
    def emitted = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
    assertEquals(1, emitted.size())

    // Bring the processor back up so the remainder of the transaction can be delivered
    testRunner.run(1, false, true)

    // WRITE ROWS belonging to the transaction opened before the restart
    def includedCols = new BitSet()
    includedCols.set(1)
    emit(EventType.EXT_WRITE_ROWS, 8,
            [tableId: 1, includedColumns: includedCols,
             rows   : [[2, 'Smith'] as Serializable[]] as List<Serializable[]>] as WriteRowsEventData)

    // COMMIT
    emit(EventType.XID, 14, {} as EventData)

    // Second execution: run and stop; three flow files are expected in total
    testRunner.run(1, true, false)
    emitted = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
    assertEquals(3, emitted.size())
}
// Verifies when the binlog filename/position are written to cluster state: on processor stop,
// and while running once a State Update Interval has been configured.
@Test
void testUpdateState() throws Exception {
    [
            (CaptureChangeMySQL.DRIVER_LOCATION): 'file:///path/to/mysql-connector-java-5.1.38-bin.jar',
            (CaptureChangeMySQL.HOSTS)          : 'localhost:3306',
            (CaptureChangeMySQL.USERNAME)       : 'root',
            (CaptureChangeMySQL.PASSWORD)       : 'password',
            (CaptureChangeMySQL.CONNECT_TIMEOUT): '2 seconds'
    ].each { PropertyDescriptor descriptor, String value ->
        testRunner.setProperty(descriptor, value)
    }

    // Wraps a payload in an Event with a freshly timestamped header and hands it to the mock client
    def emit = { EventType type, nextPos, EventData payload ->
        client.sendEvent(new Event(
                [timestamp: new Date().time, eventType: type, nextPosition: nextPos] as EventHeaderV4,
                payload))
    }

    // Checks both binlog state entries (filename first, then position) in cluster scope
    def expectBinlogState = { String filename, String position ->
        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, filename, Scope.CLUSTER)
        testRunner.stateManager.assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, position, Scope.CLUSTER)
    }

    testRunner.run(1, false, true)

    // ROTATE
    emit(EventType.ROTATE, 2, [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData)

    testRunner.run(1, false, false)

    // The processor is still running and no State Update Interval is set, so nothing is stored yet
    expectBinlogState(null, null)

    // Stopping the processor stores the state
    testRunner.run(1, true, false)
    expectBinlogState('master.000001', '4')

    testRunner.stateManager.clear(Scope.CLUSTER)

    // With a State Update Interval configured, state should be stored while the processor keeps running
    testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, '1 second')
    testRunner.run(1, false, true)

    // ROTATE
    emit(EventType.ROTATE, 2, [binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData)

    // BEGIN
    emit(EventType.QUERY, 6, [database: 'myDB', sql: 'BEGIN'] as QueryEventData)

    // Let the configured interval elapse before triggering the processor again
    sleep(1000)
    testRunner.run(1, false, false)
    expectBinlogState('master.000001', '6')

    // COMMIT
    emit(EventType.XID, 12, {} as EventData)

    testRunner.run(1, true, false)
    expectBinlogState('master.000001', '12')
}
/********************************
* Mock and helper classes below
********************************/
// CaptureChangeMySQL variant for tests: it hands out the `client` visible from the enclosing scope
// instead of creating a binlog client, fabricates table metadata, and returns a mocked JDBC connection.
class MockCaptureChangeMySQL extends CaptureChangeMySQL {

    // Table metadata handed out so far, keyed by the requested cache key
    Map<TableInfoCacheKey, TableInfo> cache = new HashMap<>()

    // Ignores the connection parameters and returns the enclosing scope's client
    @Override
    BinaryLogClient createBinlogClient(String hostname, int port, String username, String password) {
        return client
    }

    // Returns the TableInfo remembered for the key, creating a two-column ('id', 'string1') entry on first request
    @Override
    protected TableInfo loadTableInfo(TableInfoCacheKey key) {
        TableInfo known = cache.get(key)
        if (known != null) {
            return known
        }

        List<ColumnDefinition> columns = [
                new ColumnDefinition((byte) 4, 'id'),
                new ColumnDefinition((byte) -4, 'string1')
        ]
        TableInfo created = new TableInfo(key.databaseName, key.tableName, key.tableId, columns)
        cache.put(key, created)
        return created
    }

    // Builds a mock Connection whose statements answer every query with a mock ResultSet
    @Override
    protected Connection getJdbcConnection(String locationString, String drvName, InetSocketAddress host, String username, String password, Map<String, String> customProperties)
            throws InitializationException, SQLException {
        Connection jdbcConnection = mock(Connection)
        Statement jdbcStatement = mock(Statement)
        ResultSet emptyResults = mock(ResultSet)

        when(jdbcConnection.createStatement()).thenReturn(jdbcStatement)
        when(jdbcStatement.executeQuery(anyString())).thenReturn(emptyResults)

        return jdbcConnection
    }
}
// Creates a DistributedMapCacheClientImpl and initializes it with a mock controller service
// context (identifier "client", mock logger, mock state manager).
static DistributedMapCacheClientImpl createCacheClient() throws InitializationException {
    final DistributedMapCacheClientImpl cacheClient = new DistributedMapCacheClientImpl()
    final MockControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(
            cacheClient,
            "client",
            new MockComponentLog("client", cacheClient),
            new MockStateManager(cacheClient))

    cacheClient.initialize(initContext)
    return cacheClient
}
// In-memory DistributedMapCacheClient for tests. Keys and values are run through the supplied
// serializers and kept as Strings in a local map; values are handed back through the supplied
// deserializer.
static
final class DistributedMapCacheClientImpl extends AbstractControllerService implements DistributedMapCacheClient {

    // Serialized key -> serialized value
    private Map<String, String> cacheMap = new HashMap<>()

    @Override
    void close() throws IOException {
        // Nothing to release
    }

    @Override
    void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
        // Property changes are ignored
    }

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return [DistributedMapCacheClientService.HOSTNAME,
                DistributedMapCacheClientService.COMMUNICATIONS_TIMEOUT,
                DistributedMapCacheClientService.PORT,
                DistributedMapCacheClientService.SSL_CONTEXT_SERVICE]
    }

    // Serializes the value and returns the resulting text. The stream is flushed before reading the
    // writer because WriterOutputStream buffers decoded characters until it overflows or is flushed;
    // without the flush short payloads would read back as an empty string.
    private static <T> String serializeToString(final T value, final Serializer<T> serializer) throws IOException {
        StringWriter writer = new StringWriter()
        WriterOutputStream stream = new WriterOutputStream(writer)
        serializer.serialize(value, stream)
        stream.flush()
        return writer.toString()
    }

    // Stores the value only when the key is not yet present; returns true when the value was stored
    @Override
    <K, V> boolean putIfAbsent(
            final K key,
            final V value,
            final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
        String keyString = serializeToString(key, keySerializer)
        if (cacheMap.containsKey(keyString)) return false

        cacheMap.put(keyString, serializeToString(value, valueSerializer))
        return true
    }

    // Returns the existing value when the key is present; otherwise stores the value and returns null
    @Override
    @SuppressWarnings("unchecked")
    <K, V> V getAndPutIfAbsent(
            final K key, final V value, final Serializer<K> keySerializer, final Serializer<V> valueSerializer,
            final Deserializer<V> valueDeserializer) throws IOException {
        String keyString = serializeToString(key, keySerializer)
        if (cacheMap.containsKey(keyString)) return valueDeserializer.deserialize(cacheMap.get(keyString).bytes)

        cacheMap.put(keyString, serializeToString(value, valueSerializer))
        return null
    }

    @Override
    <K> boolean containsKey(final K key, final Serializer<K> keySerializer) throws IOException {
        return cacheMap.containsKey(serializeToString(key, keySerializer))
    }

    // Returns the deserialized value for the key, or null when the key is not present
    @Override
    <K, V> V get(
            final K key,
            final Serializer<K> keySerializer, final Deserializer<V> valueDeserializer) throws IOException {
        String keyString = serializeToString(key, keySerializer)
        return (cacheMap.containsKey(keyString)) ? valueDeserializer.deserialize(cacheMap.get(keyString).bytes) : null
    }

    // Removes the entry for the key; returns true when an entry existed
    @Override
    <K> boolean remove(final K key, final Serializer<K> serializer) throws IOException {
        String keyString = serializeToString(key, serializer)
        boolean removed = (cacheMap.containsKey(keyString))
        cacheMap.remove(keyString)
        return removed
    }

    // Removes every entry whose serialized key fully matches the regex and returns how many were removed
    @Override
    long removeByPattern(String regex) throws IOException {
        Pattern p = Pattern.compile(regex)
        // findAll yields a copy, so removing from the map afterwards does not disturb the iteration
        Collection<String> matchingKeys = cacheMap.keySet().findAll { String key -> p.matcher(key).matches() }
        matchingKeys.each { cacheMap.remove(it) }
        return matchingKeys.size()
    }

    // Stores the value under the key, replacing any previous entry
    @Override
    <K, V> void put(
            final K key,
            final V value,
            final Serializer<K> keySerializer, final Serializer<V> valueSerializer) throws IOException {
        cacheMap.put(serializeToString(key, keySerializer), serializeToString(value, valueSerializer))
    }
}
}

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.cdc.mysql.event;
import org.junit.Test;
import java.sql.Types;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
/**
 * Unit tests covering the value conversion offered by the MySQLCDCUtils utility class.
 */
public class MySQLCDCUtilsTest {

    @Test
    public void testGetWritableObject() throws Exception {
        // A null value yields null whether or not a column type is supplied
        assertNull(MySQLCDCUtils.getWritableObject(null, null));
        assertNull(MySQLCDCUtils.getWritableObject(Types.INTEGER, null));

        // A byte tagged as INTEGER is expected back as an equal value
        final byte smallNumber = (byte) 1;
        assertEquals(smallNumber, MySQLCDCUtils.getWritableObject(Types.INTEGER, smallNumber));

        // Raw bytes tagged as VARCHAR are expected back as the corresponding String
        final byte[] rawText = "Hello".getBytes();
        assertEquals("Hello", MySQLCDCUtils.getWritableObject(Types.VARCHAR, rawText));
    }
}

View File

@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <!--
      Licensed to the Apache Software Foundation (ASF) under one or more
      contributor license agreements. See the NOTICE file distributed with
      this work for additional information regarding copyright ownership.
      The ASF licenses this file to You under the Apache License, Version 2.0
      (the "License"); you may not use this file except in compliance with
      the License. You may obtain a copy of the License at
      http://www.apache.org/licenses/LICENSE-2.0
      Unless required by applicable law or agreed to in writing, software
      distributed under the License is distributed on an "AS IS" BASIS,
      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      See the License for the specific language governing permissions and
      limitations under the License.
    -->
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-cdc</artifactId>
        <version>1.2.0-SNAPSHOT</version>
    </parent>

    <artifactId>nifi-cdc-mysql-bundle</artifactId>
    <packaging>pom</packaging>
    <description>NiFi MySQL CDC Bundle</description>

    <!-- Aggregates the MySQL CDC processors and the NAR that packages them -->
    <modules>
        <module>nifi-cdc-mysql-processors</module>
        <module>nifi-cdc-mysql-nar</module>
    </modules>

    <!-- Pins the processors artifact version for the modules of this bundle -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-cdc-mysql-processors</artifactId>
                <version>1.2.0-SNAPSHOT</version>
            </dependency>
        </dependencies>
    </dependencyManagement>
</project>

View File

@ -0,0 +1,29 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- Inherits from the nifi-nar-bundles parent POM -->
<parent>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-bundles</artifactId>
<version>1.2.0-SNAPSHOT</version>
</parent>
<artifactId>nifi-cdc</artifactId>
<!-- Aggregator only (packaging "pom"): builds the child modules listed below -->
<packaging>pom</packaging>
<modules>
<module>nifi-cdc-api</module>
<module>nifi-cdc-mysql-bundle</module>
</modules>
</project>

View File

@ -79,6 +79,7 @@
<module>nifi-registry-bundle</module> <module>nifi-registry-bundle</module>
<module>nifi-stateful-analysis-bundle</module> <module>nifi-stateful-analysis-bundle</module>
<module>nifi-poi-bundle</module> <module>nifi-poi-bundle</module>
<module>nifi-cdc</module>
</modules> </modules>
<build> <build>

View File

@ -1330,6 +1330,12 @@ language governing permissions and limitations under the License. -->
<version>1.2.0-SNAPSHOT</version> <version>1.2.0-SNAPSHOT</version>
<type>nar</type> <type>nar</type>
</dependency> </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-cdc-mysql-nar</artifactId>
<version>1.2.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency> <dependency>
<groupId>org.apache.nifi</groupId> <groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId> <artifactId>nifi-properties</artifactId>