From 6d1128497bcfcfd105fd3d18305051615b254576 Mon Sep 17 00:00:00 2001 From: ricky Date: Mon, 4 May 2015 17:12:04 -0400 Subject: [PATCH] NIFI-583: Ignore STDIN for ExecuteStreamCommand - Added the ability (default: false) to ignore STDIN when passing a flowfile to the ExecuteStreamCommand processor. This is useful if the command you are executing cannot take STDIN, or passing STDIN is unnecessary Signed-off-by: Aldrin Piri --- .../standard/ExecuteStreamCommand.java | 25 +++++++++++++++-- .../standard/TestExecuteStreamCommand.java | 27 +++++++++++++++++-- 2 files changed, 48 insertions(+), 4 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 4c4288aad0..f97a4557f4 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -89,7 +89,13 @@ import org.apache.nifi.stream.io.StreamUtils; *
  • Supports expression language: true
  • * * - * + *
  • Ignore STDIN + *
      + *
    • Indicates whether or not the flowfile's contents should be streamed as part of STDIN
    • + *
    • Default value: false (this means that the contents of a flowfile will be sent as STDIN to your command
    • + *
    • Supports expression language: false
    • + *
    + *
  • * * *

    @@ -177,12 +183,22 @@ public class ExecuteStreamCommand extends AbstractProcessor { .required(false) .build(); + static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder() + .name("Ignore STDIN") + .description("If true, the contents of the incoming flowfile will not be passed to the executing command") + .addValidator(Validator.VALID) + .allowableValues("true", "false") + .defaultValue("false") + .build(); + + private static final List PROPERTIES; static { List props = new ArrayList<>(); props.add(EXECUTION_ARGUMENTS); props.add(EXECUTION_COMMAND); + props.add(IGNORE_STDIN); props.add(WORKING_DIR); PROPERTIES = Collections.unmodifiableList(props); } @@ -225,6 +241,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue(); args.add(executeCommand); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); + final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue()); if (!StringUtils.isBlank(commandArguments)) { for (String arg : commandArguments.split(";")) { args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue()); @@ -269,7 +286,11 @@ public class ExecuteStreamCommand extends AbstractProcessor { final BufferedOutputStream bos = new BufferedOutputStream(pos); FlowFile outputStreamFlowFile = session.create(flowFile); StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process); - session.read(flowFile, callback); + if (ignoreStdin) { + session.read(outputStreamFlowFile, callback); + } else { + session.read(flowFile, callback); + } outputStreamFlowFile = callback.outputStreamFlowFile; exitCode = callback.exitCode; logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index 555c3e4350..90fb28f287 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -132,7 +132,7 @@ public class TestExecuteStreamCommand { String result = new String(byteArray); assertTrue(result.contains(File.separator + "nifi-standard-processors:ModifiedResult\r\n") - || result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n")); + || result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n")); } @Test @@ -154,7 +154,30 @@ public class TestExecuteStreamCommand { byte[] byteArray = flowFiles.get(0).toByteArray(); String result = new String(byteArray); assertTrue(result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\r\n") - || result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n")); + || result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n")); + } + + @Test + public void testIgnoredStdin() throws IOException { + File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + assertTrue("TestIngestAndUpdate.jar should not have received anything to modify", + result.endsWith("target:ModifiedResult\n")); } // this is dependent on window with cygwin...so it's not enabled