From 4b509aa5a5b3d98d0a23d1c8fce090c8bee1e6d8 Mon Sep 17 00:00:00 2001 From: Otto Fowler Date: Wed, 27 Mar 2019 16:46:24 -0400 Subject: [PATCH] NIFI-3221 This closes #3396. Add a new property for setting the argument passing strategy, either the existing parameter, or by adding new dynamic parameters, along with implementation and tests This allows for passing arguments with quotes. Signed-off-by: Joe Witt --- .../standard/ExecuteStreamCommand.java | 122 +++- .../standard/TestExecuteStreamCommand.java | 680 ++++++++++++++++++ 2 files changed, 795 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 94db1c0559..4a68301b91 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -32,9 +32,12 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperties; import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.InputRequirement; @@ -46,7 +49,9 @@ import org.apache.nifi.annotation.behavior.WritesAttribute; import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.components.AllowableValue; import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.PropertyValue; import org.apache.nifi.components.RequiredPermission; import org.apache.nifi.components.ValidationContext; import org.apache.nifi.components.ValidationResult; @@ -83,6 +88,15 @@ import org.apache.nifi.stream.io.StreamUtils; *
  • Supports expression language: true
  • * * + *
  • Arguments Strategy + *
      + *
    • Selects the strategy to use for arguments to the executable
    • + *
        + *
      • Command Arguments Property: Use the delimited list of arguments from the Command Arguments Property. Does not support quotations in parameters.
      • + *
      • Dynamic Property Arguments: Use Dynamic Properties, with each property a separate argument. Does support quotes.
      • + *
      + *
    + *
  • *
  • Command Arguments *
      *
    • The arguments to supply to the executable delimited by the ';' character. Each argument may be an Expression Language statement.
    • @@ -134,7 +148,13 @@ import org.apache.nifi.stream.io.StreamUtils; @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"command execution", "command", "stream", "execute"}) @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.") -@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor") +@DynamicProperties({ + @DynamicProperty(name = "An environment variable name", value = "An environment variable value", + description = "These environment variables are passed to the process spawned by this Processor"), + @DynamicProperty(name = "command.argument.", value = "Argument to be supplied to the command", + description = "These arguments are supplied to the process spawned by this Processor when using the " + + "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER will determine the order.") +}) @WritesAttributes({ @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"), @WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"), @@ -167,6 +187,17 @@ public class ExecuteStreamCommand extends AbstractProcessor { private final static Set OUTPUT_STREAM_RELATIONSHIP_SET; private final static Set ATTRIBUTE_RELATIONSHIP_SET; + private static final Pattern DYNAMIC_PARAMETER_NAME = Pattern.compile("command\\.argument\\.(?[0-9]+)$"); + public static final String executionArguments = "Command Arguments Property"; + public static final String dynamicArguements = "Dynamic Property Arguments"; + + static final AllowableValue EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY = new AllowableValue(executionArguments, executionArguments, + "Arguments to be supplied to the executable are taken from the Command Arguments property"); + + static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new AllowableValue(dynamicArguements,dynamicArguements, + "Arguments to be supplied to the executable are taken from dynamic properties"); + + private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true); static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder() .name("Command Path") @@ -176,6 +207,16 @@ public class ExecuteStreamCommand extends AbstractProcessor { .required(true) .build(); + static final PropertyDescriptor ARGUMENTS_STRATEGY = new PropertyDescriptor.Builder() + .name("argumentsStrategy") + .displayName("Command Arguments Strategy") + .description("Strategy for configuring arguments to be supplied to the command.") + .expressionLanguageSupported(ExpressionLanguageScope.NONE) + .required(false) + .allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue(),DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()) + .defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue()) + .build(); + static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder() .name("Command Arguments") .description("The arguments to supply to the executable delimited by the ';' character.") @@ -245,6 +286,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { static { List props = new ArrayList<>(); + props.add(ARGUMENTS_STRATEGY); props.add(EXECUTION_ARGUMENTS); props.add(EXECUTION_COMMAND); props.add(IGNORE_STDIN); @@ -298,12 +340,29 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - return new PropertyDescriptor.Builder() + if (!propertyDescriptorName.startsWith("command.argument.")) { + return new PropertyDescriptor.Builder() .name(propertyDescriptorName) - .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .description( + "Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") .dynamic(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + } + // get the number part of the name + Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName); + if (matcher.matches()) { + final String commandIndex = matcher.group("commandIndex"); + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .displayName(propertyDescriptorName) + .description("Argument passed to command") + .dynamic(true) + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) + .build(); + } + return null; } @Override @@ -315,18 +374,67 @@ public class ExecuteStreamCommand extends AbstractProcessor { final ArrayList args = new ArrayList<>(); final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet(); + final PropertyValue argumentsStrategyPropertyValue = context.getProperty(ARGUMENTS_STRATEGY); + final boolean useDynamicPropertyArguments = argumentsStrategyPropertyValue.isSet() && argumentsStrategyPropertyValue.getValue().equals(DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger(); final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue(); final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue(); args.add(executeCommand); - final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue(); final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue()); - if (!StringUtils.isBlank(commandArguments)) { - for (String arg : ArgumentUtils.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) { - args.add(arg); + final String commandArguments; + if (!useDynamicPropertyArguments) { + commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue(); + if (!StringUtils.isBlank(commandArguments)) { + for (String arg : ArgumentUtils + .splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) { + args.add(arg); + } + } + } else { + + ArrayList propertyDescriptors = new ArrayList<>(); + for (final Map.Entry entry : context.getProperties().entrySet()) { + Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName()); + if (matcher.matches()) { + propertyDescriptors.add(entry.getKey()); + } + } + Collections.sort(propertyDescriptors,(p1,p2) -> { + Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName()); + String indexString1 = null; + while (matcher.find()) { + indexString1 = matcher.group("commandIndex"); + } + matcher = DYNAMIC_PARAMETER_NAME.matcher(p2.getName()); + String indexString2 = null; + while (matcher.find()) { + indexString2 = matcher.group("commandIndex"); + } + final int index1 = Integer.parseInt(indexString1); + final int index2 = Integer.parseInt(indexString2); + if ( index1 > index2 ) { + return 1; + } else if (index1 < index2) { + return -1; + } + return 0; + }); + for ( final PropertyDescriptor descriptor : propertyDescriptors) { + args.add(context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue()); + } + if (args.size() > 0) { + final StringBuilder builder = new StringBuilder(); + + for ( int i = 1; i < args.size(); i++) { + builder.append(args.get(i)).append("\t"); + } + commandArguments = builder.toString().trim(); + } else { + commandArguments = ""; } } + final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue(); final ProcessBuilder builder = new ProcessBuilder(); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index 08282cd8b7..c8b8763193 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -23,13 +23,21 @@ import static org.junit.Assert.assertTrue; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.regex.Pattern; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.io.FileUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.expression.ExpressionLanguageScope; import org.apache.nifi.processors.standard.util.ArgumentUtils; import org.apache.nifi.util.MockFlowFile; import org.apache.nifi.util.TestRunner; @@ -79,6 +87,51 @@ public class TestExecuteStreamCommand { assertEquals(outputFlowFile.getAttribute("execution.command.args"), originalFlowFile.getAttribute("execution.command.args")); } + @Test + public void testExecuteJarDynamicPropArgs() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + byte[] byteArray = outputFlowFile.toByteArray(); + String result = new String(byteArray); + assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find()); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0, 4).trim()); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + + MockFlowFile originalFlowFile = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP).get(0); + assertEquals(outputFlowFile.getAttribute("execution.status"), originalFlowFile.getAttribute("execution.status")); + assertEquals(outputFlowFile.getAttribute("execution.command"), originalFlowFile.getAttribute("execution.command")); + assertEquals(outputFlowFile.getAttribute("execution.command.args"), originalFlowFile.getAttribute("execution.command.args")); + } + + @Test public void testExecuteJarWithBadPath() throws Exception { File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar"); @@ -100,6 +153,39 @@ public class TestExecuteStreamCommand { assertTrue(flowFile.isPenalized()); } + @Test + public void testExecuteJarWithBadPathDynamicProperties() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP); + MockFlowFile flowFile = flowFiles.get(0); + assertEquals(0, flowFile.getSize()); + assertEquals("Error: Unable to access jarfile", flowFile.getAttribute("execution.error").substring(0, 31)); + assertTrue(flowFile.isPenalized()); + } + @Test public void testExecuteIngestAndUpdate() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); @@ -128,6 +214,46 @@ public class TestExecuteStreamCommand { assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find()); } + @Test + public void testExecuteIngestAndUpdateDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + File dummy10MBytes = new File("target/10MB.txt"); + try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) { + byte[] bytes = Files.readAllBytes(dummy.toPath()); + assertEquals(1000, bytes.length); + for (int i = 0; i < 10000; i++) { + fos.write(bytes, 0, 1000); + } + } + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy10MBytes.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + + assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find()); + } + @Test public void testLoggingToStdErr() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestLogStdErr.jar"); @@ -148,6 +274,38 @@ public class TestExecuteStreamCommand { assertEquals("fffffffffffffffffffffffffffffff", flowFile.getAttribute("execution.error").substring(0, 31)); } + @Test + public void testLoggingToStdErrDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestLogStdErr.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1mb.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + MockFlowFile flowFile = flowFiles.get(0); + assertEquals(0, flowFile.getSize()); + assertEquals("fffffffffffffffffffffffffffffff", flowFile.getAttribute("execution.error").substring(0, 31)); + } + @Test public void testExecuteIngestAndUpdateWithWorkingDir() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); @@ -170,6 +328,40 @@ public class TestExecuteStreamCommand { assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find()); } + @Test + public void testExecuteIngestAndUpdateWithWorkingDirDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + + final String quotedSeparator = Pattern.quote(File.separator); + assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find()); + } + @Test public void testIgnoredStdin() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); @@ -192,6 +384,40 @@ public class TestExecuteStreamCommand { Pattern.compile("target:ModifiedResult\r?\n$").matcher(result).find()); } + @Test + public void testIgnoredStdinDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + assertTrue("TestIngestAndUpdate.jar should not have received anything to modify", + Pattern.compile("target:ModifiedResult\r?\n$").matcher(result).find()); + } + // this is dependent on window with cygwin...so it's not enabled @Ignore @Test @@ -242,6 +468,43 @@ public class TestExecuteStreamCommand { assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2")); } + @Test + public void testDynamicEnvironmentDynamicProperties() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setProperty("NIFI_TEST_1", "testvalue1"); + controller.setProperty("NIFI_TEST_2", "testvalue2"); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + Set dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n"))); + assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2); + assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1")); + assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2")); + } + @Test public void testSmallEchoPutToAttribute() throws Exception { File dummy = new File("src/test/resources/hello.txt"); @@ -273,6 +536,108 @@ public class TestExecuteStreamCommand { assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command")); } + @Test + public void testSmallEchoPutToAttributeDynamicProperties() throws Exception { + File dummy = new File("src/test/resources/hello.txt"); + assertTrue(dummy.exists()); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue("".getBytes()); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + + if(isWindows()) { + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe"); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "/c"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, "echo Hello"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;echo Hello"); + } else{ + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo"); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "Hello"); + } + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + outputFlowFile.assertContentEquals(""); + String ouput = outputFlowFile.getAttribute("executeStreamCommand.output"); + assertTrue(ouput.startsWith("Hello")); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command")); + } + + @Test + public void testArgumentsWithQuotesFromAttributeDynamicProperties() throws Exception { + File dummy = new File("src/test/resources/TestJson/json-sample.json"); + assertTrue(dummy.exists()); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + + Map attrs = new HashMap<>(); + + String json = FileUtils.readFileToString(dummy, StandardCharsets.UTF_8); + attrs.put("json.attribute",json); + controller.enqueue("".getBytes(),attrs); + + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + + if(isWindows()) { + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe"); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "/c"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, "echo"); + } else{ + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo"); + } + PropertyDescriptor dynamicProp3 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.3") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp3, "${json.attribute}"); + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + String output = new String(outputFlowFile.toByteArray()); + ObjectMapper mapper = new ObjectMapper(); + JsonNode tree1 = mapper.readTree(json); + JsonNode tree2 = mapper.readTree(output); + assertEquals(tree1,tree2); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command")); + } + @Test public void testExecuteJarPutToAttribute() throws Exception { File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); @@ -301,6 +666,46 @@ public class TestExecuteStreamCommand { assertEquals(expected, attribute.substring(attribute.length() - expected.length())); } + @Test + public void testExecuteJarPutToAttributeDynamicProperties() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + String result = outputFlowFile.getAttribute("executeStreamCommand.output"); + outputFlowFile.assertContentEquals(dummy); + assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find()); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0, 4)); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + } + @Test public void testExecuteJarToAttributeConfiguration() throws Exception { File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); @@ -331,6 +736,48 @@ public class TestExecuteStreamCommand { assertEquals(expected, attribute.substring(attribute.length() - expected.length())); } + @Test + public void testExecuteJarToAttributeConfigurationDyanmicProperties() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue("small test".getBytes()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "10"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest"); + assertEquals(1, controller.getProcessContext().getAvailableRelationships().size()); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + outputFlowFile.assertContentEquals("small test".getBytes()); + String result = outputFlowFile.getAttribute("outputDest"); + assertTrue(Pattern.compile("Test was a").matcher(result).find()); + assertEquals("0", outputFlowFile.getAttribute("execution.status")); + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0,4)); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + } + @Test public void testExecuteIngestAndUpdatePutToAttribute() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); @@ -358,6 +805,45 @@ public class TestExecuteStreamCommand { assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find()); } + @Test + public void testExecuteIngestAndUpdatePutToAttributeDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + File dummy10MBytes = new File("target/10MB.txt"); + byte[] bytes = Files.readAllBytes(dummy.toPath()); + try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) { + for (int i = 0; i < 10000; i++) { + fos.write(bytes, 0, 1000); + } + } + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy10MBytes.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("outputDest"); + + assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find()); + } + @Test public void testLargePutToAttribute() throws IOException { File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); @@ -393,6 +879,57 @@ public class TestExecuteStreamCommand { assertTrue(Pattern.compile("a{256}").matcher(result).matches()); } + @Test + public void testLargePutToAttributeDynamicProperties() throws IOException { + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + File dummy10MBytes = new File("target/10MB.txt"); + byte[] bytes = Files.readAllBytes(dummy.toPath()); + try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) { + for (int i = 0; i < 10000; i++) { + fos.write(bytes, 0, 1000); + } + } + + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue("".getBytes()); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + if(isWindows()) { + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe"); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "/c"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, "type " + dummy10MBytes.getAbsolutePath()); + } else{ + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cat"); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, dummy10MBytes.getAbsolutePath()); + } + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "256"); + + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + + flowFiles.get(0).assertAttributeEquals("execution.status", "0"); + String result = flowFiles.get(0).getAttribute("executeStreamCommand.output"); + assertTrue(Pattern.compile("a{256}").matcher(result).matches()); + } + @Test public void testExecuteIngestAndUpdateWithWorkingDirPutToAttribute() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); @@ -414,6 +951,39 @@ public class TestExecuteStreamCommand { assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find()); } + @Test + public void testExecuteIngestAndUpdateWithWorkingDirPutToAttributeDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "streamOutput"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("streamOutput"); + + final String quotedSeparator = Pattern.quote(File.separator); + assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find()); + } + @Test public void testIgnoredStdinPutToAttribute() throws IOException { File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); @@ -435,6 +1005,39 @@ public class TestExecuteStreamCommand { Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find()); } + @Test + public void testIgnoredStdinPutToAttributeDynamicProperties() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("executeStreamCommand.output"); + assertTrue("TestIngestAndUpdate.jar should not have received anything to modify", + Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find()); + } + @Test public void testDynamicEnvironmentPutToAttribute() throws Exception { File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar"); @@ -459,6 +1062,42 @@ public class TestExecuteStreamCommand { assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2")); } + @Test + public void testDynamicEnvironmentPutToAttributeDynamicProperties() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setProperty("NIFI_TEST_1", "testvalue1"); + controller.setProperty("NIFI_TEST_2", "testvalue2"); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + String result = flowFiles.get(0).getAttribute("executeStreamCommand.output"); + Set dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n"))); + assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2); + assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1")); + assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2")); + } + @Test public void testQuotedArguments() throws Exception { List args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2 arg3\"", ' '); @@ -507,6 +1146,47 @@ public class TestExecuteStreamCommand { assertEquals(expected, attribute.substring(attribute.length() - expected.length())); } + @Test + public void testExecuteJarPutToAttributeBadPathDynamicProperties() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()); + PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.1") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp1, "-jar"); + PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder() + .dynamic(true) + .name("command.argument.2") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .build(); + controller.setProperty(dynamicProp2, jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + String result = outputFlowFile.getAttribute("executeStreamCommand.output"); + outputFlowFile.assertContentEquals(dummy); + assertTrue(result.isEmpty()); // java -jar with bad path only prints to standard error not standard out + assertEquals("1", outputFlowFile.getAttribute("execution.status")); // java -jar with bad path exits with code 1 + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0, 4)); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "noSuchFile.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + } + private static boolean isWindows() { return System.getProperty("os.name").toLowerCase().startsWith("windows"); }