NIFI-3749: Added database filtering to DDL events in CaptureChangeMySQL

This closes #1708.

Signed-off-by: Koji Kawamura <ijokarumawak@apache.org>
This commit is contained in:
Matt Burgess 2017-04-27 10:30:47 -04:00 committed by Koji Kawamura
parent 2664ea093d
commit aa4efb43ca
6 changed files with 129 additions and 14 deletions

View File

@ -21,7 +21,14 @@ package org.apache.nifi.cdc.mysql.event;
*/
public class BeginTransactionEventInfo extends BaseBinlogEventInfo {
public BeginTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
private String databaseName;
public BeginTransactionEventInfo(String databaseName, Long timestamp, String binlogFilename, long binlogPosition) {
super(BEGIN_EVENT, timestamp, binlogFilename, binlogPosition);
this.databaseName = databaseName;
}
public String getDatabaseName() {
return databaseName;
}
}

View File

@ -22,7 +22,14 @@ package org.apache.nifi.cdc.mysql.event;
*/
public class CommitTransactionEventInfo extends BaseBinlogEventInfo {
public CommitTransactionEventInfo(Long timestamp, String binlogFilename, long binlogPosition) {
private String databaseName;
public CommitTransactionEventInfo(String databaseName, Long timestamp, String binlogFilename, long binlogPosition) {
super(COMMIT_EVENT, timestamp, binlogFilename, binlogPosition);
this.databaseName = databaseName;
}
public String getDatabaseName() {
return databaseName;
}
}

View File

@ -18,8 +18,19 @@ package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.BeginTransactionEventInfo;
import java.io.IOException;
/**
* A writer for events corresponding to the beginning of a MySQL transaction
*/
public class BeginTransactionEventWriter extends AbstractBinlogEventWriter<BeginTransactionEventInfo> {
protected void writeJson(BeginTransactionEventInfo event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {
jsonGenerator.writeStringField("database", event.getDatabaseName());
} else {
jsonGenerator.writeNullField("database");
}
}
}

View File

@ -19,9 +19,18 @@ package org.apache.nifi.cdc.mysql.event.io;
import org.apache.nifi.cdc.mysql.event.CommitTransactionEventInfo;
import java.io.IOException;
/**
* A writer for events corresponding to the end (i.e. commit) of a MySQL transaction
*/
public class CommitTransactionEventWriter extends AbstractBinlogEventWriter<CommitTransactionEventInfo> {
protected void writeJson(CommitTransactionEventInfo event) throws IOException {
super.writeJson(event);
if (event.getDatabaseName() != null) {
jsonGenerator.writeStringField("database", event.getDatabaseName());
} else {
jsonGenerator.writeNullField("database");
}
}
}

View File

@ -149,8 +149,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
public static final PropertyDescriptor DATABASE_NAME_PATTERN = new PropertyDescriptor.Builder()
.name("capture-change-mysql-db-name-pattern")
.displayName("Database/Schema Name Pattern")
.description("A regular expression (regex) for matching databases or schemas (depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
+ "the schema name as it is stored in the database. If the property is not set, the schema name will not be used to filter the CDC events.")
.description("A regular expression (regex) for matching databases (or schemas, depending on your RDBMS' terminology) against the list of CDC events. The regex must match "
+ "the database name as it is stored in the RDBMS. If the property is not set, the database name will not be used to filter the CDC events. "
+ "NOTE: DDL events, even if they affect different databases, are associated with the database used by the session to execute the DDL. "
+ "This means if a connection is made to one database, but the DDL is issued against another, then the connected database will be the one matched against "
+ "the specified pattern.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@ -351,6 +354,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private volatile long xactSequenceId = 0;
private volatile TableInfo currentTable = null;
private volatile String currentDatabase = null;
private volatile Pattern databaseNamePattern;
private volatile Pattern tableNamePattern;
private volatile boolean includeBeginCommit = false;
@ -759,9 +763,12 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
}
break;
case QUERY:
// Is this the start of a transaction?
QueryEventData queryEventData = event.getData();
currentDatabase = queryEventData.getDatabase();
String sql = queryEventData.getSql();
// Is this the start of a transaction?
if ("BEGIN".equals(sql)) {
// If we're already in a transaction, something bad happened, alert the user
if (inTransaction) {
@ -772,8 +779,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
xactBinlogPosition = currentBinlogPosition;
xactSequenceId = currentSequenceId.get();
if (includeBeginCommit) {
BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
}
inTransaction = true;
@ -783,8 +790,8 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
+ "This could indicate that your binlog position is invalid.");
}
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (includeBeginCommit) {
CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
@ -803,8 +810,10 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
|| normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) {
if (includeDDLEvents) {
DDLEventInfo ddlEvent = new DDLEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
// If we don't have table information, we can still use the database name
TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null);
DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, normalizedQuery);
currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Remove all the keys from the cache that this processor added
@ -824,14 +833,15 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
}
if (includeBeginCommit) {
CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(timestamp, currentBinlogFile, currentBinlogPosition);
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
}
// Commit the NiFi session
session.commit();
inTransaction = false;
currentTable = null;
currentDatabase = null;
break;
case WRITE_ROWS:

View File

@ -27,6 +27,7 @@ import com.github.shyiko.mysql.binlog.event.RotateEventData
import com.github.shyiko.mysql.binlog.event.TableMapEventData
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData
import groovy.json.JsonSlurper
import org.apache.commons.io.output.WriterOutputStream
import org.apache.nifi.cdc.mysql.MockBinlogClient
import org.apache.nifi.cdc.mysql.event.BinlogEventInfo
@ -668,6 +669,76 @@ class CaptureChangeMySQLTest {
assertEquals(5, resultFiles.size())
}
@Test
void testFilterDatabase() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')
testRunner.setProperty(CaptureChangeMySQL.HOSTS, 'localhost:3306')
testRunner.setProperty(CaptureChangeMySQL.USERNAME, 'root')
testRunner.setProperty(CaptureChangeMySQL.PASSWORD, 'password')
testRunner.setProperty(CaptureChangeMySQL.CONNECT_TIMEOUT, '2 seconds')
testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, "myDB")
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, 'true')
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_DDL_EVENTS, 'true')
testRunner.run(1, false, true)
// ROTATE
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.ROTATE, nextPosition: 2] as EventHeaderV4,
[binlogFilename: 'master.000001', binlogPosition: 4L] as RotateEventData
))
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'myDB', sql: 'BEGIN'] as QueryEventData
))
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
[database: 'myDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
{} as EventData
))
////////////////////////
// Test database filter
////////////////////////
// BEGIN
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 4] as EventHeaderV4,
[database: 'NotMyDB', sql: 'BEGIN'] as QueryEventData
))
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.QUERY, nextPosition: 32] as EventHeaderV4,
[database: 'NotMyDB', sql: 'ALTER TABLE myTable add column col1 int'] as QueryEventData
))
// COMMIT
client.sendEvent(new Event(
[timestamp: new Date().time, eventType: EventType.XID, nextPosition: 14] as EventHeaderV4,
{} as EventData
))
testRunner.run(1, true, false)
def resultFiles = testRunner.getFlowFilesForRelationship(CaptureChangeMySQL.REL_SUCCESS)
// First BEGIN + DDL + COMMIT only
assertEquals(3, resultFiles.size())
// Check that the database name is set on the objects
resultFiles.each {f ->
def json = new JsonSlurper().parseText(new String(f.toByteArray()))
assertEquals('myDB', json.database)
}
}
@Test
void testTransactionAcrossMultipleProcessorExecutions() throws Exception {
testRunner.setProperty(CaptureChangeMySQL.DRIVER_LOCATION, 'file:///path/to/mysql-connector-java-5.1.38-bin.jar')