NIFI-13702 Remove deprecated property STATE_UPDATE_INTERVAL from CaptureChangeMySQL

This closes #9225

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
EndzeitBegins 2024-09-02 20:54:00 +02:00 committed by exceptionfactory
parent 1ad9d86a26
commit d1432d6be9
No known key found for this signature in database
2 changed files with 35 additions and 44 deletions

View File

@ -27,31 +27,6 @@ import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.network.SSLMode;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.PrimaryNodeOnly;
@ -104,6 +79,7 @@ import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.migration.PropertyConfiguration;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@ -115,6 +91,32 @@ import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.TlsConfiguration;
import org.apache.nifi.ssl.SSLContextService;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
import java.sql.DriverPropertyInfo;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.logging.Logger;
import java.util.regex.Pattern;
import static com.github.shyiko.mysql.binlog.event.EventType.DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_DELETE_ROWS;
import static com.github.shyiko.mysql.binlog.event.EventType.EXT_WRITE_ROWS;
@ -361,19 +363,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final PropertyDescriptor STATE_UPDATE_INTERVAL = new PropertyDescriptor.Builder()
.name("capture-change-mysql-state-update-interval")
.displayName("State Update Interval")
.description("DEPRECATED. This property is no longer used and exists solely for backward compatibility purposes. Indicates how often to update the processor's state with binlog "
+ "file/position values. A value of zero means that state will only be updated when the processor is "
+ "stopped or shutdown. If at some point the processor state does not contain the desired binlog values, the last flow file emitted will contain the last observed values, "
+ "and the processor can be returned to that state by using the Initial Binlog File, Initial Binlog Position, and Initial Sequence ID properties.")
.defaultValue("0 seconds")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
.build();
public static final PropertyDescriptor INIT_SEQUENCE_ID = new PropertyDescriptor.Builder()
.name("capture-change-mysql-init-seq-id")
.displayName("Initial Sequence ID")
@ -476,7 +465,6 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
RETRIEVE_ALL_RECORDS,
INCLUDE_BEGIN_COMMIT,
INCLUDE_DDL_EVENTS,
STATE_UPDATE_INTERVAL,
INIT_SEQUENCE_ID,
INIT_BINLOG_FILENAME,
INIT_BINLOG_POSITION,
@ -528,6 +516,11 @@ public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
return propDescriptors;
}
@Override
public void migrateProperties(PropertyConfiguration config) {
config.removeProperty("capture-change-mysql-state-update-interval");
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final Collection<ValidationResult> results = new ArrayList<>();

View File

@ -1014,15 +1014,14 @@ public class CaptureChangeMySQLTest {
testRunner.run(1, false, false);
// Ensure state not set, as the processor hasn't been stopped and no State Update Interval has been set
// Ensure state not set, as the processor hasn't been stopped
testRunner.getStateManager().assertStateEquals(BinlogEventInfo.BINLOG_FILENAME_KEY, INIT_BIN_LOG_FILENAME, Scope.CLUSTER);
testRunner.getStateManager().assertStateEquals(BinlogEventInfo.BINLOG_POSITION_KEY, FOUR, Scope.CLUSTER);
testRunner.getStateManager().assertStateEquals(BinlogEventInfo.BINLOG_GTIDSET_KEY, null, Scope.CLUSTER);
testRunner.getStateManager().clear(Scope.CLUSTER);
// Send some events, wait for the State Update Interval, and verify the state was set
testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, "1 second");
// Send some events and verify the state was set
testRunner.run(1, false, true);
// ROTATE
@ -1087,8 +1086,7 @@ public class CaptureChangeMySQLTest {
((CaptureChangeMySQL) testRunner.getProcessor()).clearState();
testRunner.getStateManager().clear(Scope.CLUSTER);
// Send some events, wait for the State Update Interval, and verify the state was set
testRunner.setProperty(CaptureChangeMySQL.STATE_UPDATE_INTERVAL, "1 second");
// Send some events and verify the state was set
testRunner.run(1, false, true);
// GTID