NIFI-583 Adjusting the callback to be aware of whether or not STDIN should be ignored, preferring to bypass the stream copying process and the reuse of the outputflowfile as the source.

This commit is contained in:
Aldrin Piri 2015-06-07 15:02:57 -04:00
parent ceda66159e
commit 096ca61e25
1 changed files with 23 additions and 21 deletions

View File

@ -232,22 +232,22 @@ public class ExecuteStreamCommand extends AbstractProcessor {
@Override
public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (null == flowFile) {
FlowFile inputFlowFile = session.get();
if (null == inputFlowFile) {
return;
}
final ArrayList<String> args = new ArrayList<>();
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue();
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
args.add(executeCommand);
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue();
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
if (!StringUtils.isBlank(commandArguments)) {
for (String arg : commandArguments.split(";")) {
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue());
args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(inputFlowFile).getValue());
}
}
final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue();
final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
final ProcessBuilder builder = new ProcessBuilder();
@ -284,13 +284,9 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) {
int exitCode = -1;
final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputStreamFlowFile = session.create(flowFile);
StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process);
if (ignoreStdin) {
session.read(outputStreamFlowFile, callback);
} else {
session.read(flowFile, callback);
}
FlowFile outputStreamFlowFile = session.create(inputFlowFile);
ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process);
session.read(inputFlowFile, callback);
outputStreamFlowFile = callback.outputStreamFlowFile;
exitCode = callback.exitCode;
logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
@ -321,9 +317,9 @@ public class ExecuteStreamCommand extends AbstractProcessor {
attributes.put("execution.command.args", commandArguments);
outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes);
session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP);
logger.info("Transferring flow file {} to original", new Object[]{flowFile});
flowFile = session.putAllAttributes(flowFile, attributes);
session.transfer(flowFile, ORIGINAL_RELATIONSHIP);
logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
} catch (final IOException ex) {
// could not close Process related streams
@ -333,8 +329,9 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
}
static class StdInWriterCallback implements InputStreamCallback {
static class ProcessStreamWriterCallback implements InputStreamCallback {
final boolean ignoreStdin;
final OutputStream stdInWritable;
final InputStream stdOutReadable;
final ProcessorLog logger;
@ -343,7 +340,9 @@ public class ExecuteStreamCommand extends AbstractProcessor {
FlowFile outputStreamFlowFile;
int exitCode;
public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) {
public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable,
ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) {
this.ignoreStdin = ignoreStdin;
this.stdInWritable = stdInWritable;
this.stdOutReadable = stdOutReadable;
this.logger = logger;
@ -358,14 +357,17 @@ public class ExecuteStreamCommand extends AbstractProcessor {
@Override
public void process(OutputStream out) throws IOException {
Thread writerThread = new Thread(new Runnable() {
@Override
public void run() {
try {
StreamUtils.copy(incomingFlowFileIS, stdInWritable);
} catch (IOException e) {
logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
if (!ignoreStdin) {
try {
StreamUtils.copy(incomingFlowFileIS, stdInWritable);
} catch (IOException e) {
logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e);
}
}
// MUST close the output stream to the stdIn so that whatever is reading knows
// there is no more data