NIFI-12577 Updated nifi-cdc-bundle using current API methods

This closes #8214

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
EndzeitBegins 2024-01-07 20:07:16 +01:00 committed by exceptionfactory
parent 62d444ffc8
commit 77cdba1efa
No known key found for this signature in database
4 changed files with 33 additions and 52 deletions

View File

@ -26,15 +26,11 @@ import java.io.Serializable;
*/
public class InsertRowsEventInfo extends BaseBinlogRowEventInfo<Serializable[]> {
private WriteRowsEventData data;
public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogFilename, Long binlogPosition, WriteRowsEventData data) {
super(tableInfo, INSERT_EVENT, timestamp, binlogFilename, binlogPosition, data.getIncludedColumns(), data.getRows());
this.data = data;
}
public InsertRowsEventInfo(TableInfo tableInfo, Long timestamp, String binlogGtidSet, WriteRowsEventData data) {
super(tableInfo, INSERT_EVENT, timestamp, binlogGtidSet, data.getIncludedColumns(), data.getRows());
this.data = data;
}
}

View File

@ -40,9 +40,7 @@ import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -177,7 +175,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.description("Successfully created FlowFile from SQL query result set.")
.build();
protected static Set<Relationship> relationships;
protected static Set<Relationship> relationships = Set.of(REL_SUCCESS);
private static final AllowableValue SSL_MODE_DISABLED = new AllowableValue(SSLMode.DISABLED.toString(),
SSLMode.DISABLED.toString(),
@ -289,7 +287,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.sensitive(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.allowableValues(FlowFileEventWriteStrategy.class)
.defaultValue(MAX_EVENTS_PER_FLOWFILE.getValue())
.defaultValue(MAX_EVENTS_PER_FLOWFILE)
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
@ -303,7 +301,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.defaultValue("1")
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.dependsOn(EVENTS_PER_FLOWFILE_STRATEGY, MAX_EVENTS_PER_FLOWFILE.getValue())
.dependsOn(EVENTS_PER_FLOWFILE_STRATEGY, MAX_EVENTS_PER_FLOWFILE)
.build();
public static final PropertyDescriptor SERVER_ID = new PropertyDescriptor.Builder()
@ -443,7 +441,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.displayName("SSL Mode")
.description("SSL Mode used when SSL Context Service configured supporting certificate verification options")
.required(true)
.defaultValue(SSLMode.DISABLED.toString())
.defaultValue(SSL_MODE_DISABLED)
.allowableValues(SSL_MODE_DISABLED,
SSL_MODE_PREFERRED,
SSL_MODE_REQUIRED,
@ -462,7 +460,31 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
SSL_MODE_VERIFY_IDENTITY)
.build();
private static final List<PropertyDescriptor> propDescriptors;
private static final List<PropertyDescriptor> propDescriptors = List.of(
HOSTS,
DRIVER_NAME,
DRIVER_LOCATION,
USERNAME,
PASSWORD,
EVENTS_PER_FLOWFILE_STRATEGY,
NUMBER_OF_EVENTS_PER_FLOWFILE,
SERVER_ID,
DATABASE_NAME_PATTERN,
TABLE_NAME_PATTERN,
CONNECT_TIMEOUT,
DIST_CACHE_CLIENT,
RETRIEVE_ALL_RECORDS,
INCLUDE_BEGIN_COMMIT,
INCLUDE_DDL_EVENTS,
STATE_UPDATE_INTERVAL,
INIT_SEQUENCE_ID,
INIT_BINLOG_FILENAME,
INIT_BINLOG_POSITION,
USE_BINLOG_GTID,
INIT_BINLOG_GTID,
SSL_MODE,
SSL_CONTEXT_SERVICE
);
private volatile BinaryLogClient binlogClient;
private volatile BinlogEventListener eventListener;
@ -496,41 +518,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
private volatile EventWriterConfiguration eventWriterConfiguration;
static {
final Set<Relationship> r = new HashSet<>();
r.add(REL_SUCCESS);
relationships = Collections.unmodifiableSet(r);
final List<PropertyDescriptor> pds = new ArrayList<>();
pds.add(HOSTS);
pds.add(DRIVER_NAME);
pds.add(DRIVER_LOCATION);
pds.add(USERNAME);
pds.add(PASSWORD);
pds.add(EVENTS_PER_FLOWFILE_STRATEGY);
pds.add(NUMBER_OF_EVENTS_PER_FLOWFILE);
pds.add(SERVER_ID);
pds.add(DATABASE_NAME_PATTERN);
pds.add(TABLE_NAME_PATTERN);
pds.add(CONNECT_TIMEOUT);
pds.add(DIST_CACHE_CLIENT);
pds.add(RETRIEVE_ALL_RECORDS);
pds.add(INCLUDE_BEGIN_COMMIT);
pds.add(INCLUDE_DDL_EVENTS);
pds.add(STATE_UPDATE_INTERVAL);
pds.add(INIT_SEQUENCE_ID);
pds.add(INIT_BINLOG_FILENAME);
pds.add(INIT_BINLOG_POSITION);
pds.add(USE_BINLOG_GTID);
pds.add(INIT_BINLOG_GTID);
pds.add(SSL_MODE);
pds.add(SSL_CONTEXT_SERVICE);
propDescriptors = Collections.unmodifiableList(pds);
}
@Override
public Set<Relationship> getRelationships() {
return relationships;
@ -593,7 +580,7 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
binlogResourceInfo.setInTransaction("true".equals(stateMap.get("inTransaction")));
// Build a event writer config object for the event writers to use
final FlowFileEventWriteStrategy flowFileEventWriteStrategy = FlowFileEventWriteStrategy.valueOf(context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).getValue());
final FlowFileEventWriteStrategy flowFileEventWriteStrategy = context.getProperty(EVENTS_PER_FLOWFILE_STRATEGY).asDescribedValue(FlowFileEventWriteStrategy.class);
eventWriterConfiguration = new EventWriterConfiguration(
flowFileEventWriteStrategy,
context.getProperty(NUMBER_OF_EVENTS_PER_FLOWFILE).evaluateAttributeExpressions().asInteger()

View File

@ -20,11 +20,9 @@ import com.github.shyiko.mysql.binlog.network.SSLMode;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.security.util.TlsPlatform;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
/**
* Standard implementation of Connection Properties Provider
@ -72,7 +70,7 @@ public class StandardConnectionPropertiesProvider implements ConnectionPropertie
if (tlsConfiguration == null) {
// Set preferred protocols based on Java platform configuration
final String protocols = TlsPlatform.getPreferredProtocols().stream().collect(Collectors.joining(COMMA_SEPARATOR));
final String protocols = String.join(COMMA_SEPARATOR, TlsPlatform.getPreferredProtocols());
properties.put(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty(), protocols);
} else {
final Map<String, String> certificateProperties = getCertificateProperties();
@ -86,7 +84,7 @@ public class StandardConnectionPropertiesProvider implements ConnectionPropertie
private Map<String, String> getCertificateProperties() {
final Map<String, String> properties = new LinkedHashMap<>();
final String protocols = Arrays.stream(tlsConfiguration.getEnabledProtocols()).collect(Collectors.joining(COMMA_SEPARATOR));
final String protocols = String.join(COMMA_SEPARATOR, tlsConfiguration.getEnabledProtocols());
properties.put(SecurityProperty.ENABLED_TLS_PROTOCOLS.getProperty(), protocols);
if (tlsConfiguration.isKeystorePopulated()) {

View File

@ -795,7 +795,7 @@ public class CaptureChangeMySQLTest {
testRunner.setProperty(CaptureChangeMySQL.DATABASE_NAME_PATTERN, MY_DB);
testRunner.setProperty(CaptureChangeMySQL.TABLE_NAME_PATTERN, USER_TABLE);
testRunner.setProperty(CaptureChangeMySQL.INCLUDE_BEGIN_COMMIT, Boolean.TRUE.toString());
testRunner.setProperty(CaptureChangeMySQL.EVENTS_PER_FLOWFILE_STRATEGY, FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE.name());
testRunner.setProperty(CaptureChangeMySQL.EVENTS_PER_FLOWFILE_STRATEGY, FlowFileEventWriteStrategy.ONE_TRANSACTION_PER_FLOWFILE);
testRunner.run(1, false, true);