diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
index 13e3c58a94..1c1137c0d3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java
@@ -46,10 +46,10 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
-import org.apache.nifi.stream.io.BufferedInputStream;
-import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@@ -115,7 +115,12 @@ import java.util.concurrent.atomic.AtomicReference;
*
*
output-stream
*
- * - The destination path for the flow file created from the command's output
+ * - The destination path for the flow file created from the command's output, if the exit code is zero
+ *
+ *
+ * nonzero-status
+ *
+ * - The destination path for the flow file created from the command's output, if the exit code is non-zero
*
*
*
@@ -138,11 +143,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()
.name("original")
- .description("FlowFiles that were successfully processed")
+ .description("FlowFiles that were successfully processed.")
.build();
public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder()
.name("output stream")
- .description("The destination path for the flow file created from the command's output")
+ .description("The destination path for the flow file created from the command's output, if the returned status code is zero.")
+ .build();
+ public static final Relationship NONZERO_STATUS_RELATIONSHIP = new Relationship.Builder()
+ .name("nonzero status")
+ .description("The destination path for the flow file created from the command's output, if the returned status code is non-zero. "
+ + "All flow files routed to this relationship will be penalized.")
.build();
private AtomicReference> relationships = new AtomicReference<>();
@@ -198,7 +208,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Output Destination Attribute")
.description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate "
- + "FlowFile. There will no longer be a relationship for 'output stream'. The value of this property will be the key for the output attribute.")
+ + "FlowFile. There will no longer be a relationship for 'output stream' or 'nonzero status'. The value of this property will be the key for the output attribute.")
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.build();
@@ -222,7 +232,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.defaultValue(";")
.build();
-
private static final List PROPERTIES;
static {
@@ -240,6 +249,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
Set outputStreamRelationships = new HashSet<>();
outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
+ outputStreamRelationships.add(NONZERO_STATUS_RELATIONSHIP);
OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships);
Set attributeRelationships = new HashSet<>();
@@ -339,10 +349,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
throw new ProcessException(e);
}
try (final OutputStream pos = process.getOutputStream();
- final InputStream pis = process.getInputStream();
- final InputStream pes = process.getErrorStream();
- final BufferedInputStream bis = new BufferedInputStream(pis);
- final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
+ final InputStream pis = process.getInputStream();
+ final InputStream pes = process.getErrorStream();
+ final BufferedInputStream bis = new BufferedInputStream(pis);
+ final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
int exitCode = -1;
final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
@@ -373,7 +383,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
attributes.put("execution.error", strBldr.substring(0, length));
- final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
+ final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
if (exitCode == 0) {
logger.info("Transferring flow file {} to {}",
new Object[]{outputFlowFile,outputFlowFileRelationship.getName()});
@@ -387,7 +397,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
attributes.put("execution.command.args", commandArguments);
outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
- // This transfer will transfer the FlowFile that received the stream out put to it's destined relationship.
+ if (NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) {
+ outputFlowFile = session.penalize(outputFlowFile);
+ }
+ // This will transfer the FlowFile that received the stream output to its destined relationship.
// In the event the stream is put to the an attribute of the original, it will be transferred here.
session.transfer(outputFlowFile, outputFlowFileRelationship);
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
index 852209321b..09881373ad 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java
@@ -93,10 +93,13 @@ public class TestExecuteStreamCommand {
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
- controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
- List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
- assertEquals(0, flowFiles.get(0).getSize());
- assertEquals("Error: Unable to access jarfile", flowFiles.get(0).getAttribute("execution.error").substring(0, 31));
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+ controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 1);
+ List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP);
+ MockFlowFile flowFile = flowFiles.get(0);
+ assertEquals(0, flowFile.getSize());
+ assertEquals("Error: Unable to access jarfile", flowFile.getAttribute("execution.error").substring(0, 31));
+ assertTrue(flowFile.isPenalized());
}
@Test
@@ -303,6 +306,7 @@ public class TestExecuteStreamCommand {
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+ controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
MockFlowFile outputFlowFile = flowFiles.get(0);
@@ -470,6 +474,36 @@ public class TestExecuteStreamCommand {
controller.assertValid();
}
+ @Test
+ public void testExecuteJarPutToAttributeBadPath() throws Exception {
+ File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar");
+ File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
+ String jarPath = exJar.getAbsolutePath();
+ exJar.setExecutable(true);
+ final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
+ controller.setValidateExpressionUsage(false);
+ controller.enqueue(dummy.toPath());
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
+ controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
+ controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
+ controller.run(1);
+ controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
+ controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
+ controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
+
+ List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
+ MockFlowFile outputFlowFile = flowFiles.get(0);
+ String result = outputFlowFile.getAttribute("executeStreamCommand.output");
+ outputFlowFile.assertContentEquals(dummy);
+ assertTrue(result.isEmpty()); // java -jar with bad path only prints to standard error not standard out
+ assertEquals("1", outputFlowFile.getAttribute("execution.status")); // java -jar with bad path exits with code 1
+ assertEquals("java", outputFlowFile.getAttribute("execution.command"));
+ assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5));
+ String attribute = outputFlowFile.getAttribute("execution.command.args");
+ String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "noSuchFile.jar";
+ assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
+ }
+
private static boolean isWindows() {
return System.getProperty("os.name").toLowerCase().startsWith("windows");
}