From 096ca61e255a6b4f91d525239ca95aefb508fb2e Mon Sep 17 00:00:00 2001 From: Aldrin Piri Date: Sun, 7 Jun 2015 15:02:57 -0400 Subject: [PATCH] NIFI-583 Adjusting the callback to be aware of whether or not STDIN should be ignored, preferring to bypass the stream copying process and the reuse of the outputflowfile as the source. --- .../standard/ExecuteStreamCommand.java | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index f97a4557f4..676bd079c2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -232,22 +232,22 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (null == flowFile) { + FlowFile inputFlowFile = session.get(); + if (null == inputFlowFile) { return; } final ArrayList args = new ArrayList<>(); - final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue(); + final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue(); args.add(executeCommand); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue()); if (!StringUtils.isBlank(commandArguments)) { for (String arg : commandArguments.split(";")) { - args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue()); + args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(inputFlowFile).getValue()); } } - final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue(); + final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue(); final ProcessBuilder builder = new ProcessBuilder(); @@ -284,13 +284,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { int exitCode = -1; final BufferedOutputStream bos = new BufferedOutputStream(pos); - FlowFile outputStreamFlowFile = session.create(flowFile); - StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process); - if (ignoreStdin) { - session.read(outputStreamFlowFile, callback); - } else { - session.read(flowFile, callback); - } + FlowFile outputStreamFlowFile = session.create(inputFlowFile); + ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process); + session.read(inputFlowFile, callback); outputStreamFlowFile = callback.outputStreamFlowFile; exitCode = callback.exitCode; logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); @@ -321,9 +317,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { attributes.put("execution.command.args", commandArguments); outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes); session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP); - logger.info("Transferring flow file {} to original", new Object[]{flowFile}); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, ORIGINAL_RELATIONSHIP); + logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile}); + inputFlowFile = session.putAllAttributes(inputFlowFile, attributes); + session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP); } catch (final IOException ex) { // could not close Process related streams @@ -333,8 +329,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { } } - static class StdInWriterCallback implements InputStreamCallback { + static class ProcessStreamWriterCallback implements InputStreamCallback { + final boolean ignoreStdin; final OutputStream stdInWritable; final InputStream stdOutReadable; final ProcessorLog logger; @@ -343,7 +340,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { FlowFile outputStreamFlowFile; int exitCode; - public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable, + ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + this.ignoreStdin = ignoreStdin; this.stdInWritable = stdInWritable; this.stdOutReadable = stdOutReadable; this.logger = logger; @@ -358,14 +357,17 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public void process(OutputStream out) throws IOException { + Thread writerThread = new Thread(new Runnable() { @Override public void run() { - try { - StreamUtils.copy(incomingFlowFileIS, stdInWritable); - } catch (IOException e) { - logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); + if (!ignoreStdin) { + try { + StreamUtils.copy(incomingFlowFileIS, stdInWritable); + } catch (IOException e) { + logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); + } } // MUST close the output stream to the stdIn so that whatever is reading knows // there is no more data