From 3b15ed855cc4319844dac13ba3244dc8ddde6207 Mon Sep 17 00:00:00 2001 From: Matthew Burgess Date: Thu, 2 Nov 2017 11:22:50 -0400 Subject: [PATCH] NIFI-4559: Add non-zero status relationship to ExecuteStreamCommand NIFI-4559: Removed Penalize Non-zero Status property and updated doc per review comments Signed-off-by: Matthew Burgess This closes #2246 --- .../standard/ExecuteStreamCommand.java | 39 +++++++++++------ .../standard/TestExecuteStreamCommand.java | 42 +++++++++++++++++-- 2 files changed, 64 insertions(+), 17 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 13e3c58a94..1c1137c0d3 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -46,10 +46,10 @@ import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.processors.standard.util.ArgumentUtils; import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream; -import org.apache.nifi.stream.io.BufferedInputStream; -import org.apache.nifi.stream.io.BufferedOutputStream; import org.apache.nifi.stream.io.StreamUtils; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.BufferedReader; import java.io.File; import java.io.IOException; @@ -115,7 +115,12 @@ import java.util.concurrent.atomic.AtomicReference; * *
  • output-stream *
      - *
    • The destination path for the flow file created from the command's output
    • + *
    • The destination path for the flow file created from the command's output, if the exit code is zero
    • + *
    + *
  • + *
  • nonzero-status + *
      + *
    • The destination path for the flow file created from the command's output, if the exit code is non-zero
    • *
    *
  • * @@ -138,11 +143,16 @@ public class ExecuteStreamCommand extends AbstractProcessor { public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder() .name("original") - .description("FlowFiles that were successfully processed") + .description("FlowFiles that were successfully processed.") .build(); public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder() .name("output stream") - .description("The destination path for the flow file created from the command's output") + .description("The destination path for the flow file created from the command's output, if the returned status code is zero.") + .build(); + public static final Relationship NONZERO_STATUS_RELATIONSHIP = new Relationship.Builder() + .name("nonzero status") + .description("The destination path for the flow file created from the command's output, if the returned status code is non-zero. " + + "All flow files routed to this relationship will be penalized.") .build(); private AtomicReference> relationships = new AtomicReference<>(); @@ -198,7 +208,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder() .name("Output Destination Attribute") .description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate " - + "FlowFile. There will no longer be a relationship for 'output stream'. The value of this property will be the key for the output attribute.") + + "FlowFile. There will no longer be a relationship for 'output stream' or 'nonzero status'. The value of this property will be the key for the output attribute.") .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) .build(); @@ -222,7 +232,6 @@ public class ExecuteStreamCommand extends AbstractProcessor { .defaultValue(";") .build(); - private static final List PROPERTIES; static { @@ -240,6 +249,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { Set outputStreamRelationships = new HashSet<>(); outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP); outputStreamRelationships.add(ORIGINAL_RELATIONSHIP); + outputStreamRelationships.add(NONZERO_STATUS_RELATIONSHIP); OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships); Set attributeRelationships = new HashSet<>(); @@ -339,10 +349,10 @@ public class ExecuteStreamCommand extends AbstractProcessor { throw new ProcessException(e); } try (final OutputStream pos = process.getOutputStream(); - final InputStream pis = process.getInputStream(); - final InputStream pes = process.getErrorStream(); - final BufferedInputStream bis = new BufferedInputStream(pis); - final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { + final InputStream pis = process.getInputStream(); + final InputStream pes = process.getErrorStream(); + final BufferedInputStream bis = new BufferedInputStream(pis); + final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { int exitCode = -1; final BufferedOutputStream bos = new BufferedOutputStream(pos); FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile); @@ -373,7 +383,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { int length = strBldr.length() > 4000 ? 4000 : strBldr.length(); attributes.put("execution.error", strBldr.substring(0, length)); - final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP; + final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP; if (exitCode == 0) { logger.info("Transferring flow file {} to {}", new Object[]{outputFlowFile,outputFlowFileRelationship.getName()}); @@ -387,7 +397,10 @@ public class ExecuteStreamCommand extends AbstractProcessor { attributes.put("execution.command.args", commandArguments); outputFlowFile = session.putAllAttributes(outputFlowFile, attributes); - // This transfer will transfer the FlowFile that received the stream out put to it's destined relationship. + if (NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) { + outputFlowFile = session.penalize(outputFlowFile); + } + // This will transfer the FlowFile that received the stream output to its destined relationship. // In the event the stream is put to the an attribute of the original, it will be transferred here. session.transfer(outputFlowFile, outputFlowFileRelationship); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index 852209321b..09881373ad 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -93,10 +93,13 @@ public class TestExecuteStreamCommand { controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); controller.run(1); controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); - controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); - List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); - assertEquals(0, flowFiles.get(0).getSize()); - assertEquals("Error: Unable to access jarfile", flowFiles.get(0).getAttribute("execution.error").substring(0, 31)); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP); + MockFlowFile flowFile = flowFiles.get(0); + assertEquals(0, flowFile.getSize()); + assertEquals("Error: Unable to access jarfile", flowFile.getAttribute("execution.error").substring(0, 31)); + assertTrue(flowFile.isPenalized()); } @Test @@ -303,6 +306,7 @@ public class TestExecuteStreamCommand { controller.run(1); controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0); List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); MockFlowFile outputFlowFile = flowFiles.get(0); @@ -470,6 +474,36 @@ public class TestExecuteStreamCommand { controller.assertValid(); } + @Test + public void testExecuteJarPutToAttributeBadPath() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output"); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP); + MockFlowFile outputFlowFile = flowFiles.get(0); + String result = outputFlowFile.getAttribute("executeStreamCommand.output"); + outputFlowFile.assertContentEquals(dummy); + assertTrue(result.isEmpty()); // java -jar with bad path only prints to standard error not standard out + assertEquals("1", outputFlowFile.getAttribute("execution.status")); // java -jar with bad path exits with code 1 + assertEquals("java", outputFlowFile.getAttribute("execution.command")); + assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5)); + String attribute = outputFlowFile.getAttribute("execution.command.args"); + String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "noSuchFile.jar"; + assertEquals(expected, attribute.substring(attribute.length() - expected.length())); + } + private static boolean isWindows() { return System.getProperty("os.name").toLowerCase().startsWith("windows"); }