NIFI-604: Custom Argument Delimiters ExecuteStreamCommand / ExecuteProcess

- Unified the way ExecuteStreamCommand and ExecuteProcess handle arguments
- Argument delimiters can now be specified. Their default being what they were using before (; and space)
This commit is contained in:
ricky 2015-08-27 15:02:37 -04:00
parent d39848f06e
commit 9cefc4a5a5
6 changed files with 166 additions and 53 deletions

View File

@ -602,6 +602,34 @@ public class StandardValidators {
}
}
public static class StringLengthValidator implements Validator {
private final int minimum;
private final int maximum;
public StringLengthValidator(int minimum, int maximum) {
this.minimum = minimum;
this.maximum = maximum;
}
@Override
public ValidationResult validate(final String subject, final String value, final ValidationContext context) {
if (value.length() < minimum || value.length() > maximum) {
return new ValidationResult.Builder()
.subject(subject)
.valid(false)
.input(value)
.explanation(String.format("String length invalid [min: %d, max: %d]", minimum, maximum))
.build();
} else {
return new ValidationResult.Builder()
.valid(true)
.input(value)
.subject(subject)
.build();
}
}
}
public static class DirectoryExistsValidator implements Validator {
private final boolean allowEL;

View File

@ -48,6 +48,7 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractProcessor;
@ -57,6 +58,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
@Tags({"command", "process", "source", "external", "invoke", "script"})
@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
@ -110,6 +112,18 @@ public class ExecuteProcess extends AbstractProcessor {
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1);
static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
.name("Argument Delimiter")
.description("Delimiter to use to separate arguments for a command [default: space]. Must be a single character.")
.addValidator(Validator.VALID)
.addValidator(characterValidator)
.required(true)
.defaultValue(" ")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All created FlowFiles are routed to this relationship")
@ -132,6 +146,7 @@ public class ExecuteProcess extends AbstractProcessor {
properties.add(COMMAND_ARGUMENTS);
properties.add(BATCH_DURATION);
properties.add(REDIRECT_ERROR_STREAM);
properties.add(ARG_DELIMITER);
return properties;
}
@ -145,51 +160,7 @@ public class ExecuteProcess extends AbstractProcessor {
.build();
}
static List<String> splitArgs(final String input) {
if (input == null) {
return Collections.emptyList();
}
final List<String> args = new ArrayList<>();
final String trimmed = input.trim();
boolean inQuotes = false;
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < trimmed.length(); i++) {
final char c = trimmed.charAt(i);
switch (c) {
case ' ':
case '\t':
case '\r':
case '\n': {
if (inQuotes) {
sb.append(c);
} else {
final String arg = sb.toString().trim();
if (!arg.isEmpty()) {
args.add(arg);
}
sb.setLength(0);
}
break;
}
case '"':
inQuotes = !inQuotes;
break;
default:
sb.append(c);
break;
}
}
final String finalArg = sb.toString().trim();
if (!finalArg.isEmpty()) {
args.add(finalArg);
}
return args;
}
@OnScheduled
public void setupExecutor(final ProcessContext context) {
@ -293,7 +264,8 @@ public class ExecuteProcess extends AbstractProcessor {
protected List<String> createCommandStrings(final ProcessContext context) {
final String command = context.getProperty(COMMAND).getValue();
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
final List<String> args = ArgumentUtils.splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue(),
context.getProperty(ARG_DELIMITER).getValue().charAt(0));
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
commandStrings.add(command);

View File

@ -56,6 +56,7 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
@ -163,7 +164,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult result = new ValidationResult.Builder()
.subject(subject).valid(true).input(input).build();
String[] args = input.split(";");
List<String> args = ArgumentUtils.splitArgs(input, context.getProperty(ARG_DELIMITER).getValue().charAt(0));
for (String arg : args) {
ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
if (!valResult.isValid()) {
@ -191,6 +192,17 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.defaultValue("false")
.build();
private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1);
static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
.name("Argument Delimiter")
.description("Delimiter to use to separate arguments for a command [default: ;]. Must be a single character")
.addValidator(Validator.VALID)
.addValidator(characterValidator)
.required(true)
.defaultValue(";")
.build();
private static final List<PropertyDescriptor> PROPERTIES;
@ -200,6 +212,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
props.add(EXECUTION_COMMAND);
props.add(IGNORE_STDIN);
props.add(WORKING_DIR);
props.add(ARG_DELIMITER);
PROPERTIES = Collections.unmodifiableList(props);
}
@ -243,7 +256,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
if (!StringUtils.isBlank(commandArguments)) {
for (String arg : commandArguments.split(";")) {
for (String arg : ArgumentUtils.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(inputFlowFile).getValue());
}
}

View File

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard.util;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
public class ArgumentUtils {
private final static char QUOTE = '"';
private final static List<Character> DELIMITING_CHARACTERS = new ArrayList<>(3);
static {
DELIMITING_CHARACTERS.add('\t');
DELIMITING_CHARACTERS.add('\r');
DELIMITING_CHARACTERS.add('\n');
}
public static List<String> splitArgs(final String input, final char definedDelimiter) {
if (input == null) {
return Collections.emptyList();
}
final List<String> args = new ArrayList<>();
final String trimmed = input.trim();
boolean inQuotes = false;
final StringBuilder sb = new StringBuilder();
for (int i = 0; i < trimmed.length(); i++) {
final char c = trimmed.charAt(i);
if (DELIMITING_CHARACTERS.contains(c) || c == definedDelimiter) {
if (inQuotes) {
sb.append(c);
} else {
final String arg = sb.toString().trim();
if (!arg.isEmpty()) {
args.add(arg);
}
sb.setLength(0);
}
continue;
}
if (c == QUOTE) {
inQuotes = !inQuotes;
continue;
}
sb.append(c);
}
final String finalArg = sb.toString().trim();
if (!finalArg.isEmpty()) {
args.add(finalArg);
}
return args;
}
}

View File

@ -24,6 +24,7 @@ import java.io.File;
import java.util.List;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -34,28 +35,28 @@ public class TestExecuteProcess {
@Test
public void testSplitArgs() {
final List<String> nullArgs = ExecuteProcess.splitArgs(null);
final List<String> nullArgs = ArgumentUtils.splitArgs(null, ' ');
assertNotNull(nullArgs);
assertTrue(nullArgs.isEmpty());
final List<String> zeroArgs = ExecuteProcess.splitArgs(" ");
final List<String> zeroArgs = ArgumentUtils.splitArgs(" ", ' ');
assertNotNull(zeroArgs);
assertTrue(zeroArgs.isEmpty());
final List<String> singleArg = ExecuteProcess.splitArgs(" hello ");
final List<String> singleArg = ArgumentUtils.splitArgs(" hello ", ' ');
assertEquals(1, singleArg.size());
assertEquals("hello", singleArg.get(0));
final List<String> twoArg = ExecuteProcess.splitArgs(" hello good-bye ");
final List<String> twoArg = ArgumentUtils.splitArgs(" hello good-bye ", ' ');
assertEquals(2, twoArg.size());
assertEquals("hello", twoArg.get(0));
assertEquals("good-bye", twoArg.get(1));
final List<String> singleQuotedArg = ExecuteProcess.splitArgs(" \"hello\" ");
final List<String> singleQuotedArg = ArgumentUtils.splitArgs(" \"hello\" ", ' ');
assertEquals(1, singleQuotedArg.size());
assertEquals("hello", singleQuotedArg.get(0));
final List<String> twoQuotedArg = ExecuteProcess.splitArgs(" hello \"good bye\"");
final List<String> twoQuotedArg = ArgumentUtils.splitArgs(" hello \"good bye\"", ' ');
assertEquals(2, twoQuotedArg.size());
assertEquals("hello", twoQuotedArg.get(0));
assertEquals("good bye", twoQuotedArg.get(1));

View File

@ -16,6 +16,8 @@
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@ -229,4 +231,24 @@ public class TestExecuteStreamCommand {
assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1"));
assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
}
@Test
public void testQuotedArguments() throws Exception {
List<String> args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2 arg3\"", ' ');
assertEquals(3, args.size());
args = ArgumentUtils.splitArgs("echo;-n;\"arg1 arg2 arg3\"", ';');
assertEquals(3, args.size());
}
@Test
public void testInvalidDelimiter() throws Exception {
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo");
controller.assertValid();
controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, "foo");
controller.assertNotValid();
controller.setProperty(ExecuteStreamCommand.ARG_DELIMITER, "f");
controller.assertValid();
}
}