NIFI-10867 Refactored common configuration for Stateless Connectors

This closes #6744

Signed-off-by: David Handermann <exceptionfactory@apache.org>

commit c8d3a69d64 (parent a12c9ca9c7)
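At a glance, this commit replaces ad-hoc lookups in raw property maps with typed AbstractConfig subclasses. A minimal sketch of the new usage (the flow path and names below are illustrative, not from the commit):

    final Map<String, String> props = new HashMap<>();
    props.put(StatelessNiFiCommonConfig.FLOW_SNAPSHOT, "flows/my-flow.json"); // illustrative path
    props.put(StatelessNiFiCommonConfig.DATAFLOW_NAME, "my-dataflow");        // required: NO_DEFAULT_VALUE
    props.put(StatelessNiFiSinkConfig.INPUT_PORT_NAME, "In");

    final StatelessNiFiSinkConfig config = new StatelessNiFiSinkConfig(props);
    final String inputPort = config.getInputPortName(); // "In"
    final String timeout = config.getDataflowTimeout(); // "60 sec" default applied by the ConfigDef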
StatelessNiFiSinkTaskIT.java
@@ -109,7 +109,7 @@ public class StatelessNiFiSinkTaskIT {
         sinkTask.initialize(Mockito.mock(SinkTaskContext.class));
 
         final Map<String, String> properties = createDefaultProperties(testInfo);
-        properties.put(StatelessNiFiSinkConnector.FAILURE_PORTS, "Success, Failure");
+        properties.put(StatelessNiFiSinkConfig.FAILURE_PORTS, "Success, Failure");
         sinkTask.start(properties);
 
         final SinkRecord record = new SinkRecord("topic", 0, null, "key", null, "Hello World", 0L);
@@ -129,12 +129,12 @@ public class StatelessNiFiSinkTaskIT {
 
     private Map<String, String> createDefaultProperties(TestInfo testInfo) {
         final Map<String, String> properties = new HashMap<>();
-        properties.put(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, "30 sec");
-        properties.put(StatelessNiFiSinkConnector.INPUT_PORT_NAME, "In");
-        properties.put(StatelessKafkaConnectorUtil.FLOW_SNAPSHOT, "src/test/resources/flows/Write_To_File.json");
-        properties.put(StatelessKafkaConnectorUtil.NAR_DIRECTORY, "target/nifi-kafka-connector-bin/nars");
-        properties.put(StatelessKafkaConnectorUtil.WORKING_DIRECTORY, "target/nifi-kafka-connector-bin/working");
-        properties.put(StatelessKafkaConnectorUtil.DATAFLOW_NAME, testInfo.getTestMethod().get().getName());
+        properties.put(StatelessNiFiCommonConfig.DATAFLOW_TIMEOUT, "30 sec");
+        properties.put(StatelessNiFiSinkConfig.INPUT_PORT_NAME, "In");
+        properties.put(StatelessNiFiCommonConfig.FLOW_SNAPSHOT, "src/test/resources/flows/Write_To_File.json");
+        properties.put(StatelessNiFiCommonConfig.NAR_DIRECTORY, "target/nifi-kafka-connector-bin/nars");
+        properties.put(StatelessNiFiCommonConfig.WORKING_DIRECTORY, "target/nifi-kafka-connector-bin/working");
+        properties.put(StatelessNiFiCommonConfig.DATAFLOW_NAME, testInfo.getTestMethod().get().getName());
         return properties;
     }
 }
StatelessNiFiSourceTaskIT.java
@@ -68,7 +68,7 @@ public class StatelessNiFiSourceTaskIT {
         sourceTask.initialize(createContext());
 
         final Map<String, String> properties = createDefaultProperties(testInfo);
-        properties.put(StatelessNiFiSourceConnector.KEY_ATTRIBUTE, "greeting");
+        properties.put(StatelessNiFiSourceConfig.KEY_ATTRIBUTE, "greeting");
         sourceTask.start(properties);
 
         final List<SourceRecord> sourceRecords = sourceTask.poll();
@@ -88,7 +88,7 @@ public class StatelessNiFiSourceTaskIT {
         sourceTask.initialize(createContext());
 
         final Map<String, String> properties = createDefaultProperties(testInfo);
-        properties.put(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE, "greeting");
+        properties.put(StatelessNiFiSourceConfig.TOPIC_NAME_ATTRIBUTE, "greeting");
         sourceTask.start(properties);
 
         final List<SourceRecord> sourceRecords = sourceTask.poll();
@@ -106,7 +106,7 @@ public class StatelessNiFiSourceTaskIT {
         sourceTask.initialize(createContext());
 
         final Map<String, String> properties = createDefaultProperties(testInfo);
-        properties.put(StatelessNiFiSourceConnector.HEADER_REGEX, "uuid|greeting|num.*");
+        properties.put(StatelessNiFiSourceConfig.HEADER_REGEX, "uuid|greeting|num.*");
         sourceTask.start(properties);
 
         final List<SourceRecord> sourceRecords = sourceTask.poll();
@@ -134,7 +134,7 @@ public class StatelessNiFiSourceTaskIT {
         sourceTask.initialize(createContext());
 
         final Map<String, String> properties = createDefaultProperties(testInfo);
-        properties.put(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME, "Another");
+        properties.put(StatelessNiFiSourceConfig.OUTPUT_PORT_NAME, "Another");
         sourceTask.start(properties);
 
         assertThrows(RetriableException.class, () -> sourceTask.poll(), "Expected RetriableException to be thrown");
@@ -145,7 +145,7 @@ public class StatelessNiFiSourceTaskIT {
         final OffsetStorageReader offsetStorageReader = new OffsetStorageReader() {
             @Override
             public <T> Map<String, Object> offset(final Map<String, T> partition) {
-                if ("CLUSTER".equals(partition.get(StatelessNiFiSourceTask.STATE_MAP_KEY))) {
+                if ("CLUSTER".equals(partition.get(StatelessNiFiSourceConfig.STATE_MAP_KEY))) {
                     final String serializedStateMap = "{\"version\":4,\"stateValues\":{\"abc\":\"123\"}}";
                     return Collections.singletonMap("c6562d38-4994-3fcc-ac98-1da34de1916f", serializedStateMap);
                 }
@@ -163,7 +163,7 @@ public class StatelessNiFiSourceTaskIT {
         sourceTask.initialize(createContext(offsetStorageReader));
 
         final Map<String, String> properties = createDefaultProperties(testInfo);
-        properties.put(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME, "Another");
+        properties.put(StatelessNiFiSourceConfig.OUTPUT_PORT_NAME, "Another");
         sourceTask.start(properties);
 
         final StatelessDataflow dataflow = sourceTask.getDataflow();
@@ -239,15 +239,15 @@ public class StatelessNiFiSourceTaskIT {
 
     private Map<String, String> createDefaultProperties(TestInfo testInfo) {
         final Map<String, String> properties = new HashMap<>();
-        properties.put(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, "30 sec");
-        properties.put(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME, "Out");
-        properties.put(StatelessNiFiSourceConnector.TOPIC_NAME, "my-topic");
-        properties.put(StatelessNiFiSourceConnector.KEY_ATTRIBUTE, "kafka.key");
-        properties.put(StatelessKafkaConnectorUtil.FLOW_SNAPSHOT, "src/test/resources/flows/Generate_Data.json");
-        properties.put(StatelessKafkaConnectorUtil.NAR_DIRECTORY, "target/nifi-kafka-connector-bin/nars");
-        properties.put(StatelessKafkaConnectorUtil.WORKING_DIRECTORY, "target/nifi-kafka-connector-bin/working");
-        properties.put(StatelessKafkaConnectorUtil.DATAFLOW_NAME, testInfo.getTestMethod().get().getName());
-        properties.put(StatelessNiFiSourceTask.STATE_MAP_KEY, "1");
+        properties.put(StatelessNiFiCommonConfig.DATAFLOW_TIMEOUT, "30 sec");
+        properties.put(StatelessNiFiSourceConfig.OUTPUT_PORT_NAME, "Out");
+        properties.put(StatelessNiFiSourceConfig.TOPIC_NAME, "my-topic");
+        properties.put(StatelessNiFiSourceConfig.KEY_ATTRIBUTE, "kafka.key");
+        properties.put(StatelessNiFiCommonConfig.FLOW_SNAPSHOT, "src/test/resources/flows/Generate_Data.json");
+        properties.put(StatelessNiFiCommonConfig.NAR_DIRECTORY, "target/nifi-kafka-connector-bin/nars");
+        properties.put(StatelessNiFiCommonConfig.WORKING_DIRECTORY, "target/nifi-kafka-connector-bin/working");
+        properties.put(StatelessNiFiCommonConfig.DATAFLOW_NAME, testInfo.getTestMethod().get().getName());
+        properties.put(StatelessNiFiSourceConfig.STATE_MAP_KEY, "1");
 
         return properties;
     }
StatelessKafkaConnectorUtil.java
@@ -17,10 +17,6 @@
 
 package org.apache.nifi.kafka.connect;
 
-import org.apache.kafka.common.config.ConfigDef;
-import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
-import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
-import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
 import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
 import org.apache.nifi.stateless.config.ExtensionClientDefinition;
 import org.apache.nifi.stateless.config.ParameterOverride;
@@ -48,86 +44,13 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
-import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
-
 public class StatelessKafkaConnectorUtil {
     private static final String UNKNOWN_VERSION = "<Unable to determine Stateless NiFi Kafka Connector Version>";
     private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
     private static final Lock unpackNarLock = new ReentrantLock();
 
-    static final String NAR_DIRECTORY = "nar.directory";
-    static final String EXTENSIONS_DIRECTORY = "extensions.directory";
-    static final String WORKING_DIRECTORY = "working.directory";
-    static final String FLOW_SNAPSHOT = "flow.snapshot";
-    static final String KRB5_FILE = "krb5.file";
-    static final String NEXUS_BASE_URL = "nexus.url";
-    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
-    static final String DATAFLOW_NAME = "name";
-
-    static final String TRUSTSTORE_FILE = "security.truststore";
-    static final String TRUSTSTORE_TYPE = "security.truststoreType";
-    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
-    static final String KEYSTORE_FILE = "security.keystore";
-    static final String KEYSTORE_TYPE = "security.keystoreType";
-    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
-    static final String KEY_PASSWORD = "security.keyPasswd";
-    static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
-
-    static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
-    static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
-    static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
-    static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
-
-    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
-    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
-    static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
-    static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
-    static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
-
-    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
-    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
-    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
-
-    public static void addCommonConfigElements(final ConfigDef configDef) {
-        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
-            "Specifies the directory that stores the NiFi Archives (NARs)");
-        configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
-            "Specifies the directory that stores the extensions that will be downloaded (if any) from the configured Extension Client");
-        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
-            "Specifies the temporary working directory for expanding NiFi Archives (NARs)");
-        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
-            "Specifies the dataflow to run. This may be a file containing the dataflow, a URL that points to a dataflow, or a String containing the entire dataflow as an escaped JSON.");
-        configDef.define(DATAFLOW_NAME, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, "The name of the dataflow.");
-
-        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
-            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
-        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
-            "Specifies the Base URL of the Nexus instance to source extensions from");
-
-        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
-            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");
-
-        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.");
-        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-            "The type of the Keystore file. Either JKS or PKCS12.");
-        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
-            "The password for the keystore.");
-        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
-            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.");
-        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. If not specified, communications will occur only over " +
-                "http, not https.");
-        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-            "The type of the Truststore file. Either JKS or PKCS12.");
-        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
-            "The password for the truststore.");
-        configDef.define(SENSITIVE_PROPS_KEY, ConfigDef.Type.PASSWORD, DEFAULT_SENSITIVE_PROPS_KEY, ConfigDef.Importance.MEDIUM, "A key that components can use for encrypting and decrypting " +
-            "sensitive values.");
-    }
+    protected static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
 
     public static String getVersion() {
         final File bootstrapJar = detectBootstrapJar();
@@ -148,32 +71,18 @@ public class StatelessKafkaConnectorUtil {
         return UNKNOWN_VERSION;
     }
 
-    public static StatelessDataflow createDataflow(final Map<String, String> properties) {
-        final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
-        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);
+    public static StatelessDataflow createDataflow(final StatelessNiFiCommonConfig config) {
+        final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(config);
 
-        final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
-        final String dataflowName = properties.get(DATAFLOW_NAME);
+        final List<ParameterOverride> parameterOverrides = config.getParameterOverrides();
+        final String dataflowName = config.getDataflowName();
 
         final DataflowDefinition dataflowDefinition;
         final StatelessBootstrap bootstrap;
         try {
             final Map<String, String> dataflowDefinitionProperties = new HashMap<>();
 
-            if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
-                logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_URL);
-                dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
-            } else if (configuredFlowSnapshot.trim().startsWith("{")) {
-                logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_CONTENTS);
-                dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_CONTENTS, configuredFlowSnapshot);
-            } else {
-                logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_FILE);
-                final File flowSnapshotFile = new File(configuredFlowSnapshot);
-                dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
-            }
-
-            dataflowDefinitionProperties.put(BOOTSTRAP_FLOW_NAME, dataflowName);
-
+            config.setFlowDefinition(dataflowDefinitionProperties);
+            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_FLOW_NAME, dataflowName);
             MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));
 
             // Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
@@ -194,71 +103,37 @@ public class StatelessKafkaConnectorUtil {
         }
     }
 
-    private static List<ParameterOverride> parseParameterOverrides(final Map<String, String> properties) {
-        final List<ParameterOverride> parameterOverrides = new ArrayList<>();
-
-        for (final Map.Entry<String, String> entry : properties.entrySet()) {
-            final String parameterValue = entry.getValue();
-
-            ParameterOverride parameterOverride = null;
-            final Matcher matcher = PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
-            if (matcher.matches()) {
-                final String contextName = matcher.group(1);
-                final String parameterName = matcher.group(2);
-                parameterOverride = new ParameterOverride(contextName, parameterName, parameterValue);
-            } else {
-                final Matcher noContextMatcher = PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
-                if (noContextMatcher.matches()) {
-                    final String parameterName = noContextMatcher.group(1);
-                    parameterOverride = new ParameterOverride(parameterName, parameterValue);
-                }
-            }
-
-            if (parameterOverride != null) {
-                parameterOverrides.add(parameterOverride);
-            }
-        }
-
-        return parameterOverrides;
-    }
-
-    public static Map<String, String> getLoggableProperties(final Map<String, String> properties) {
-        final Map<String, String> loggable = new HashMap<>(properties);
-        loggable.keySet().removeIf(key -> key.startsWith("parameter."));
-        return loggable;
-    }
-
-    private static StatelessEngineConfiguration createEngineConfiguration(final Map<String, String> properties) {
+    private static StatelessEngineConfiguration createEngineConfiguration(final StatelessNiFiCommonConfig config) {
         final File narDirectory;
-        final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
+        final String narDirectoryFilename = config.getNarDirectory();
         if (narDirectoryFilename == null) {
             narDirectory = detectNarDirectory();
         } else {
             narDirectory = new File(narDirectoryFilename);
         }
 
-        final String dataflowName = properties.get(DATAFLOW_NAME);
+        final String dataflowName = config.getDataflowName();
 
         final File baseWorkingDirectory;
-        final String workingDirectoryFilename = properties.get(WORKING_DIRECTORY);
+        final String workingDirectoryFilename = config.getWorkingDirectory();
         if (workingDirectoryFilename == null) {
-            baseWorkingDirectory = DEFAULT_WORKING_DIRECTORY;
+            baseWorkingDirectory = StatelessNiFiCommonConfig.DEFAULT_WORKING_DIRECTORY;
         } else {
             baseWorkingDirectory = new File(workingDirectoryFilename);
         }
         final File workingDirectory = new File(baseWorkingDirectory, dataflowName);
 
         final File extensionsDirectory;
-        final String extensionsDirectoryFilename = properties.get(EXTENSIONS_DIRECTORY);
+        final String extensionsDirectoryFilename = config.getExtensionsDirectory();
         if (extensionsDirectoryFilename == null) {
-            extensionsDirectory = DEFAULT_EXTENSIONS_DIRECTORY;
+            extensionsDirectory = StatelessNiFiCommonConfig.DEFAULT_EXTENSIONS_DIRECTORY;
        } else {
             extensionsDirectory = new File(extensionsDirectoryFilename);
         }
 
-        final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties);
+        final SslContextDefinition sslContextDefinition = createSslContextDefinition(config);
 
-        final StatelessEngineConfiguration engineConfiguration = new StatelessEngineConfiguration() {
+        return new StatelessEngineConfiguration() {
             @Override
             public File getWorkingDirectory() {
                 return workingDirectory;
@@ -281,7 +156,7 @@ public class StatelessKafkaConnectorUtil {
 
             @Override
             public File getKrb5File() {
-                return new File(properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE));
+                return new File(config.getKrb5File());
             }
 
             @Override
@@ -296,14 +171,14 @@ public class StatelessKafkaConnectorUtil {
 
             @Override
             public String getSensitivePropsKey() {
-                return properties.getOrDefault(SENSITIVE_PROPS_KEY, DEFAULT_SENSITIVE_PROPS_KEY);
+                return config.getSensitivePropsKey();
             }
 
             @Override
             public List<ExtensionClientDefinition> getExtensionClients() {
                 final List<ExtensionClientDefinition> extensionClientDefinitions = new ArrayList<>();
 
-                final String nexusBaseUrl = properties.get(NEXUS_BASE_URL);
+                final String nexusBaseUrl = config.getNexusBaseUrl();
                 if (nexusBaseUrl != null) {
                     final ExtensionClientDefinition definition = new ExtensionClientDefinition();
                     definition.setUseSslContext(false);
@@ -321,12 +196,10 @@ public class StatelessKafkaConnectorUtil {
                 return "1 min";
             }
         };
-
-        return engineConfiguration;
     }
 
-    private static SslContextDefinition createSslContextDefinition(final Map<String, String> properties) {
-        final String truststoreFile = properties.get(TRUSTSTORE_FILE);
+    private static SslContextDefinition createSslContextDefinition(final StatelessNiFiCommonConfig config) {
+        final String truststoreFile = config.getTruststoreFile();
         if (truststoreFile == null || truststoreFile.trim().isEmpty()) {
             return null;
         }
@@ -334,18 +207,18 @@ public class StatelessKafkaConnectorUtil {
         final SslContextDefinition sslContextDefinition;
         sslContextDefinition = new SslContextDefinition();
         sslContextDefinition.setTruststoreFile(truststoreFile);
-        sslContextDefinition.setTruststorePass(properties.get(TRUSTSTORE_PASSWORD));
-        sslContextDefinition.setTruststoreType(properties.get(TRUSTSTORE_TYPE));
+        sslContextDefinition.setTruststorePass(config.getTruststorePassword());
+        sslContextDefinition.setTruststoreType(config.getTruststoreType());
 
-        final String keystoreFile = properties.get(KEYSTORE_FILE);
+        final String keystoreFile = config.getKeystoreFile();
         if (keystoreFile != null && !keystoreFile.trim().isEmpty()) {
             sslContextDefinition.setKeystoreFile(keystoreFile);
-            sslContextDefinition.setKeystoreType(properties.get(KEYSTORE_TYPE));
+            sslContextDefinition.setKeystoreType(config.getKeystoreType());
 
-            final String keystorePass = properties.get(KEYSTORE_PASSWORD);
+            final String keystorePass = config.getKeystorePassword();
             sslContextDefinition.setKeystorePass(keystorePass);
 
-            final String explicitKeyPass = properties.get(KEY_PASSWORD);
+            final String explicitKeyPass = config.getKeystoreKeyPassword();
             final String keyPass = (explicitKeyPass == null || explicitKeyPass.trim().isEmpty()) ? keystorePass : explicitKeyPass;
             sslContextDefinition.setKeyPass(keyPass);
         }
@@ -356,7 +229,9 @@ public class StatelessKafkaConnectorUtil {
     private static URLClassLoader getConnectClassLoader() {
         final ClassLoader classLoader = StatelessKafkaConnectorUtil.class.getClassLoader();
         if (!(classLoader instanceof URLClassLoader)) {
-            throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
+            throw new IllegalStateException("No configuration value was set for the " +
+                    StatelessNiFiCommonConfig.NAR_DIRECTORY +
+                    " configuration property, and was unable to determine the NAR directory automatically");
         }
 
         return (URLClassLoader) classLoader;
@@ -383,8 +258,11 @@ public class StatelessKafkaConnectorUtil {
         final File bootstrapJar = detectBootstrapJar();
         if (bootstrapJar == null) {
             final URLClassLoader urlClassLoader = getConnectClassLoader();
-            logger.error("ClassLoader that loaded Stateless Kafka Connector did not contain nifi-stateless-bootstrap. URLs that were present: {}", Arrays.asList(urlClassLoader.getURLs()));
-            throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
+            logger.error("ClassLoader that loaded Stateless Kafka Connector did not contain nifi-stateless-bootstrap." +
+                    " URLs that were present: {}", Arrays.asList(urlClassLoader.getURLs()));
+            throw new IllegalStateException("No configuration value was set for the " +
+                    StatelessNiFiCommonConfig.NAR_DIRECTORY +
+                    " configuration property, and was unable to determine the NAR directory automatically");
         }
 
         final File narDirectory = bootstrapJar.getParentFile();
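The flow-snapshot resolution removed from createDataflow() above reappears as StatelessNiFiCommonConfig.setFlowDefinition() in the new file below. A short sketch of the mapping, assuming config is any StatelessNiFiCommonConfig with flow.snapshot set (values illustrative):

    final Map<String, String> bootstrapProps = new HashMap<>();
    config.setFlowDefinition(bootstrapProps);
    // flow.snapshot = "https://example.com/flow.json" -> nifi.stateless.flow.snapshot.url
    // flow.snapshot = "{ ... inline JSON ... }"       -> nifi.stateless.flow.snapshot.contents
    // flow.snapshot = "flows/my-flow.json"            -> nifi.stateless.flow.snapshot.file (absolute path)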
StatelessNiFiCommonConfig.java (new file)
@@ -0,0 +1,269 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.kafka.connect;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;
import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;

public abstract class StatelessNiFiCommonConfig extends AbstractConfig {
    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiCommonConfig.class);
    public static final String NAR_DIRECTORY = "nar.directory";
    public static final String EXTENSIONS_DIRECTORY = "extensions.directory";
    public static final String WORKING_DIRECTORY = "working.directory";
    public static final String FLOW_SNAPSHOT = "flow.snapshot";
    public static final String KRB5_FILE = "krb5.file";
    public static final String NEXUS_BASE_URL = "nexus.url";
    public static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
    public static final String DATAFLOW_NAME = "name";
    public static final String TRUSTSTORE_FILE = "security.truststore";
    public static final String TRUSTSTORE_TYPE = "security.truststoreType";
    public static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
    public static final String KEYSTORE_FILE = "security.keystore";
    public static final String KEYSTORE_TYPE = "security.keystoreType";
    public static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
    public static final String KEY_PASSWORD = "security.keyPasswd";
    public static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";
    public static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
    public static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
    public static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
    public static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";
    public static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
    public static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
    public static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
    public static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
    public static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
    public static final String FLOW_GROUP = "Flow";
    public static final String DIRECTORIES_GROUP = "Directories";
    public static final String TLS_GROUP = "TLS";
    public static final String KERBEROS_GROUP = "Kerberos";
    public static final String NEXUS_GROUP = "Nexus";
    public static final String SECURITY_GROUP = "Security";
    public static final String RECORD_GROUP = "Record";

    protected static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
    protected static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");

    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, Map<String, ?> configProviderProps, boolean doLog) {
        super(definition, originals, configProviderProps, doLog);
    }

    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals) {
        super(definition, originals);
    }

    protected StatelessNiFiCommonConfig(ConfigDef definition, Map<?, ?> originals, boolean doLog) {
        super(definition, originals, doLog);
    }

    public String getNarDirectory() {
        return getString(NAR_DIRECTORY);
    }

    public String getExtensionsDirectory() {
        return getString(EXTENSIONS_DIRECTORY);
    }

    public String getWorkingDirectory() {
        return getString(WORKING_DIRECTORY);
    }

    public String getDataflowName() {
        return getString(DATAFLOW_NAME);
    }

    public String getKrb5File() {
        return getString(KRB5_FILE);
    }

    public String getNexusBaseUrl() {
        return getString(NEXUS_BASE_URL);
    }

    public String getDataflowTimeout() {
        return getString(DATAFLOW_TIMEOUT);
    }

    public String getKeystoreFile() {
        return getString(KEYSTORE_FILE);
    }

    public String getKeystoreType() {
        return getString(KEYSTORE_TYPE);
    }

    public String getKeystorePassword() {
        return getOptionalPassword(KEYSTORE_PASSWORD);
    }

    public String getKeystoreKeyPassword() {
        return getOptionalPassword(KEY_PASSWORD);
    }

    public String getTruststoreFile() {
        return getString(TRUSTSTORE_FILE);
    }

    public String getTruststoreType() {
        return getString(TRUSTSTORE_TYPE);
    }

    public String getTruststorePassword() {
        return getOptionalPassword(TRUSTSTORE_PASSWORD);
    }

    public String getSensitivePropsKey() {
        return getOptionalPassword(SENSITIVE_PROPS_KEY);
    }

    /**
     * Populates the properties with the data flow definition parameters
     *
     * @param dataflowDefinitionProperties The properties to populate.
     */
    public void setFlowDefinition(final Map<String, String> dataflowDefinitionProperties) {
        String configuredFlowSnapshot = getString(FLOW_SNAPSHOT);
        if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
            logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL);
            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
        } else if (configuredFlowSnapshot.trim().startsWith("{")) {
            logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS);
            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_CONTENTS, configuredFlowSnapshot);
        } else {
            logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configured Stateless NiFi", StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE);
            final File flowSnapshotFile = new File(configuredFlowSnapshot);
            dataflowDefinitionProperties.put(StatelessNiFiCommonConfig.BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
        }
    }

    /**
     * Collect Parameter Context values that override standard properties
     *
     * @return The parameter overrides of the flow.
     */
    public List<ParameterOverride> getParameterOverrides() {
        final List<ParameterOverride> parameterOverrides = new ArrayList<>();

        for (final Map.Entry<String, String> entry : originalsStrings().entrySet()) {
            final String parameterValue = entry.getValue();

            ParameterOverride parameterOverride = null;
            final Matcher matcher = StatelessNiFiCommonConfig.PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
            if (matcher.matches()) {
                final String contextName = matcher.group(1);
                final String parameterName = matcher.group(2);
                parameterOverride = new ParameterOverride(contextName, parameterName, parameterValue);
            } else {
                final Matcher noContextMatcher = StatelessNiFiCommonConfig.PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
                if (noContextMatcher.matches()) {
                    final String parameterName = noContextMatcher.group(1);
                    parameterOverride = new ParameterOverride(parameterName, parameterValue);
                }
            }

            if (parameterOverride != null) {
                parameterOverrides.add(parameterOverride);
            }
        }

        return parameterOverrides;
    }

    protected String getOptionalPassword(String key) {
        Password password = getPassword(key);
        return password == null ? null : password.value();
    }

    /**
     * Adds the flow definition related common configs to a config definition.
     *
     * @param configDef The config def to extend.
     */
    protected static void addFlowConfigElements(final ConfigDef configDef) {
        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
                "Specifies the dataflow to run. This may be a file containing the dataflow, a URL that points to a dataflow, or a String containing the entire dataflow as an escaped JSON.",
                FLOW_GROUP, 0, ConfigDef.Width.NONE, "Flow snapshot");
    }

    /**
     * Adds the directory, NAR, kerberos and TLS common configs to a config definition.
     *
     * @param configDef The config def to extend.
     */
    protected static void addCommonConfigElements(final ConfigDef configDef) {
        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
                "Specifies the directory that stores the NiFi Archives (NARs)", DIRECTORIES_GROUP, 0, ConfigDef.Width.NONE, "NAR directory");
        configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
                "Specifies the directory that stores the extensions that will be downloaded (if any) from the configured Extension Client",
                DIRECTORIES_GROUP, 1, ConfigDef.Width.NONE, "Extensions directory");
        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
                "Specifies the temporary working directory for expanding NiFi Archives (NARs)",
                DIRECTORIES_GROUP, 2, ConfigDef.Width.NONE, "Working directory");
        configDef.define(DATAFLOW_NAME, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, "The name of the dataflow.");

        configDef.define(
                KRB5_FILE, ConfigDef.Type.STRING, DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
                "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services",
                KERBEROS_GROUP, 0, ConfigDef.Width.NONE, "krb5.conf file");
        configDef.define(
                NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
                "Specifies the Base URL of the Nexus instance to source extensions from",
                NEXUS_GROUP, 0, ConfigDef.Width.NONE, "Nexus base URL");

        configDef.define(
                DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
                "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure",
                FLOW_GROUP, 1, ConfigDef.Width.NONE, "Dataflow processing timeout");

        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications." +
                        " If not specified, communications will occur only over http, not https.", TLS_GROUP, 0, ConfigDef.Width.NONE, "Truststore file");
        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "The type of the Truststore file. Either JKS or PKCS12.", TLS_GROUP, 1, ConfigDef.Width.NONE, "Truststore type");
        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
                "The password for the truststore.", TLS_GROUP, 2, ConfigDef.Width.NONE, "Truststore password");
        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.",
                TLS_GROUP, 3, ConfigDef.Width.NONE, "Keystore file");
        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "The type of the Keystore file. Either JKS or PKCS12.", TLS_GROUP, 4, ConfigDef.Width.NONE, "Keystore type");
        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
                "The password for the keystore.", TLS_GROUP, 5, ConfigDef.Width.NONE, "Keystore password");
        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
                "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.",
                TLS_GROUP, 6, ConfigDef.Width.NONE, "Keystore key password");

        configDef.define(SENSITIVE_PROPS_KEY, ConfigDef.Type.PASSWORD, DEFAULT_SENSITIVE_PROPS_KEY, ConfigDef.Importance.MEDIUM, "A key that components can use for encrypting and decrypting " +
                "sensitive values.", SECURITY_GROUP, 0, ConfigDef.Width.NONE, "Sensitive properties key");
    }
}
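Based on the two patterns above, getParameterOverrides() recognizes keys of the form parameter.<context>:<name> and parameter.<name>. A sketch with hypothetical parameter names:

    final Map<String, String> props = new HashMap<>();
    props.put("parameter.Database Context:Database Password", "secret"); // context-qualified key
    props.put("parameter.Batch Size", "100");                            // no context
    // getParameterOverrides() on a config built from props yields:
    //   new ParameterOverride("Database Context", "Database Password", "secret")
    //   new ParameterOverride("Batch Size", "100")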
StatelessNiFiSinkConfig.java (new file)
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.kafka.connect;

import org.apache.kafka.common.config.ConfigDef;

import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class StatelessNiFiSinkConfig extends StatelessNiFiCommonConfig {
    public static final String INPUT_PORT_NAME = "input.port";
    public static final String FAILURE_PORTS = "failure.ports";
    public static final String HEADERS_AS_ATTRIBUTES_REGEX = "headers.as.attributes.regex";
    public static final String HEADER_ATTRIBUTE_NAME_PREFIX = "attribute.prefix";
    protected static final ConfigDef CONFIG_DEF = createConfigDef();

    public StatelessNiFiSinkConfig(Map<?, ?> originals) {
        super(CONFIG_DEF, originals);
    }

    protected StatelessNiFiSinkConfig(ConfigDef definition, Map<?, ?> originals) {
        super(definition, originals);
    }

    /**
     * @return The input port name to use when feeding the flow. Can be null, which means the single available input port will be used.
     */
    public String getInputPortName() {
        return getString(INPUT_PORT_NAME);
    }

    /**
     * @return The output ports to handle as failure ports. Flow files sent to this port will cause the Connector to retry.
     */
    public Set<String> getFailurePorts() {
        final List<String> configuredPorts = getList(FAILURE_PORTS);
        if (configuredPorts == null) {
            return Collections.emptySet();
        }
        return new LinkedHashSet<>(configuredPorts);
    }

    public String getHeadersAsAttributesRegex() {
        return getString(HEADERS_AS_ATTRIBUTES_REGEX);
    }

    public String getHeaderAttributeNamePrefix() {
        return getString(HEADER_ATTRIBUTE_NAME_PREFIX);
    }

    protected static ConfigDef createConfigDef() {
        ConfigDef configDef = new ConfigDef();
        StatelessNiFiCommonConfig.addCommonConfigElements(configDef);
        addFlowConfigs(configDef);
        addSinkConfigs(configDef);
        return configDef;
    }

    /**
     * Add the flow definition related common configs to a config definition.
     *
     * @param configDef The config def to extend.
     */
    protected static void addFlowConfigs(ConfigDef configDef) {
        StatelessNiFiCommonConfig.addFlowConfigElements(configDef);
        configDef.define(INPUT_PORT_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
                "The name of the Input Port to push data to", StatelessNiFiCommonConfig.FLOW_GROUP, 100,
                ConfigDef.Width.NONE, "Input port name");
        configDef.define(
                StatelessNiFiSinkConfig.FAILURE_PORTS, ConfigDef.Type.LIST, null, ConfigDef.Importance.MEDIUM,
                "A list of Output Ports that are considered failures. If any FlowFile is routed to an Output Ports whose name is provided in this property," +
                        " the session is rolled back and is considered a failure", FLOW_GROUP, 200, ConfigDef.Width.NONE, "Failure ports");
    }

    /**
     * Add sink configs to a config definition.
     *
     * @param configDef The config def to extend.
     */
    protected static void addSinkConfigs(ConfigDef configDef) {
        configDef.define(
                HEADERS_AS_ATTRIBUTES_REGEX, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "A regular expression to evaluate against Kafka message header keys. Any message header whose key matches the regular expression" +
                        " will be added to the FlowFile as an attribute. The name of the attribute will match the header key (with an optional prefix, as " +
                        "defined by the attribute.prefix configuration) and the header value will be added as the attribute value.",
                RECORD_GROUP, 0, ConfigDef.Width.NONE, "Headers as Attributes regex");
        configDef.define(
                HEADER_ATTRIBUTE_NAME_PREFIX, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "A prefix to add to the key of each header that matches the headers.as.attributes.regex Regular Expression. For example," +
                        " if a header has the key MyHeader and a value of MyValue, and the headers.as.attributes.regex is set to My.* and this property" +
                        " is set to kafka. then the FlowFile that is created for the Kafka message will have an attribute" +
                        " named kafka.MyHeader with a value of MyValue.",
                RECORD_GROUP, 1, ConfigDef.Width.NONE, "Headers as Attributes prefix");
    }
}
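Since FAILURE_PORTS is declared as ConfigDef.Type.LIST, Kafka's ConfigDef splits the raw value on commas before getFailurePorts() copies it into a LinkedHashSet, so a value such as the "Success, Failure" used in the tests parses cleanly. A sketch with illustrative port names:

    props.put(StatelessNiFiSinkConfig.FAILURE_PORTS, "Failure, Unmatched");
    final StatelessNiFiSinkConfig config = new StatelessNiFiSinkConfig(props);
    final Set<String> failurePorts = config.getFailurePorts(); // ["Failure", "Unmatched"], insertion order kept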
StatelessNiFiSinkConnector.java
@@ -27,10 +27,6 @@ import java.util.List;
 import java.util.Map;
 
 public class StatelessNiFiSinkConnector extends SinkConnector {
-    static final String INPUT_PORT_NAME = "input.port";
-    static final String FAILURE_PORTS = "failure.ports";
-    static final String HEADERS_AS_ATTRIBUTES_REGEX = "headers.as.attributes.regex";
-    static final String HEADER_ATTRIBUTE_NAME_PREFIX = "attribute.prefix";
 
     private Map<String, String> properties;
 
@@ -47,7 +43,7 @@ public class StatelessNiFiSinkConnector extends SinkConnector {
     @Override
     public List<Map<String, String>> taskConfigs(final int maxTasks) {
         final List<Map<String, String>> configs = new ArrayList<>();
-        for (int i=0; i < maxTasks; i++) {
+        for (int i = 0; i < maxTasks; i++) {
             configs.add(new HashMap<>(properties));
         }
 
@@ -60,24 +56,7 @@ public class StatelessNiFiSinkConnector extends SinkConnector {
 
     @Override
     public ConfigDef config() {
-        final ConfigDef configDef = new ConfigDef();
-        StatelessKafkaConnectorUtil.addCommonConfigElements(configDef);
-
-        configDef.define(INPUT_PORT_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "The name of the Input Port to push data to");
-        configDef.define(HEADERS_AS_ATTRIBUTES_REGEX, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-            "A regular expression to evaluate against Kafka message header keys. Any message " +
-                "header whose key matches the regular expression will be added to the FlowFile as an attribute. The name of the attribute will match the header key (with an optional prefix, as " +
-                "defined by the attribute.prefix configuration) and the header value will be added as the attribute value.");
-        configDef.define(HEADER_ATTRIBUTE_NAME_PREFIX, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
-            "A prefix to add to the key of each header that matches the headers.as.attributes.regex Regular Expression. For example, if a header has the key MyHeader and a value of " +
-                "MyValue, and the headers.as.attributes.regex is set to My.* and this property is set to kafka. then the FlowFile that is created for the Kafka message will have an attribute" +
-                " named kafka.MyHeader with a value of MyValue.");
-
-        configDef.define(FAILURE_PORTS, ConfigDef.Type.LIST, null, ConfigDef.Importance.MEDIUM,
-            "A list of Output Ports that are considered failures. If any FlowFile is routed to an Output Ports whose name is provided in this property, the session is rolled back and is considered " +
-                "a failure");
-
-        return configDef;
+        return StatelessNiFiSinkConfig.CONFIG_DEF;
     }
 
     @Override
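With config() now returning the shared CONFIG_DEF, the definition can also be exercised outside the Connect runtime; a hedged sketch using Kafka's ConfigDef.validate (not code from this commit):

    final List<ConfigValue> results = new StatelessNiFiSinkConnector().config().validate(props);
    for (final ConfigValue value : results) {
        if (!value.errorMessages().isEmpty()) {
            System.out.println(value.name() + ": " + value.errorMessages()); // report invalid entries
        }
    }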
StatelessNiFiSinkTask.java
@@ -35,9 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -66,33 +64,36 @@ public class StatelessNiFiSinkTask extends SinkTask {
 
     @Override
     public void start(final Map<String, String> properties) {
-        logger.info("Starting Sink Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));
+        logger.info("Starting Sink Task");
+        StatelessNiFiSinkConfig config = createConfig(properties);
 
-        final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
+        final String timeout = config.getDataflowTimeout();
         timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
 
-        dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
+        dataflowName = config.getDataflowName();
 
-        final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
+        final String regex = config.getHeadersAsAttributesRegex();
         headerNameRegex = regex == null ? null : Pattern.compile(regex);
-        headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, "");
+        headerNamePrefix = config.getHeaderAttributeNamePrefix();
+        if (headerNamePrefix == null) {
+            headerNamePrefix = "";
+        }
 
-        dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
+        dataflow = StatelessKafkaConnectorUtil.createDataflow(config);
         dataflow.initialize();
 
         // Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException.
-        final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
-        inputPortName = properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
+        inputPortName = config.getInputPortName();
         if (inputPortName == null) {
             final Set<String> inputPorts = dataflow.getInputPortNames();
             if (inputPorts.isEmpty()) {
                 throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Input Port at the root level. Dataflows used for a Kafka Connect Sink Task "
-                    + "must have at least one Input Port at the root level.");
+                        + "must have at least one Input Port at the root level.");
             }
 
             if (inputPorts.size() > 1) {
-                throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Input Ports at the root level (" + inputPorts.toString()
-                    + "). The " + StatelessNiFiSinkConnector.INPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be sent to.");
+                throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Input Ports at the root level (" + inputPorts
+                        + "). The " + StatelessNiFiSinkConfig.INPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be sent to.");
             }
 
             inputPortName = inputPorts.iterator().next();
@@ -101,29 +102,18 @@ public class StatelessNiFiSinkTask extends SinkTask {
         // Validate the input port
         if (!dataflow.getInputPortNames().contains(inputPortName)) {
             throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have Input Port with name <" + inputPortName + "> at the root level. Existing Input Port names are "
-                + dataflow.getInputPortNames());
+                    + dataflow.getInputPortNames());
         }
 
         // Determine the failure Ports, if any are given.
-        final String failurePortList = properties.get(StatelessNiFiSinkConnector.FAILURE_PORTS);
-        if (failurePortList == null || failurePortList.trim().isEmpty()) {
-            failurePortNames = Collections.emptySet();
-        } else {
-            failurePortNames = new HashSet<>();
-
-            final String[] names = failurePortList.split(",");
-            for (final String name : names) {
-                final String trimmed = name.trim();
-                failurePortNames.add(trimmed);
-            }
-        }
+        failurePortNames = config.getFailurePorts();
 
         // Validate the failure ports
         final Set<String> outputPortNames = dataflow.getOutputPortNames();
         for (final String failurePortName : failurePortNames) {
             if (!outputPortNames.contains(failurePortName)) {
                 throw new ConfigException("Dataflow was configured with a Failure Port of " + failurePortName
-                    + " but there is no Port with that name in the dataflow. Valid Port names are " + outputPortNames);
+                        + " but there is no Port with that name in the dataflow. Valid Port names are " + outputPortNames);
             }
         }
     }
@@ -140,6 +130,16 @@ public class StatelessNiFiSinkTask extends SinkTask {
         }
     }
 
+    /**
+     * Creates a config instance to be used by the task.
+     *
+     * @param properties The properties to use in the config.
+     * @return The config instance.
+     */
+    protected StatelessNiFiSinkConfig createConfig(Map<String, String> properties) {
+        return new StatelessNiFiSinkConfig(properties);
+    }
+
     private void backoff() {
         // If no backoff period has been set, set it to 1 second. Otherwise, double the amount of time to backoff, up to 10 seconds.
         if (backoffMillis == 0L) {
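Because createConfig(...) is protected, a subclass can swap in a customized config; a hypothetical sketch (not part of the commit):

    public class CustomSinkTask extends StatelessNiFiSinkTask {
        @Override
        protected StatelessNiFiSinkConfig createConfig(final Map<String, String> properties) {
            // e.g. return a subclass that adds extra validation or test stubbing
            return new StatelessNiFiSinkConfig(properties);
        }
    }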
@ -0,0 +1,120 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.kafka.connect;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.nifi.kafka.connect.validators.ConnectRegularExpressionValidator;

import java.util.Map;

public class StatelessNiFiSourceConfig extends StatelessNiFiCommonConfig {
    public static final String OUTPUT_PORT_NAME = "output.port";
    public static final String TOPIC_NAME = "topics";
    public static final String TOPIC_NAME_ATTRIBUTE = "topic.name.attribute";
    public static final String KEY_ATTRIBUTE = "key.attribute";
    public static final String HEADER_REGEX = "header.attribute.regex";
    public static final String STATE_MAP_KEY = "task.index";
    protected static final ConfigDef CONFIG_DEF = createConfigDef();

    public StatelessNiFiSourceConfig(Map<?, ?> originals) {
        super(CONFIG_DEF, originals);
    }

    protected StatelessNiFiSourceConfig(ConfigDef definition, Map<?, ?> originals) {
        super(definition, originals);
    }

    /**
     * @return The output port name to use when reading from the flow. Can be null, which means the single available output port will be used.
     */
    public String getOutputPortName() {
        return getString(OUTPUT_PORT_NAME);
    }

    public String getTopicName() {
        return getString(TOPIC_NAME);
    }

    public String getTopicNameAttribute() {
        return getString(TOPIC_NAME_ATTRIBUTE);
    }

    public String getKeyAttribute() {
        return getString(KEY_ATTRIBUTE);
    }

    public String getHeaderRegex() {
        return getString(HEADER_REGEX);
    }

    public String getStateMapKey() {
        return originalsStrings().get(STATE_MAP_KEY);
    }

    protected static ConfigDef createConfigDef() {
        ConfigDef configDef = new ConfigDef();
        StatelessNiFiCommonConfig.addCommonConfigElements(configDef);
        addFlowConfigs(configDef);
        addSourceConfigs(configDef);
        return configDef;
    }

    /**
     * Add the flow definition related common configs to a config definition.
     *
     * @param configDef The config def to extend.
     */
    protected static void addFlowConfigs(ConfigDef configDef) {
        StatelessNiFiCommonConfig.addFlowConfigElements(configDef);
        configDef.define(StatelessNiFiSourceConfig.OUTPUT_PORT_NAME, ConfigDef.Type.STRING, null,
                ConfigDef.Importance.HIGH, "The name of the Output Port to pull data from",
                FLOW_GROUP, 100, ConfigDef.Width.NONE, "Output port name");
    }

    /**
     * Add source configs to a config definition.
     *
     * @param configDef The config def to extend.
     */
    protected static void addSourceConfigs(ConfigDef configDef) {
        configDef.define(
                StatelessNiFiSourceConfig.TOPIC_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
                "The name of the Kafka topic to send data to. Either the topics or topic.name.attribute configuration must be specified.",
                RECORD_GROUP, 0, ConfigDef.Width.NONE, "Topic name");
        configDef.define(
                StatelessNiFiSourceConfig.TOPIC_NAME_ATTRIBUTE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Specifies the name of a FlowFile attribute to use for determining which Kafka Topic a FlowFile"
                        + " will be sent to. Either the " + StatelessNiFiSourceConfig.TOPIC_NAME + " or " + StatelessNiFiSourceConfig.TOPIC_NAME_ATTRIBUTE +
                        " configuration must be specified. If both are specified, the " + StatelessNiFiSourceConfig.TOPIC_NAME_ATTRIBUTE
                        + " will be preferred, but if a FlowFile does not have the specified attribute name, then the " + StatelessNiFiSourceConfig.TOPIC_NAME +
                        " property will serve as the default topic name to use.",
                RECORD_GROUP, 1, ConfigDef.Width.NONE, "Topic name attribute");

        configDef.define(StatelessNiFiSourceConfig.KEY_ATTRIBUTE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Specifies the name of a FlowFile attribute to use for determining the Kafka Message key. If not"
                        + " specified, the message key will be null. If specified, the value of the attribute with the given name will be used as the message key.",
                RECORD_GROUP, 100, ConfigDef.Width.NONE, "Record key attribute");

        configDef.define(
                StatelessNiFiSourceConfig.HEADER_REGEX, ConfigDef.Type.STRING, null, new ConnectRegularExpressionValidator(), ConfigDef.Importance.MEDIUM,
                "Specifies a Regular Expression to evaluate against all FlowFile attributes. Any attribute whose name matches the Regular Expression" +
                        " will be converted into a Kafka message header with the name of the attribute used as header key and the value of the attribute used as the header"
                        + " value.",
                RECORD_GROUP, 200, ConfigDef.Width.NONE, "Record header attribute regex");
    }
}
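As a usage sketch of the class above, with made-up property values (a complete configuration would also need the common flow properties defined in StatelessNiFiCommonConfig, such as the flow snapshot, so this snippet is illustrative rather than directly runnable as-is):

    import java.util.HashMap;
    import java.util.Map;

    final Map<String, String> props = new HashMap<>();
    props.put(StatelessNiFiSourceConfig.OUTPUT_PORT_NAME, "Out");       // port to pull from
    props.put(StatelessNiFiSourceConfig.TOPIC_NAME, "events");          // static topic fallback
    props.put(StatelessNiFiSourceConfig.HEADER_REGEX, "attr\\..*");     // attributes forwarded as headers

    final StatelessNiFiSourceConfig config = new StatelessNiFiSourceConfig(props);
    assert "events".equals(config.getTopicName());
    assert "Out".equals(config.getOutputPortName());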
@ -20,7 +20,6 @@ package org.apache.nifi.kafka.connect;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.apache.nifi.kafka.connect.validators.ConnectRegularExpressionValidator;
import org.apache.nifi.stateless.flow.StatelessDataflow;

import java.util.ArrayList;
@ -29,21 +28,15 @@ import java.util.List;
import java.util.Map;

public class StatelessNiFiSourceConnector extends SourceConnector {
    static final String OUTPUT_PORT_NAME = "output.port";
    static final String TOPIC_NAME = "topics";

    static final String TOPIC_NAME_ATTRIBUTE = "topic.name.attribute";
    static final String KEY_ATTRIBUTE = "key.attribute";
    static final String HEADER_REGEX = "header.attribute.regex";

    private Map<String, String> properties;
    private StatelessNiFiSourceConfig config;
    private boolean primaryNodeOnly;

    @Override
    public void start(final Map<String, String> properties) {
        this.properties = new HashMap<>(properties);
        config = createConfig(properties);

        final StatelessDataflow dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
        final StatelessDataflow dataflow = StatelessKafkaConnectorUtil.createDataflow(config);
        primaryNodeOnly = dataflow.isSourcePrimaryNodeOnly();
        dataflow.shutdown();
    }
@ -58,8 +51,8 @@ public class StatelessNiFiSourceConnector extends SourceConnector {
        final int numTasks = primaryNodeOnly ? 1 : maxTasks;

        final List<Map<String, String>> configs = new ArrayList<>();
        for (int i=0; i < numTasks; i++) {
            final Map<String, String> taskConfig = new HashMap<>(properties);
        for (int i = 0; i < numTasks; i++) {
            final Map<String, String> taskConfig = new HashMap<>(config.originalsStrings());
            taskConfig.put("task.index", String.valueOf(i));
            configs.add(taskConfig);
        }
@ -74,31 +67,21 @@ public class StatelessNiFiSourceConnector extends SourceConnector {

    @Override
    public ConfigDef config() {
        final ConfigDef configDef = new ConfigDef();
        StatelessKafkaConnectorUtil.addCommonConfigElements(configDef);

        configDef.define(OUTPUT_PORT_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "The name of the Output Port to pull data from");
        configDef.define(TOPIC_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
            "The name of the Kafka topic to send data to. Either the topics or topic.name.attribute configuration must be specified.");

        configDef.define(TOPIC_NAME_ATTRIBUTE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "Specifies the name of a FlowFile attribute to use for determining which Kafka Topic a FlowFile"
                + " will be sent to. Either the " + TOPIC_NAME + " or " + TOPIC_NAME_ATTRIBUTE + " configuration must be specified. If both are specified, the " + TOPIC_NAME_ATTRIBUTE
                + " will be preferred, but if a FlowFile does not have the specified attribute name, then the " + TOPIC_NAME + " property will serve as the default topic name to use.");

        configDef.define(KEY_ATTRIBUTE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Specifies the name of a FlowFile attribute to use for determining the Kafka Message key. If not"
            + " specified, the message key will be null. If specified, the value of the attribute with the given name will be used as the message key.");

        configDef.define(HEADER_REGEX, ConfigDef.Type.STRING, null, new ConnectRegularExpressionValidator(), ConfigDef.Importance.MEDIUM,
            "Specifies a Regular Expression to evaluate against all FlowFile attributes. Any attribute whose name"
                + " matches the Regular Expression will be converted into a Kafka message header with the name of the attribute used as header key and the value of the attribute used as the header"
                + " value.");

        return configDef;
        return StatelessNiFiSourceConfig.CONFIG_DEF;
    }

    @Override
    public String version() {
        return StatelessKafkaConnectorUtil.getVersion();
    }

    /**
     * Creates a config instance to be used by the Connector.
     *
     * @param properties Properties to use in the config.
     * @return The config instance.
     */
    protected StatelessNiFiSourceConfig createConfig(Map<String, String> properties) {
        return new StatelessNiFiSourceConfig(properties);
    }
}
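The protected createConfig(...) above, together with the protected (ConfigDef, Map) constructor on the config class, is the extension point this refactor opens up. A hypothetical subclass pair showing how it could be used (all names below are invented for illustration; needs java.util.Map):

    // Hypothetical extension of the new config/connector pair.
    public class CustomSourceConfig extends StatelessNiFiSourceConfig {
        public CustomSourceConfig(final Map<String, String> originals) {
            super(CONFIG_DEF, originals);    // reuse the base ConfigDef, or pass an extended one
        }
    }

    public class CustomSourceConnector extends StatelessNiFiSourceConnector {
        @Override
        protected StatelessNiFiSourceConfig createConfig(final Map<String, String> properties) {
            return new CustomSourceConfig(properties);    // the connector now builds the custom config
        }
    }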
@ -45,7 +45,6 @@ import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;

public class StatelessNiFiSourceTask extends SourceTask {
    public static final String STATE_MAP_KEY = "task.index";
    private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
    private static final long FAILURE_YIELD_MILLIS = 1000L;

@ -60,7 +59,7 @@ public class StatelessNiFiSourceTask extends SourceTask {
    private String dataflowName;
    private long failureYieldExpiration = 0L;

    private final Map<String, String> clusterStatePartitionMap = Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
    private final Map<String, String> clusterStatePartitionMap = Collections.singletonMap(StatelessNiFiSourceConfig.STATE_MAP_KEY, "CLUSTER");
    private Map<String, String> localStatePartitionMap = new HashMap<>();

    private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);

@ -72,45 +71,46 @@ public class StatelessNiFiSourceTask extends SourceTask {

    @Override
    public void start(final Map<String, String> properties) {
        logger.info("Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));
        logger.info("Starting Source Task");
        StatelessNiFiSourceConfig config = createConfig(properties);

        final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
        final String timeout = config.getDataflowTimeout();
        timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);

        topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
        topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
        keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
        topicName = config.getTopicName();
        topicNameAttribute = config.getTopicNameAttribute();
        keyAttributeName = config.getKeyAttribute();

        if (topicName == null && topicNameAttribute == null) {
            throw new ConfigException("Either the topics or topic.name.attribute configuration must be specified");
        }

        final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
        final String headerRegex = config.getHeaderRegex();
        headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex);

        dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
        dataflow = StatelessKafkaConnectorUtil.createDataflow(config);
        dataflow.initialize();

        // Determine the name of the Output Port to retrieve data from
        dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
        outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
        dataflowName = config.getDataflowName();
        outputPortName = config.getOutputPortName();
        if (outputPortName == null) {
            final Set<String> outputPorts = dataflow.getOutputPortNames();
            if (outputPorts.isEmpty()) {
                throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Output Port at the root level. Dataflows used for a Kafka Connect Source Task "
                    + "must have at least one Output Port at the root level.");
            }

            if (outputPorts.size() > 1) {
                throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Output Ports at the root level (" + outputPorts.toString()
                    + "). The " + StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be retrieved from.");
                throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Output Ports at the root level (" + outputPorts
                    + "). The " + StatelessNiFiSourceConfig.OUTPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be retrieved from.");
            }

            outputPortName = outputPorts.iterator().next();
        }

        final String taskIndex = properties.get(STATE_MAP_KEY);
        localStatePartitionMap.put(STATE_MAP_KEY, taskIndex);
        final String taskIndex = config.getStateMapKey();
        localStatePartitionMap.put(StatelessNiFiSourceConfig.STATE_MAP_KEY, taskIndex);

        final Map<String, String> localStateMap = (Map<String, String>) (Map) context.offsetStorageReader().offset(localStatePartitionMap);
        final Map<String, String> clusterStateMap = (Map<String, String>) (Map) context.offsetStorageReader().offset(clusterStatePartitionMap);
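Given the precedence the config descriptions spell out (the attribute value wins, the topics property is the fallback), per-FlowFile topic selection presumably reduces to something like this sketch; the method name and placement are assumptions, with topicName and topicNameAttribute being the task fields set above:

    // Sketch of the documented topic precedence; needs java.util.Map.
    private String resolveTopic(final Map<String, String> flowFileAttributes) {
        if (topicNameAttribute != null) {
            final String fromAttribute = flowFileAttributes.get(topicNameAttribute);
            if (fromAttribute != null) {
                return fromAttribute;    // the FlowFile names its own topic
            }
        }
        return topicName;                // default from the "topics" property
    }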
@ -205,6 +205,16 @@ public class StatelessNiFiSourceTask extends SourceTask {
        return sourceRecords;
    }

    /**
     * Creates a config instance to be used by the task.
     *
     * @param properties The properties to use in the config.
     * @return The config instance.
     */
    protected StatelessNiFiSourceConfig createConfig(Map<String, String> properties) {
        return new StatelessNiFiSourceConfig(properties);
    }

    private void verifyFlowFilesTransferredToProperPort(final TriggerResult triggerResult, final String expectedPortName, final DataflowTrigger trigger) {
        final Map<String, List<FlowFile>> flowFileOutputMap = triggerResult.getOutputFlowFiles();

@ -45,7 +45,7 @@ public class ConnectDirectoryExistsValidator implements ConfigDef.Validator {
        final File[] files = file.listFiles();
        if (files == null) {
            throw new ConfigException("The value " + value + " configured for the property " + name + " is not valid because a listing of files could not be obtained for directory "
                + file.getAbsolutePath());
        }
    }
}
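For context, this validator is the kind attached to directory-style properties when a ConfigDef is built; a hedged sketch of such a definition (the key string and description are assumptions, though the define(...) overload with a Validator is standard Kafka ConfigDef API):

    // Hypothetical define(...) call wiring in the directory validator.
    configDef.define("nar.directory", ConfigDef.Type.STRING, null,
            new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
            "Directory containing the NiFi NAR files to load");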