diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java index aedac3780c..3769932269 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/main/java/org/apache/nifi/processors/kudu/PutKudu.java @@ -38,6 +38,9 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.components.PropertyDescriptor.Builder; import org.apache.nifi.components.PropertyValue; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.flowfile.FlowFile; import org.apache.nifi.processor.ProcessContext; @@ -58,6 +61,7 @@ import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -124,13 +128,39 @@ public class PutKudu extends AbstractKuduProcessor { .addValidator(StandardValidators.BOOLEAN_VALIDATOR) .build(); + protected static final Validator OperationTypeValidator = new Validator() { + @Override + public ValidationResult validate(String subject, String value, ValidationContext context) { + if (context.isExpressionLanguageSupported(subject) && context.isExpressionLanguagePresent(value)) { + return new ValidationResult.Builder().subject(subject).input(value) + .explanation("Expression Language Present").valid(true).build(); + } + + boolean valid; + try { + OperationType.valueOf(value.toUpperCase()); + valid = true; + } catch (IllegalArgumentException ex) { + valid = false; + } + + final String explanation = valid ? null : + "Value must be one of: " + + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", ")); + return new ValidationResult.Builder().subject(subject).input(value).valid(valid) + .explanation(explanation).build(); + } + }; + protected static final PropertyDescriptor INSERT_OPERATION = new Builder() .name("Insert Operation") .displayName("Kudu Operation Type") - .description("Specify operationType for this processor. Insert-Ignore will ignore duplicated rows") + .description("Specify operationType for this processor.\n" + + "Valid values are: " + + Arrays.stream(OperationType.values()).map(Enum::toString).collect(Collectors.joining(", "))) .defaultValue(OperationType.INSERT.toString()) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .addValidator(OperationTypeValidator) .build(); protected static final PropertyDescriptor FLUSH_MODE = new Builder() @@ -227,7 +257,7 @@ public class PutKudu extends AbstractKuduProcessor { public void onScheduled(final ProcessContext context) throws IOException, LoginException { batchSize = context.getProperty(BATCH_SIZE).evaluateAttributeExpressions().asInteger(); ffbatch = context.getProperty(FLOWFILE_BATCH_SIZE).evaluateAttributeExpressions().asInteger(); - flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue()); + flushMode = SessionConfiguration.FlushMode.valueOf(context.getProperty(FLUSH_MODE).getValue().toUpperCase()); createKuduClient(context); } @@ -271,7 +301,7 @@ public class PutKudu extends AbstractKuduProcessor { final RecordReader recordReader = recordReaderFactory.createRecordReader(flowFile, in, getLogger())) { final String tableName = getEvaluatedProperty(TABLE_NAME, context, flowFile); - final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile)); + final OperationType operationType = OperationType.valueOf(getEvaluatedProperty(INSERT_OPERATION, context, flowFile).toUpperCase()); final Boolean ignoreNull = Boolean.valueOf(getEvaluatedProperty(IGNORE_NULL, context, flowFile)); final Boolean lowercaseFields = Boolean.valueOf(getEvaluatedProperty(LOWERCASE_FIELD_NAMES, context, flowFile)); final Boolean handleSchemaDrift = Boolean.valueOf(getEvaluatedProperty(HANDLE_SCHEMA_DRIFT, context, flowFile)); diff --git a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java index eee5e60949..b3ad1700e2 100644 --- a/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java +++ b/nifi-nar-bundles/nifi-kudu-bundle/nifi-kudu-processors/src/test/java/org/apache/nifi/processors/kudu/ITPutKudu.java @@ -136,10 +136,13 @@ public class ITPutKudu { flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename); // Use values to ensure multiple batches and multiple flow files per-trigger - testRunner.setProperty(PutKudu.INSERT_OPERATION, OperationType.UPSERT.toString()); testRunner.setProperty(PutKudu.BATCH_SIZE, "10"); testRunner.setProperty(PutKudu.FLOWFILE_BATCH_SIZE, "2"); + // Set the operation type. + flowFileAttributes.put("kudu.operation.type", "upsert"); + testRunner.setProperty(PutKudu.INSERT_OPERATION, "${kudu.operation.type}"); + // Don't ignore null values. flowFileAttributes.put("kudu.ignore.null", "false"); testRunner.setProperty(PutKudu.IGNORE_NULL, "${kudu.ignore.null}");