NIFI-6867: Validate PutKudu operation type property

This patch adds validation to the PutKudu operation type property.
It also improves the description to include the valid values and
adjusts the inputs to be case insensitive.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #4063.
This commit is contained in:
Grant Henke 2020-02-14 10:00:00 -06:00 committed by Pierre Villard
parent 268ba1d23e
commit 09c7406d18
No known key found for this signature in database
GPG Key ID: BEE1599F0726E9CD
2 changed files with 38 additions and 5 deletions

View File

@ -38,6 +38,9 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
@ -58,6 +61,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -124,13 +128,39 @@ public class PutKudu extends AbstractKuduProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
protected static final Validator OperationTypeValidator = new Validator() {
@Override
public ValidationResult validate(String subject, String value, ValidationContext context) {
if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) {
return new ValidationResult.Builder().subject(subject).input(value)
.explanation("Expression Language Present").valid(true).build();
}
boolean valid;
try {
OperationType.valueOf(value.toUpperCase());
valid = true;
} catch (IllegalArgumentException ex) {
valid = false;
}
final String explanation = valid ? null :
"Value must be one of: " +
Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", "));
return new ValidationResult.Builder().subject(subject).input(value).valid(valid)
.explanation(explanation).build();
}
};
protected static final PropertyDescriptor INSERT_OPERATION = new Builder()
.name("Insert Operation")
.displayName("Kudu Operation Type")
.description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows")
.description("Specify operationType for this processor.\n" +
"Valid values are: " +
Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")))
.defaultValue(OperationType.INSERT.toString())
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.addValidator(OperationTypeValidator)
.build();
protected static final PropertyDescriptor FLUSH_MODE = new Builder()
@ -227,7 +257,7 @@ public class PutKudu extends AbstractKuduProcessor {
public void onScheduled(final ProcessContext context) throws IOException, LoginException {
batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger();
ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger();
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue());
flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase());
createKuduClient(context);
}
@ -271,7 +301,7 @@ public class PutKudu extends AbstractKuduProcessor {
final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) {
final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile);
final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile));
final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase());
final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile));
final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile));
final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile));

View File

@ -136,10 +136,13 @@ public class ITPutKudu {
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
// Use values to ensure multiple batches and multiple flow files per-trigger
testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString());
testRunner.setProperty(PutKudu.BATCH_SIZE, "10");
testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2");
// Set the operation type.
flowFileAttributes.put("kudu.operation.type", "upsert");
testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.operation.type}");
// Don't ignore null values.
flowFileAttributes.put("kudu.ignore.null", "false");
testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");