mirror of https://github.com/apache/nifi.git
NIFI-11638 Refactored Groovy tests in nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors to Java (and JUnit 5)
NIFI-11638 Removed groovy-json dependency Signed-off-by: Matt Burgess <mattyb149@apache.org> This closes #7408
This commit is contained in:
parent
84fdb5e32f
commit
5db06437a3
|
@ -57,11 +57,5 @@ language governing permissions and limitations under the License. -->
|
||||||
<artifactId>nifi-distributed-cache-client-service</artifactId>
|
<artifactId>nifi-distributed-cache-client-service</artifactId>
|
||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
|
||||||
<groupId>org.codehaus.groovy</groupId>
|
|
||||||
<artifactId>groovy-json</artifactId>
|
|
||||||
<version>${nifi.groovy.version}</version>
|
|
||||||
<scope>test</scope>
|
|
||||||
</dependency>
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -1,105 +0,0 @@
|
||||||
/*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
|
||||||
* contributor license agreements. See the NOTICE file distributed with
|
|
||||||
* this work for additional information regarding copyright ownership.
|
|
||||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
|
||||||
* (the "License"); you may not use this file except in compliance with
|
|
||||||
* the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.nifi.cdc.mysql
|
|
||||||
|
|
||||||
import com.github.shyiko.mysql.binlog.BinaryLogClient
|
|
||||||
import com.github.shyiko.mysql.binlog.event.Event
|
|
||||||
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory
|
|
||||||
|
|
||||||
import java.util.concurrent.TimeoutException
|
|
||||||
|
|
||||||
/**
|
|
||||||
* A mock implementation for BinaryLogClient, in order to unit test the connection and event handling logic
|
|
||||||
*/
|
|
||||||
class MockBinlogClient extends BinaryLogClient {
|
|
||||||
|
|
||||||
String hostname
|
|
||||||
int port
|
|
||||||
String username
|
|
||||||
String password
|
|
||||||
|
|
||||||
boolean connected
|
|
||||||
boolean connectionTimeout = false
|
|
||||||
boolean connectionError = false
|
|
||||||
|
|
||||||
List<BinaryLogClient.EventListener> eventListeners = []
|
|
||||||
List<BinaryLogClient.LifecycleListener> lifecycleListeners = []
|
|
||||||
|
|
||||||
SSLSocketFactory sslSocketFactory
|
|
||||||
|
|
||||||
MockBinlogClient(String hostname, int port, String username, String password) {
|
|
||||||
super(hostname, port, username, password)
|
|
||||||
this.hostname = hostname
|
|
||||||
this.port = port
|
|
||||||
this.username = username
|
|
||||||
this.password = password
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void connect(long timeoutInMilliseconds) throws IOException, TimeoutException {
|
|
||||||
if (connectionTimeout) {
|
|
||||||
throw new TimeoutException('Connection timed out')
|
|
||||||
}
|
|
||||||
if (connectionError) {
|
|
||||||
throw new IOException('Error during connect')
|
|
||||||
}
|
|
||||||
if (password == null) {
|
|
||||||
throw new NullPointerException('''Password can't be null''')
|
|
||||||
}
|
|
||||||
connected = true
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void disconnect() throws IOException {
|
|
||||||
connected = false
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void registerEventListener(BinaryLogClient.EventListener eventListener) {
|
|
||||||
if (!eventListeners.contains(eventListener)) {
|
|
||||||
eventListeners.add eventListener
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void unregisterEventListener(BinaryLogClient.EventListener eventListener) {
|
|
||||||
eventListeners.remove eventListener
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void registerLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
|
|
||||||
if (!lifecycleListeners.contains(lifecycleListener)) {
|
|
||||||
lifecycleListeners.add lifecycleListener
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void unregisterLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
|
|
||||||
lifecycleListeners.remove lifecycleListener
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
|
|
||||||
super.setSslSocketFactory(sslSocketFactory)
|
|
||||||
this.sslSocketFactory = sslSocketFactory
|
|
||||||
}
|
|
||||||
|
|
||||||
def sendEvent(Event event) {
|
|
||||||
eventListeners.each { it.onEvent(event) }
|
|
||||||
}
|
|
||||||
}
|
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,123 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.cdc.mysql;
|
||||||
|
|
||||||
|
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.Event;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.EventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.EventHeader;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.EventType;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.GtidEventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.QueryEventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.RotateEventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.BitSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class EventUtils {
|
||||||
|
private EventUtils() {}
|
||||||
|
public static Event buildEvent(EventHeader header) {
|
||||||
|
return buildEvent(header, new EventData() {});
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Event buildEvent(EventHeader header, EventData data) {
|
||||||
|
return new Event(header, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static EventHeaderV4 buildEventHeaderV4(EventType eventType, long nextPosition) {
|
||||||
|
EventHeaderV4 eventHeaderV4 = new EventHeaderV4();
|
||||||
|
eventHeaderV4.setTimestamp(System.currentTimeMillis());
|
||||||
|
eventHeaderV4.setEventType(eventType);
|
||||||
|
eventHeaderV4.setNextPosition(nextPosition);
|
||||||
|
|
||||||
|
return eventHeaderV4;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static RotateEventData buildRotateEventData(String binlogFilename, long binlogPosition) {
|
||||||
|
RotateEventData rotateEventData = new RotateEventData();
|
||||||
|
rotateEventData.setBinlogFilename(binlogFilename);
|
||||||
|
rotateEventData.setBinlogPosition(binlogPosition);
|
||||||
|
|
||||||
|
return rotateEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static QueryEventData buildQueryEventData(String database, String sql) {
|
||||||
|
QueryEventData queryEventData = new QueryEventData();
|
||||||
|
queryEventData.setDatabase(database);
|
||||||
|
queryEventData.setSql(sql);
|
||||||
|
|
||||||
|
return queryEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static TableMapEventData buildTableMapEventData(long tableId, String database,
|
||||||
|
String table, byte [] columnTypes) {
|
||||||
|
TableMapEventData tableMapEventData = new TableMapEventData();
|
||||||
|
tableMapEventData.setTableId(tableId);
|
||||||
|
tableMapEventData.setDatabase(database);
|
||||||
|
tableMapEventData.setTable(table);
|
||||||
|
tableMapEventData.setColumnTypes(columnTypes);
|
||||||
|
|
||||||
|
return tableMapEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static WriteRowsEventData buildWriteRowsEventData(long tableId, BitSet includedColumns, List<Serializable[]> rows) {
|
||||||
|
WriteRowsEventData writeRowsEventData = new WriteRowsEventData();
|
||||||
|
writeRowsEventData.setTableId(tableId);
|
||||||
|
writeRowsEventData.setIncludedColumns(includedColumns);
|
||||||
|
writeRowsEventData.setRows(rows);
|
||||||
|
|
||||||
|
return writeRowsEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static UpdateRowsEventData buildUpdateRowsEventData(long tableId, BitSet includedColumnsBeforeUpdate,
|
||||||
|
BitSet includedColumns, List<Map.Entry<Serializable[], Serializable[]>> rows) {
|
||||||
|
UpdateRowsEventData updateRowsEventData = new UpdateRowsEventData();
|
||||||
|
updateRowsEventData.setTableId(tableId);
|
||||||
|
updateRowsEventData.setIncludedColumnsBeforeUpdate(includedColumnsBeforeUpdate);
|
||||||
|
updateRowsEventData.setIncludedColumns(includedColumns);
|
||||||
|
updateRowsEventData.setRows(rows);
|
||||||
|
|
||||||
|
return updateRowsEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static DeleteRowsEventData buildDeleteRowsEventData(long tableId, BitSet includedColumns, List<Serializable[]> rows) {
|
||||||
|
DeleteRowsEventData deleteRowsEventData = new DeleteRowsEventData();
|
||||||
|
deleteRowsEventData.setTableId(tableId);
|
||||||
|
deleteRowsEventData.setIncludedColumns(includedColumns);
|
||||||
|
deleteRowsEventData.setRows(rows);
|
||||||
|
|
||||||
|
return deleteRowsEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("deprecation")
|
||||||
|
public static GtidEventData buildGtidEventData(String sourceId, String transactionId) {
|
||||||
|
GtidEventData gtidEventData = new GtidEventData();
|
||||||
|
gtidEventData.setGtid(buildGtid(sourceId, transactionId));
|
||||||
|
|
||||||
|
return gtidEventData;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static String buildGtid(String sourceId, String...singleOrTransactionRanges) {
|
||||||
|
return sourceId + ":" + String.join(":", singleOrTransactionRanges);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,127 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.nifi.cdc.mysql;
|
||||||
|
|
||||||
|
import com.github.shyiko.mysql.binlog.BinaryLogClient;
|
||||||
|
import com.github.shyiko.mysql.binlog.event.Event;
|
||||||
|
import com.github.shyiko.mysql.binlog.network.SSLSocketFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A mock implementation for BinaryLogClient, in order to unit test the connection and event handling logic
|
||||||
|
*/
|
||||||
|
public class MockBinlogClient extends BinaryLogClient {
|
||||||
|
private final String password;
|
||||||
|
private boolean connected;
|
||||||
|
private boolean connectionTimeout = false;
|
||||||
|
private boolean connectionError = false;
|
||||||
|
|
||||||
|
private final List<EventListener> eventListeners = new ArrayList<>();
|
||||||
|
private final List<BinaryLogClient.LifecycleListener> lifecycleListeners = new ArrayList<>();
|
||||||
|
|
||||||
|
SSLSocketFactory sslSocketFactory;
|
||||||
|
|
||||||
|
public MockBinlogClient(String hostname, int port, String username, String password) {
|
||||||
|
super(hostname, port, username, password);
|
||||||
|
this.password = password;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void connect(long timeoutInMilliseconds) throws IOException, TimeoutException {
|
||||||
|
if (connectionTimeout) {
|
||||||
|
throw new TimeoutException("Connection timed out");
|
||||||
|
}
|
||||||
|
if (connectionError) {
|
||||||
|
throw new IOException("Error during connect");
|
||||||
|
}
|
||||||
|
if (password == null) {
|
||||||
|
throw new NullPointerException("Password can't be null");
|
||||||
|
}
|
||||||
|
connected = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void disconnect() {
|
||||||
|
connected = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerEventListener(BinaryLogClient.EventListener eventListener) {
|
||||||
|
if (!eventListeners.contains(eventListener)) {
|
||||||
|
eventListeners.add(eventListener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unregisterEventListener(BinaryLogClient.EventListener eventListener) {
|
||||||
|
eventListeners.remove(eventListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void registerLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
|
||||||
|
if (!lifecycleListeners.contains(lifecycleListener)) {
|
||||||
|
lifecycleListeners.add(lifecycleListener);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void unregisterLifecycleListener(BinaryLogClient.LifecycleListener lifecycleListener) {
|
||||||
|
lifecycleListeners.remove(lifecycleListener);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setSslSocketFactory(SSLSocketFactory sslSocketFactory) {
|
||||||
|
super.setSslSocketFactory(sslSocketFactory);
|
||||||
|
this.sslSocketFactory = sslSocketFactory;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void sendEvent(Event event) {
|
||||||
|
eventListeners.forEach(eventListener -> eventListener.onEvent(event));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isConnected() {
|
||||||
|
return connected;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConnectionTimeout(boolean connectionTimeout) {
|
||||||
|
this.connectionTimeout = connectionTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setConnectionError(boolean connectionError) {
|
||||||
|
this.connectionError = connectionError;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<EventListener> getEventListeners() {
|
||||||
|
return eventListeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<LifecycleListener> getLifecycleListeners() {
|
||||||
|
return lifecycleListeners;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SSLSocketFactory getSslSocketFactory() {
|
||||||
|
return sslSocketFactory;
|
||||||
|
}
|
||||||
|
}
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue