diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java index 772aa8e97c..d0170b82c7 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/processor/util/StandardValidators.java @@ -22,9 +22,11 @@ import java.net.URI; import java.net.URL; import java.nio.charset.Charset; import java.nio.charset.UnsupportedCharsetException; -import java.time.Instant; import java.text.NumberFormat; import java.text.ParseException; +import java.time.Instant; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.regex.Pattern; @@ -158,6 +160,55 @@ public class StandardValidators { } }; + /** + * {@link Validator} that ensures that value's length > 0 and that expression language is present + */ + public static final Validator NON_EMPTY_EL_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + } + return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context); + } + }; + + /** + * {@link Validator} that ensures that value is a non-empty comma separated list of hostname:port + */ + public static final Validator HOSTNAME_PORT_LIST_VALIDATOR = new Validator() { + @Override + public ValidationResult validate(String subject, String input, ValidationContext context) { + // expression language + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { + return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); + } + // not empty + ValidationResult nonEmptyValidatorResult = StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context); + if (!nonEmptyValidatorResult.isValid()) { + return nonEmptyValidatorResult; + } + // check format + final List hostnamePortList = Arrays.asList(input.split(",")); + for (String hostnamePort : hostnamePortList) { + String[] addresses = hostnamePort.split(":"); + // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) + if (addresses.length != 2) { + return new ValidationResult.Builder().subject(subject).input(input).explanation( + "Must be in hostname:port form (no scheme such as http://").valid(false).build(); + } + + // Validate the port + String port = addresses[1].trim(); + ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context); + if (!portValidatorResult.isValid()) { + return portValidatorResult; + } + } + return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build(); + } + }; + /** * {@link Validator} that ensures that value has 1+ non-whitespace * characters diff --git a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java index ecb752c15c..81888caa32 100644 --- a/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java +++ b/nifi-commons/nifi-utils/src/test/java/org/apache/nifi/util/validator/TestStandardValidators.java @@ -16,7 +16,6 @@ */ package org.apache.nifi.util.validator; -import org.apache.nifi.processor.util.StandardValidators; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; import org.apache.nifi.components.Validator; +import org.apache.nifi.processor.util.StandardValidators; import org.junit.Test; import org.mockito.Mockito; @@ -45,6 +45,61 @@ public class TestStandardValidators { assertTrue(vr.isValid()); } + @Test + public void testNonEmptyELValidator() { + Validator val = StandardValidators.NON_EMPTY_EL_VALIDATOR; + ValidationContext vc = mock(ValidationContext.class); + Mockito.when(vc.isExpressionLanguageSupported("foo")).thenReturn(true); + + ValidationResult vr = val.validate("foo", "", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", " h", vc); + assertTrue(vr.isValid()); + + Mockito.when(vc.isExpressionLanguagePresent("${test}")).thenReturn(true); + vr = val.validate("foo", "${test}", vc); + assertTrue(vr.isValid()); + + vr = val.validate("foo", "${test", vc); + assertTrue(vr.isValid()); + } + + @Test + public void testHostnamePortListValidator() { + Validator val = StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR; + ValidationContext vc = mock(ValidationContext.class); + Mockito.when(vc.isExpressionLanguageSupported("foo")).thenReturn(true); + + ValidationResult vr = val.validate("foo", "", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "localhost", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "test:0", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "test:65536", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "test:6666,localhost", vc); + assertFalse(vr.isValid()); + + vr = val.validate("foo", "test:65535", vc); + assertTrue(vr.isValid()); + + vr = val.validate("foo", "test:65535,localhost:666,127.0.0.1:8989", vc); + assertTrue(vr.isValid()); + + Mockito.when(vc.isExpressionLanguagePresent("${test}")).thenReturn(true); + vr = val.validate("foo", "${test}", vc); + assertTrue(vr.isValid()); + + vr = val.validate("foo", "${test", vc); + assertFalse(vr.isValid()); + } + @Test public void testTimePeriodValidator() { Validator val = StandardValidators.createTimePeriodValidator(1L, TimeUnit.SECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS); diff --git a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java index 556fae9b4a..1dc18098bd 100644 --- a/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java +++ b/nifi-nar-bundles/nifi-cassandra-bundle/nifi-cassandra-processors/src/main/java/org/apache/nifi/processors/cassandra/AbstractCassandraProcessor.java @@ -33,7 +33,6 @@ import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; @@ -60,31 +59,6 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { public static final int DEFAULT_CASSANDRA_PORT = 9042; - private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() { - @Override - public ValidationResult validate(final String subject, final String input, final ValidationContext context) { - final List esList = Arrays.asList(input.split(",")); - for (String hostnamePort : esList) { - String[] addresses = hostnamePort.split(":"); - // Protect against invalid input like http://127.0.0.1:9042 (URL scheme should not be there) - if (addresses.length != 2) { - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Each entry must be in hostname:port form (no scheme such as http://, and port must be specified)") - .valid(false).build(); - } - // Validate the port - String port = addresses[1].trim(); - ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context); - if (!portValidatorResult.isValid()) { - return portValidatorResult; - } - - } - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Valid cluster definition").valid(true).build(); - } - }; - // Common descriptors public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder() .name("Cassandra Contact Points") @@ -93,7 +67,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor { + " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.") .required(true) .expressionLanguageSupported(false) - .addValidator(HOSTNAME_PORT_VALIDATOR) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .build(); public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java index b78995e3ee..64490935db 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-5-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearch5TransportClientProcessor.java @@ -17,8 +17,6 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; @@ -46,26 +44,6 @@ import java.util.concurrent.atomic.AtomicReference; abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractElasticsearch5Processor { - /** - * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries - */ - private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> { - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); - } - final List esList = Arrays.asList(input.split(",")); - for (String hostnamePort : esList) { - String[] addresses = hostnamePort.split(":"); - // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) - if (addresses.length != 2) { - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Must be in hostname:port form (no scheme such as http://").valid(false).build(); - } - } - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Valid cluster definition").valid(true).build(); - }; - protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() .name("el5-cluster-name") .displayName("Cluster Name") @@ -84,7 +62,7 @@ abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractEl + "connect to hosts. The default transport client port is 9300.") .required(true) .expressionLanguageSupported(true) - .addValidator(HOSTNAME_PORT_VALIDATOR) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .build(); public static final PropertyDescriptor PROP_XPACK_LOCATION = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java index c5e4cc3d79..0453785dc5 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchProcessor.java @@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; @@ -36,13 +35,6 @@ import java.util.Set; */ public abstract class AbstractElasticsearchProcessor extends AbstractProcessor { - static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> { - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { - return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build(); - } - return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context); - }; - public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() .name("SSL Context Service") .description("The SSL Context Service used to provide client certificate information for TLS/SSL " diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java index a16a0dd6a5..b260768225 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/AbstractElasticsearchTransportClientProcessor.java @@ -17,8 +17,6 @@ package org.apache.nifi.processors.elasticsearch; import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.exception.ProcessException; @@ -43,28 +41,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicReference; - public abstract class AbstractElasticsearchTransportClientProcessor extends AbstractElasticsearchProcessor { - /** - * This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries - */ - private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> { - if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) { - return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build(); - } - final List esList = Arrays.asList(input.split(",")); - for (String hostnamePort : esList) { - String[] addresses = hostnamePort.split(":"); - // Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there) - if (addresses.length != 2) { - return new ValidationResult.Builder().subject(subject).input(input).explanation( - "Must be in hostname:port form (no scheme such as http://").valid(false).build(); - } - } - return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build(); - }; - protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder() .name("Cluster Name") .description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'") @@ -81,7 +59,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst + "connect to hosts. The default transport client port is 9300.") .required(true) .expressionLanguageSupported(false) - .addValidator(HOSTNAME_PORT_VALIDATOR) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .expressionLanguageSupported(true) .build(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java index d208d40d34..ab61f674bc 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearch.java @@ -96,7 +96,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .description("The type of this document (used by Elasticsearch for indexing and searching)") .required(true) .expressionLanguageSupported(true) - .addValidator(NON_EMPTY_EL_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -104,7 +104,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces .description("The type of the operation used to index (index, update, upsert)") .required(true) .expressionLanguageSupported(true) - .addValidator(NON_EMPTY_EL_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .defaultValue("index") .build(); diff --git a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java index 479d396630..70884be002 100644 --- a/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java +++ b/nifi-nar-bundles/nifi-elasticsearch-bundle/nifi-elasticsearch-processors/src/main/java/org/apache/nifi/processors/elasticsearch/PutElasticsearchHttp.java @@ -104,7 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .description("The type of this document (used by Elasticsearch for indexing and searching)") .required(true) .expressionLanguageSupported(true) - .addValidator(NON_EMPTY_EL_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .build(); public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder() @@ -113,7 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor { .description("The type of the operation used to index (index, update, upsert, delete)") .required(true) .expressionLanguageSupported(true) - .addValidator(NON_EMPTY_EL_VALIDATOR) + .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR) .defaultValue("index") .build(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index 3d09f2df03..cbe2e24b16 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -52,10 +52,6 @@ final class KafkaProcessorUtils { final Logger logger = LoggerFactory.getLogger(this.getClass()); - private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; - - private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters"); @@ -77,8 +73,7 @@ final class KafkaProcessorUtils { .displayName("Kafka Brokers") .description("A comma-separated list of known Kafka Brokers in the format :") .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .expressionLanguageSupported(true) .defaultValue("localhost:9092") .build(); diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java index 616c6f3acf..8a4c5d5346 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java @@ -30,7 +30,6 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Pattern; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -60,10 +59,6 @@ import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult; + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") public class PutKafka extends AbstractKafkaProcessor { - private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; - - private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to" + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); @@ -116,7 +111,7 @@ public class PutKafka extends AbstractKafkaProcessor { .name("Known Brokers") .description("A comma-separated list of known Kafka Brokers in the format :") .required(true) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .expressionLanguageSupported(false) .build(); public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java index 3d09f2df03..cbe2e24b16 100644 --- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java +++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java @@ -52,10 +52,6 @@ final class KafkaProcessorUtils { final Logger logger = LoggerFactory.getLogger(this.getClass()); - private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; - - private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; - static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string."); static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded", "The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters"); @@ -77,8 +73,7 @@ final class KafkaProcessorUtils { .displayName("Kafka Brokers") .description("A comma-separated list of known Kafka Brokers in the format :") .required(true) - .addValidator(StandardValidators.NON_BLANK_VALIDATOR) - .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) + .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR) .expressionLanguageSupported(true) .defaultValue("localhost:9092") .build();