NIFI-4559: Add non-zero status relationship to ExecuteStreamCommand

NIFI-4559: Removed Penalize Non-zero Status property and updated doc per review comments
Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #2246
This commit is contained in:
Matthew Burgess 2017-11-02 11:22:50 -04:00
parent 33281300cd
commit 3b15ed855c
2 changed files with 64 additions and 17 deletions

View File

@ -46,10 +46,10 @@ import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
@ -115,7 +115,12 @@ import java.util.concurrent.atomic.AtomicReference;
* </li>
* <li>output-stream
* <ul>
* <li>The destination path for the flow file created from the command's output</li>
* <li>The destination path for the flow file created from the command's output, if the exit code is zero</li>
* </ul>
* </li>
* <li>nonzero-status
* <ul>
* <li>The destination path for the flow file created from the command's output, if the exit code is non-zero</li>
* </ul>
* </li>
* </ul>
@ -138,11 +143,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
public static final Relationship ORIGINAL_RELATIONSHIP = new Relationship.Builder()
.name("original")
.description("FlowFiles that were successfully processed")
.description("FlowFiles that were successfully processed.")
.build();
public static final Relationship OUTPUT_STREAM_RELATIONSHIP = new Relationship.Builder()
.name("output stream")
.description("The destination path for the flow file created from the command's output")
.description("The destination path for the flow file created from the command's output, if the returned status code is zero.")
.build();
public static final Relationship NONZERO_STATUS_RELATIONSHIP = new Relationship.Builder()
.name("nonzero status")
.description("The destination path for the flow file created from the command's output, if the returned status code is non-zero. "
+ "All flow files routed to this relationship will be penalized.")
.build();
private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
@ -198,7 +208,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
static final PropertyDescriptor PUT_OUTPUT_IN_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Output Destination Attribute")
.description("If set, the output of the stream command will be put into an attribute of the original FlowFile instead of a separate "
+ "FlowFile. There will no longer be a relationship for 'output stream'. The value of this property will be the key for the output attribute.")
+ "FlowFile. There will no longer be a relationship for 'output stream' or 'nonzero status'. The value of this property will be the key for the output attribute.")
.addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR)
.build();
@ -222,7 +232,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.defaultValue(";")
.build();
private static final List<PropertyDescriptor> PROPERTIES;
static {
@ -240,6 +249,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
Set<Relationship> outputStreamRelationships = new HashSet<>();
outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
outputStreamRelationships.add(NONZERO_STATUS_RELATIONSHIP);
OUTPUT_STREAM_RELATIONSHIP_SET = Collections.unmodifiableSet(outputStreamRelationships);
Set<Relationship> attributeRelationships = new HashSet<>();
@ -339,10 +349,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
throw new ProcessException(e);
}
try (final OutputStream pos = process.getOutputStream();
final InputStream pis = process.getInputStream();
final InputStream pes = process.getErrorStream();
final BufferedInputStream bis = new BufferedInputStream(pis);
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
final InputStream pis = process.getInputStream();
final InputStream pes = process.getErrorStream();
final BufferedInputStream bis = new BufferedInputStream(pis);
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
int exitCode = -1;
final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
@ -373,7 +383,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
attributes.put("execution.error", strBldr.substring(0, length));
final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
if (exitCode == 0) {
logger.info("Transferring flow file {} to {}",
new Object[]{outputFlowFile,outputFlowFileRelationship.getName()});
@ -387,7 +397,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
attributes.put("execution.command.args", commandArguments);
outputFlowFile = session.putAllAttributes(outputFlowFile, attributes);
// This transfer will transfer the FlowFile that received the stream out put to it's destined relationship.
if (NONZERO_STATUS_RELATIONSHIP.equals(outputFlowFileRelationship)) {
outputFlowFile = session.penalize(outputFlowFile);
}
// This will transfer the FlowFile that received the stream output to its destined relationship.
// In the event the stream is put to the an attribute of the original, it will be transferred here.
session.transfer(outputFlowFile, outputFlowFileRelationship);

View File

@ -93,10 +93,13 @@ public class TestExecuteStreamCommand {
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
assertEquals(0, flowFiles.get(0).getSize());
assertEquals("Error: Unable to access jarfile", flowFiles.get(0).getAttribute("execution.error").substring(0, 31));
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 1);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP);
MockFlowFile flowFile = flowFiles.get(0);
assertEquals(0, flowFile.getSize());
assertEquals("Error: Unable to access jarfile", flowFile.getAttribute("execution.error").substring(0, 31));
assertTrue(flowFile.isPenalized());
}
@Test
@ -303,6 +306,7 @@ public class TestExecuteStreamCommand {
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
MockFlowFile outputFlowFile = flowFiles.get(0);
@ -470,6 +474,36 @@ public class TestExecuteStreamCommand {
controller.assertValid();
}
@Test
public void testExecuteJarPutToAttributeBadPath() throws Exception {
File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar");
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
String jarPath = exJar.getAbsolutePath();
exJar.setExecutable(true);
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setValidateExpressionUsage(false);
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
MockFlowFile outputFlowFile = flowFiles.get(0);
String result = outputFlowFile.getAttribute("executeStreamCommand.output");
outputFlowFile.assertContentEquals(dummy);
assertTrue(result.isEmpty()); // java -jar with bad path only prints to standard error not standard out
assertEquals("1", outputFlowFile.getAttribute("execution.status")); // java -jar with bad path exits with code 1
assertEquals("java", outputFlowFile.getAttribute("execution.command"));
assertEquals("-jar;", outputFlowFile.getAttribute("execution.command.args").substring(0, 5));
String attribute = outputFlowFile.getAttribute("execution.command.args");
String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "noSuchFile.jar";
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
}
private static boolean isWindows() {
return System.getProperty("os.name").toLowerCase().startsWith("windows");
}