mirror of https://github.com/apache/nifi.git
NIFI-583: Ignore STDIN for ExecuteStreamCommand
- Added the ability (default: false) to ignore STDIN when passing a flowfile to the ExecuteStreamCommand processor. This is useful if the command you are executing cannot take STDIN, or passing STDIN is unnecessary Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
42a2fc5f64
commit
6d1128497b
|
@ -89,7 +89,13 @@ import org.apache.nifi.stream.io.StreamUtils;
|
||||||
* <li>Supports expression language: true</li>
|
* <li>Supports expression language: true</li>
|
||||||
* </ul>
|
* </ul>
|
||||||
* </li>
|
* </li>
|
||||||
*
|
* <li>Ignore STDIN
|
||||||
|
* <ul>
|
||||||
|
* <li>Indicates whether or not the flowfile's contents should be streamed as part of STDIN</li>
|
||||||
|
* <li>Default value: false (this means that the contents of a flowfile will be sent as STDIN to your command</li>
|
||||||
|
* <li>Supports expression language: false</li>
|
||||||
|
* </ul>
|
||||||
|
* </li>
|
||||||
* </ul>
|
* </ul>
|
||||||
*
|
*
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -177,12 +183,22 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder()
|
||||||
|
.name("Ignore STDIN")
|
||||||
|
.description("If true, the contents of the incoming flowfile will not be passed to the executing command")
|
||||||
|
.addValidator(Validator.VALID)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("false")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
|
||||||
private static final List<PropertyDescriptor> PROPERTIES;
|
private static final List<PropertyDescriptor> PROPERTIES;
|
||||||
|
|
||||||
static {
|
static {
|
||||||
List<PropertyDescriptor> props = new ArrayList<>();
|
List<PropertyDescriptor> props = new ArrayList<>();
|
||||||
props.add(EXECUTION_ARGUMENTS);
|
props.add(EXECUTION_ARGUMENTS);
|
||||||
props.add(EXECUTION_COMMAND);
|
props.add(EXECUTION_COMMAND);
|
||||||
|
props.add(IGNORE_STDIN);
|
||||||
props.add(WORKING_DIR);
|
props.add(WORKING_DIR);
|
||||||
PROPERTIES = Collections.unmodifiableList(props);
|
PROPERTIES = Collections.unmodifiableList(props);
|
||||||
}
|
}
|
||||||
|
@ -225,6 +241,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
||||||
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
|
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
|
||||||
args.add(executeCommand);
|
args.add(executeCommand);
|
||||||
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
|
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
|
||||||
|
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
|
||||||
if (!StringUtils.isBlank(commandArguments)) {
|
if (!StringUtils.isBlank(commandArguments)) {
|
||||||
for (String arg : commandArguments.split(";")) {
|
for (String arg : commandArguments.split(";")) {
|
||||||
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
|
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
|
||||||
|
@ -269,7 +286,11 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
||||||
final BufferedOutputStream bos = new BufferedOutputStream(pos);
|
final BufferedOutputStream bos = new BufferedOutputStream(pos);
|
||||||
FlowFile outputStreamFlowFile = session.create(flowFile);
|
FlowFile outputStreamFlowFile = session.create(flowFile);
|
||||||
StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
|
StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
|
||||||
session.read(flowFile, callback);
|
if (ignoreStdin) {
|
||||||
|
session.read(outputStreamFlowFile, callback);
|
||||||
|
} else {
|
||||||
|
session.read(flowFile, callback);
|
||||||
|
}
|
||||||
outputStreamFlowFile = callback.outputStreamFlowFile;
|
outputStreamFlowFile = callback.outputStreamFlowFile;
|
||||||
exitCode = callback.exitCode;
|
exitCode = callback.exitCode;
|
||||||
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
|
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
|
||||||
|
|
|
@ -132,7 +132,7 @@ public class TestExecuteStreamCommand {
|
||||||
String result = new String(byteArray);
|
String result = new String(byteArray);
|
||||||
|
|
||||||
assertTrue(result.contains(File.separator + "nifi-standard-processors:ModifiedResult\r\n")
|
assertTrue(result.contains(File.separator + "nifi-standard-processors:ModifiedResult\r\n")
|
||||||
|| result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n"));
|
|| result.contains(File.separator + "nifi-standard-processors:ModifiedResult\n"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -154,7 +154,30 @@ public class TestExecuteStreamCommand {
|
||||||
byte[] byteArray = flowFiles.get(0).toByteArray();
|
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||||
String result = new String(byteArray);
|
String result = new String(byteArray);
|
||||||
assertTrue(result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\r\n")
|
assertTrue(result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\r\n")
|
||||||
|| result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n"));
|
|| result.contains(File.separator + "nifi-standard-processors" + File.separator + "target:ModifiedResult\n"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testIgnoredStdin() throws IOException {
|
||||||
|
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||||
|
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||||
|
String jarPath = exJar.getAbsolutePath();
|
||||||
|
exJar.setExecutable(true);
|
||||||
|
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||||
|
controller.setValidateExpressionUsage(false);
|
||||||
|
controller.enqueue(dummy.toPath());
|
||||||
|
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||||
|
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||||
|
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
|
||||||
|
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
|
||||||
|
controller.run(1);
|
||||||
|
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||||
|
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||||
|
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||||
|
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||||
|
String result = new String(byteArray);
|
||||||
|
assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
|
||||||
|
result.endsWith("target:ModifiedResult\n"));
|
||||||
}
|
}
|
||||||
|
|
||||||
// this is dependent on window with cygwin...so it's not enabled
|
// this is dependent on window with cygwin...so it's not enabled
|
||||||
|
|
Loading…
Reference in New Issue