From 09c7406d1835dc9828e6aa798db934830fb1f548 Mon Sep 17 00:00:00 2001 From: Grant Henke Date: Fri, 14 Feb 2020 10:00:00 -0600 Subject: [PATCH] NIFI-6867: Validate PutKudu operation type property This patch adds validation to the PutKudu operation type property. It also improves the description to include the valid values and adjusts the inputs to be case insensitive. Signed-off-by: Pierre Villard This closes #4063. --- .../apache/nifi/processors/kudu/PutKudu.java | 38 +++++++++++++++++-- .../nifi/processors/kudu/ITPutKudu.java | 5 ++- 2 files changed, 38 insertions(+), 5 deletions(-) diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index aedac3780c..3769932269 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -38,6 +38,9 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -58,6 +61,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -124,13 +128,39 @@ public class PutKudu extends AbstractKuduProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); + protected static final Validator OperationTypeValidator = new Validator() { + @Override + public ValidationResult validate(String subject, String value, ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value) + .explanation("Expression Language Present").valid(true).build(); + } + + boolean valid; + try { + OperationType.valueOf(value.toUpperCase()); + valid = true; + } catch (IllegalArgumentException ex) { + valid = false; + } + + final String explanation = valid ? null : + "Value must be one of: " + + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")); + return new ValidationResult.Builder().subject(subject).input(value).valid(valid) + .explanation(explanation).build(); + } + }; + protected static final PropertyDescriptor INSERT_OPERATION = new Builder() .name("Insert Operation") .displayName("Kudu Operation Type") - .description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows") + .description("Specify operationType for this processor.\n" + + "Valid values are: " + + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", "))) .defaultValue(OperationType.INSERT.toString()) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(OperationTypeValidator) .build(); protected static final PropertyDescriptor FLUSH_MODE = new Builder() @@ -227,7 +257,7 @@ public class PutKudu extends AbstractKuduProcessor { public void onScheduled(final ProcessContext context) throws IOException, LoginException { batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); + flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase()); createKuduClient(context); } @@ -271,7 +301,7 @@ public class PutKudu extends AbstractKuduProcessor { final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile); - final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile)); + final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase()); final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile)); final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile)); final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile)); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index eee5e60949..b3ad1700e2 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -136,10 +136,13 @@ public class ITPutKudu { flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); // Use values to ensure multiple batches and multiple flow files per-trigger - testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString()); testRunner.setProperty(PutKudu.BATCH_SIZE, "10"); testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2"); + // Set the operation type. + flowFileAttributes.put("kudu.operation.type", "upsert"); + testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.operation.type}"); + // Don't ignore null values. flowFileAttributes.put("kudu.ignore.null", "false"); testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");