This commit is contained in:
Matt Gilman 2015-06-05 14:50:56 -04:00
commit 73b08d8202
2 changed files with 48 additions and 4 deletions

View File

@ -89,7 +89,13 @@ import org.apache.nifi.stream.io.StreamUtils;
* <li>Supports expression language: true</li> * <li>Supports expression language: true</li>
* </ul> * </ul>
* </li> * </li>
* * <li>Ignore STDIN
* <ul>
* <li>Indicates whether or not the flowfile's contents should be streamed as part of STDIN</li>
* <li>Default value: false (this means that the contents of a flowfile will be sent as STDIN to your command</li>
* <li>Supports expression language: false</li>
* </ul>
* </li>
* </ul> * </ul>
* *
* <p> * <p>
@ -177,12 +183,22 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.required(false) .required(false)
.build(); .build();
static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder()
.name("Ignore STDIN")
.description("If true, the contents of the incoming flowfile will not be passed to the executing command")
.addValidator(Validator.VALID)
.allowableValues("true", "false")
.defaultValue("false")
.build();
private static final List<PropertyDescriptor> PROPERTIES; private static final List<PropertyDescriptor> PROPERTIES;
static { static {
List<PropertyDescriptor> props = new ArrayList<>(); List<PropertyDescriptor> props = new ArrayList<>();
props.add(EXECUTION_ARGUMENTS); props.add(EXECUTION_ARGUMENTS);
props.add(EXECUTION_COMMAND); props.add(EXECUTION_COMMAND);
props.add(IGNORE_STDIN);
props.add(WORKING_DIR); props.add(WORKING_DIR);
PROPERTIES = Collections.unmodifiableList(props); PROPERTIES = Collections.unmodifiableList(props);
} }
@ -225,6 +241,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue(); final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
args.add(executeCommand); args.add(executeCommand);
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
if (!StringUtils.isBlank(commandArguments)) { if (!StringUtils.isBlank(commandArguments)) {
for (String arg : commandArguments.split(";")) { for (String arg : commandArguments.split(";")) {
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue()); args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
@ -269,7 +286,11 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final BufferedOutputStream bos = new BufferedOutputStream(pos); final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputStreamFlowFile = session.create(flowFile); FlowFile outputStreamFlowFile = session.create(flowFile);
StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process); StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
session.read(flowFile, callback); if (ignoreStdin) {
session.read(outputStreamFlowFile, callback);
} else {
session.read(flowFile, callback);
}
outputStreamFlowFile = callback.outputStreamFlowFile; outputStreamFlowFile = callback.outputStreamFlowFile;
exitCode = callback.exitCode; exitCode = callback.exitCode;
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});

View File

@ -132,7 +132,7 @@ public class TestExecuteStreamCommand {
String result = new String(byteArray); String result = new String(byteArray);
assertTrue(result.contains(File.separator + "nifi-standard-processors:ModifiedResult\r\n") assertTrue(result.contains(File.separator + "nifi-standard-processors:ModifiedResult\r\n")
|| result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n")); || result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n"));
} }
@Test @Test
@ -154,7 +154,30 @@ public class TestExecuteStreamCommand {
byte[] byteArray = flowFiles.get(0).toByteArray(); byte[] byteArray = flowFiles.get(0).toByteArray();
String result = new String(byteArray); String result = new String(byteArray);
assertTrue(result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\r\n") assertTrue(result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\r\n")
|| result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n")); || result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n"));
}
@Test
public void testIgnoredStdin() throws IOException {
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
String jarPath = exJar.getAbsolutePath();
exJar.setExecutable(true);
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
controller.setValidateExpressionUsage(false);
controller.enqueue(dummy.toPath());
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
controller.run(1);
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
byte[] byteArray = flowFiles.get(0).toByteArray();
String result = new String(byteArray);
assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
result.endsWith("target:ModifiedResult\n"));
} }
// this is dependent on window with cygwin...so it's not enabled // this is dependent on window with cygwin...so it's not enabled