NIFI-3139 Added host:port list and non empty EL validators in StandardValidators

This commit is contained in:
Pierre Villard 2016-12-29 11:52:49 +01:00 committed by Matt Burgess
parent 74ecc20f00
commit 4e4d14f86f
11 changed files with 118 additions and 105 deletions

View File

@ -22,9 +22,11 @@ import java.net.URI;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.time.Instant;
import java.text.NumberFormat;
import java.text.ParseException;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
@ -158,6 +160,55 @@ public class StandardValidators {
}
};
/**
* {@link Validator} that ensures that value's length > 0 and that expression language is present
*/
public static final Validator NON_EMPTY_EL_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
}
};
/**
* {@link Validator} that ensures that value is a non-empty comma separated list of hostname:port
*/
public static final Validator HOSTNAME_PORT_LIST_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
// expression language
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
// not empty
ValidationResult nonEmptyValidatorResult = StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, input, context);
if (!nonEmptyValidatorResult.isValid()) {
return nonEmptyValidatorResult;
}
// check format
final List<String> hostnamePortList = Arrays.asList(input.split(","));
for (String hostnamePort : hostnamePortList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}
// Validate the port
String port = addresses[1].trim();
ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context);
if (!portValidatorResult.isValid()) {
return portValidatorResult;
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build();
}
};
/**
* {@link Validator} that ensures that value has 1+ non-whitespace
* characters

View File

@ -16,7 +16,6 @@
*/
package org.apache.nifi.util.validator;
import org.apache.nifi.processor.util.StandardValidators;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@ -26,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.util.StandardValidators;
import org.junit.Test;
import org.mockito.Mockito;
@ -45,6 +45,61 @@ public class TestStandardValidators {
assertTrue(vr.isValid());
}
@Test
public void testNonEmptyELValidator() {
Validator val = StandardValidators.NON_EMPTY_EL_VALIDATOR;
ValidationContext vc = mock(ValidationContext.class);
Mockito.when(vc.isExpressionLanguageSupported("foo")).thenReturn(true);
ValidationResult vr = val.validate("foo", "", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", " h", vc);
assertTrue(vr.isValid());
Mockito.when(vc.isExpressionLanguagePresent("${test}")).thenReturn(true);
vr = val.validate("foo", "${test}", vc);
assertTrue(vr.isValid());
vr = val.validate("foo", "${test", vc);
assertTrue(vr.isValid());
}
@Test
public void testHostnamePortListValidator() {
Validator val = StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR;
ValidationContext vc = mock(ValidationContext.class);
Mockito.when(vc.isExpressionLanguageSupported("foo")).thenReturn(true);
ValidationResult vr = val.validate("foo", "", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "localhost", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "test:0", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "test:65536", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "test:6666,localhost", vc);
assertFalse(vr.isValid());
vr = val.validate("foo", "test:65535", vc);
assertTrue(vr.isValid());
vr = val.validate("foo", "test:65535,localhost:666,127.0.0.1:8989", vc);
assertTrue(vr.isValid());
Mockito.when(vc.isExpressionLanguagePresent("${test}")).thenReturn(true);
vr = val.validate("foo", "${test}", vc);
assertTrue(vr.isValid());
vr = val.validate("foo", "${test", vc);
assertFalse(vr.isValid());
}
@Test
public void testTimePeriodValidator() {
Validator val = StandardValidators.createTimePeriodValidator(1L, TimeUnit.SECONDS, Long.MAX_VALUE, TimeUnit.NANOSECONDS);

View File

@ -33,7 +33,6 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@ -60,31 +59,6 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
public static final int DEFAULT_CASSANDRA_PORT = 9042;
private static final Validator HOSTNAME_PORT_VALIDATOR = new Validator() {
@Override
public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9042 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Each entry must be in hostname:port form (no scheme such as http://, and port must be specified)")
.valid(false).build();
}
// Validate the port
String port = addresses[1].trim();
ValidationResult portValidatorResult = StandardValidators.PORT_VALIDATOR.validate(subject, port, context);
if (!portValidatorResult.isValid()) {
return portValidatorResult;
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Valid cluster definition").valid(true).build();
}
};
// Common descriptors
public static final PropertyDescriptor CONTACT_POINTS = new PropertyDescriptor.Builder()
.name("Cassandra Contact Points")
@ -93,7 +67,7 @@ public abstract class AbstractCassandraProcessor extends AbstractProcessor {
+ " The default client port for Cassandra is 9042, but the port(s) must be explicitly specified.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();
public static final PropertyDescriptor KEYSPACE = new PropertyDescriptor.Builder()

View File

@ -17,8 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
@ -46,26 +44,6 @@ import java.util.concurrent.atomic.AtomicReference;
abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractElasticsearch5Processor {
/**
* This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
*/
private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Valid cluster definition").valid(true).build();
};
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("el5-cluster-name")
.displayName("Cluster Name")
@ -84,7 +62,7 @@ abstract class AbstractElasticsearch5TransportClientProcessor extends AbstractEl
+ "connect to hosts. The default transport client port is 9300.")
.required(true)
.expressionLanguageSupported(true)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.build();
public static final PropertyDescriptor PROP_XPACK_LOCATION = new PropertyDescriptor.Builder()

View File

@ -19,7 +19,6 @@ package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
@ -36,13 +35,6 @@ import java.util.Set;
*/
public abstract class AbstractElasticsearchProcessor extends AbstractProcessor {
static final Validator NON_EMPTY_EL_VALIDATOR = (subject, value, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value).explanation("Expression Language Present").valid(true).build();
}
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject, value, context);
};
public static final PropertyDescriptor PROP_SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("The SSL Context Service used to provide client certificate information for TLS/SSL "

View File

@ -17,8 +17,6 @@
package org.apache.nifi.processors.elasticsearch;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
@ -43,28 +41,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
public abstract class AbstractElasticsearchTransportClientProcessor extends AbstractElasticsearchProcessor {
/**
* This validator ensures the Elasticsearch hosts property is a valid list of hostname:port entries
*/
private static final Validator HOSTNAME_PORT_VALIDATOR = (subject, input, context) -> {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(input)) {
return new ValidationResult.Builder().subject(subject).input(input).explanation("Expression Language Present").valid(true).build();
}
final List<String> esList = Arrays.asList(input.split(","));
for (String hostnamePort : esList) {
String[] addresses = hostnamePort.split(":");
// Protect against invalid input like http://127.0.0.1:9300 (URL scheme should not be there)
if (addresses.length != 2) {
return new ValidationResult.Builder().subject(subject).input(input).explanation(
"Must be in hostname:port form (no scheme such as http://").valid(false).build();
}
}
return new ValidationResult.Builder().subject(subject).input(input).explanation("Valid cluster definition").valid(true).build();
};
protected static final PropertyDescriptor CLUSTER_NAME = new PropertyDescriptor.Builder()
.name("Cluster Name")
.description("Name of the ES cluster (for example, elasticsearch_brew). Defaults to 'elasticsearch'")
@ -81,7 +59,7 @@ public abstract class AbstractElasticsearchTransportClientProcessor extends Abst
+ "connect to hosts. The default transport client port is 9300.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(HOSTNAME_PORT_VALIDATOR)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(true)
.build();

View File

@ -96,7 +96,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@ -104,7 +104,7 @@ public class PutElasticsearch extends AbstractElasticsearchTransportClientProces
.description("The type of the operation used to index (index, update, upsert)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();

View File

@ -104,7 +104,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.description("The type of this document (used by Elasticsearch for indexing and searching)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor INDEX_OP = new PropertyDescriptor.Builder()
@ -113,7 +113,7 @@ public class PutElasticsearchHttp extends AbstractElasticsearchHttpProcessor {
.description("The type of the operation used to index (index, update, upsert, delete)")
.required(true)
.expressionLanguageSupported(true)
.addValidator(NON_EMPTY_EL_VALIDATOR)
.addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.defaultValue("index")
.build();

View File

@ -52,10 +52,6 @@ final class KafkaProcessorUtils {
final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
@ -77,8 +73,7 @@ final class KafkaProcessorUtils {
.displayName("Kafka Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("localhost:9092")
.build();

View File

@ -30,7 +30,6 @@ import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
@ -60,10 +59,6 @@ import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
+ " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
"FlowFile will be routed to"
+ " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration");
@ -116,7 +111,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.name("Known Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(false)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()

View File

@ -52,10 +52,6 @@ final class KafkaProcessorUtils {
final Logger logger = LoggerFactory.getLogger(this.getClass());
private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*";
static final AllowableValue UTF8_ENCODING = new AllowableValue("utf-8", "UTF-8 Encoded", "The key is interpreted as a UTF-8 Encoded string.");
static final AllowableValue HEX_ENCODING = new AllowableValue("hex", "Hex Encoded",
"The key is interpreted as arbitrary binary data and is encoded using hexadecimal characters with uppercase letters");
@ -77,8 +73,7 @@ final class KafkaProcessorUtils {
.displayName("Kafka Brokers")
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX)))
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
.expressionLanguageSupported(true)
.defaultValue("localhost:9092")
.build();