diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index f97a4557f4..676bd079c2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -232,22 +232,22 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { - FlowFile flowFile = session.get(); - if (null == flowFile) { + FlowFile inputFlowFile = session.get(); + if (null == inputFlowFile) { return; } final ArrayList args = new ArrayList<>(); - final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(flowFile).getValue(); + final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue(); args.add(executeCommand); final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).getValue(); final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue()); if (!StringUtils.isBlank(commandArguments)) { for (String arg : commandArguments.split(";")) { - args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(flowFile).getValue()); + args.add(context.newPropertyValue(arg).evaluateAttributeExpressions(inputFlowFile).getValue()); } } - final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(flowFile).getValue(); + final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue(); final ProcessBuilder builder = new ProcessBuilder(); @@ -284,13 +284,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { final BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(pes))) { int exitCode = -1; final BufferedOutputStream bos = new BufferedOutputStream(pos); - FlowFile outputStreamFlowFile = session.create(flowFile); - StdInWriterCallback callback = new StdInWriterCallback(bos, bis, logger, session, outputStreamFlowFile, process); - if (ignoreStdin) { - session.read(outputStreamFlowFile, callback); - } else { - session.read(flowFile, callback); - } + FlowFile outputStreamFlowFile = session.create(inputFlowFile); + ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger, session, outputStreamFlowFile, process); + session.read(inputFlowFile, callback); outputStreamFlowFile = callback.outputStreamFlowFile; exitCode = callback.exitCode; logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode}); @@ -321,9 +317,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { attributes.put("execution.command.args", commandArguments); outputStreamFlowFile = session.putAllAttributes(outputStreamFlowFile, attributes); session.transfer(outputStreamFlowFile, OUTPUT_STREAM_RELATIONSHIP); - logger.info("Transferring flow file {} to original", new Object[]{flowFile}); - flowFile = session.putAllAttributes(flowFile, attributes); - session.transfer(flowFile, ORIGINAL_RELATIONSHIP); + logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile}); + inputFlowFile = session.putAllAttributes(inputFlowFile, attributes); + session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP); } catch (final IOException ex) { // could not close Process related streams @@ -333,8 +329,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { } } - static class StdInWriterCallback implements InputStreamCallback { + static class ProcessStreamWriterCallback implements InputStreamCallback { + final boolean ignoreStdin; final OutputStream stdInWritable; final InputStream stdOutReadable; final ProcessorLog logger; @@ -343,7 +340,9 @@ public class ExecuteStreamCommand extends AbstractProcessor { FlowFile outputStreamFlowFile; int exitCode; - public StdInWriterCallback(OutputStream stdInWritable, InputStream stdOutReadable, ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdInWritable, InputStream stdOutReadable, + ProcessorLog logger, ProcessSession session, FlowFile outputStreamFlowFile, Process process) { + this.ignoreStdin = ignoreStdin; this.stdInWritable = stdInWritable; this.stdOutReadable = stdOutReadable; this.logger = logger; @@ -358,14 +357,17 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override public void process(OutputStream out) throws IOException { + Thread writerThread = new Thread(new Runnable() { @Override public void run() { - try { - StreamUtils.copy(incomingFlowFileIS, stdInWritable); - } catch (IOException e) { - logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); + if (!ignoreStdin) { + try { + StreamUtils.copy(incomingFlowFileIS, stdInWritable); + } catch (IOException e) { + logger.error("Failed to write flow file to stdIn due to {}", new Object[]{e}, e); + } } // MUST close the output stream to the stdIn so that whatever is reading knows // there is no more data