From 12b44ee0b8446346794a25da5b0ab7d5329287a1 Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Tue, 24 Feb 2015 21:30:18 -0500 Subject: [PATCH 1/3] NIFI-380: Initial import of ExecuteProcess processor Signed-off-by: joewitt --- .../processors/standard/ExecuteProcess.java | 494 ++++++++++++++++++ .../org.apache.nifi.processor.Processor | 1 + .../standard/TestExecuteProcess.java | 78 +++ 3 files changed, 573 insertions(+) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java new file mode 100644 index 0000000000..6f68f7d209 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -0,0 +1,494 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.annotation.lifecycle.OnUnscheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.io.OutputStreamCallback; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.stream.io.StreamUtils; + + +@Tags({"command", "process", "source", "external", "invoke", "script"}) +@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected " + + "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual " + + "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.") +public class ExecuteProcess extends AbstractProcessor { + + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All created FlowFiles are routed to this relationship") + .build(); + + public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() + .name("Command Path") + .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder() + .name("Command Arguments") + .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + + public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder() + .name("Working Directory") + .description("The directory to use as the current working directory when executing the command") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) + .required(false) + .build(); + + public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder() + .name("Batch Duration") + .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " + + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " + + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + private volatile ExecutorService executor; + + @Override + public Set getRelationships() { + return Collections.singleton(REL_SUCCESS); + } + + @Override + protected List getSupportedPropertyDescriptors() { + final List properties = new ArrayList<>(); + properties.add(COMMAND); + properties.add(COMMAND_ARGUMENTS); + properties.add(BATCH_DURATION); + return properties; + } + + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + } + + static List splitArgs(final String input) { + if ( input == null ) { + return Collections.emptyList(); + } + + final List args = new ArrayList<>(); + + final String trimmed = input.trim(); + boolean inQuotes = false; + + final StringBuilder sb = new StringBuilder(); + for (int i=0; i < trimmed.length(); i++) { + final char c = trimmed.charAt(i); + switch (c) { + case ' ': + case '\t': + case '\r': + case '\n': { + if ( inQuotes ) { + sb.append(c); + } else { + final String arg = sb.toString().trim(); + if ( !arg.isEmpty() ) { + args.add(arg); + } + sb.setLength(0); + } + break; + } + case '"': + inQuotes = !inQuotes; + break; + default: + sb.append(c); + break; + } + } + + final String finalArg = sb.toString().trim(); + if ( !finalArg.isEmpty() ) { + args.add(finalArg); + } + + return args; + } + + @OnScheduled + public void setupExecutor(final ProcessContext context) { + executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory() { + private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(final Runnable r) { + final Thread t = defaultFactory.newThread(r); + t.setName("ExecuteProcess " + getIdentifier() + " Task"); + return t; + } + }); + } + + @OnUnscheduled + public void shutdownExecutor() { + executor.shutdown(); + } + + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { + final String command = context.getProperty(COMMAND).getValue(); + final List args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue()); + + final List commandStrings = new ArrayList<>(args.size() + 1); + commandStrings.add(command); + commandStrings.addAll(args); + + final String commandString = StringUtils.join(commandStrings, " "); + + final ProcessBuilder builder = new ProcessBuilder(commandStrings); + final String workingDirName = context.getProperty(WORKING_DIR).getValue(); + if ( workingDirName != null ) { + builder.directory(new File(workingDirName)); + } + + final Map environment = new HashMap<>(); + for ( final Map.Entry entry : context.getProperties().entrySet() ) { + if ( entry.getKey().isDynamic() ) { + environment.put(entry.getKey().getName(), entry.getValue()); + } + } + + if ( !environment.isEmpty() ) { + builder.environment().putAll(environment); + } + + final long startNanos = System.nanoTime(); + final Process process; + try { + process = builder.start(); + } catch (final IOException ioe) { + getLogger().error("Failed to create process due to {}", new Object[] {ioe}); + context.yield(); + return; + } + + final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); + + // Submit task to read error stream from process + final AtomicReference errorStream = new AtomicReference<>(); + executor.submit(new Runnable() { + @Override + public void run() { + final StringBuilder sb = new StringBuilder(); + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { + String line; + while ((line = reader.readLine()) != null) { + if ( sb.length() < 4000 ) { + sb.append(line); + sb.append("\n"); + } + } + } catch (final IOException ioe) { + } + + errorStream.set(sb.toString()); + } + }); + + // Submit task to read output of Process and write to FlowFile. + final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger()); + final AtomicBoolean failure = new AtomicBoolean(false); + final AtomicBoolean finishedCopying = new AtomicBoolean(false); + final Future future = executor.submit(new Callable() { + @Override + public Object call() throws IOException { + try { + if ( batchNanos == null ) { + // if we aren't batching, just copy the stream from the process to the flowfile. + try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) { + final byte[] buffer = new byte[4096]; + int len; + while ((len = bufferedIn.read(buffer)) > 0) { + if ( !isScheduled() ) { + return null; + } + + proxyOut.write(buffer, 0, len); + } + } + } else { + // we are batching, which means that the output of the process is text. It doesn't make sense to grab + // arbitrary batches of bytes from some process and send it along as a piece of data, so we assume that + // setting a batch during means text. + // Also, we don't want that text to get split up in the middle of a line, so we use BufferedReader + // to read lines of text and write them as lines of text. + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + + while ((line = reader.readLine()) != null) { + if ( !isScheduled() ) { + return null; + } + + proxyOut.write((line + "\n").getBytes(StandardCharsets.UTF_8)); + } + } + } + } catch (final IOException ioe) { + failure.set(true); + throw ioe; + } finally { + finishedCopying.set(true); + } + + return null; + } + }); + + // continue to do this loop until both the process has finished and we have finished copying + // the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(), + // there can be data buffered on the InputStream; so we will wait until the stream is empty as well. + int flowFileCount = 0; + while (!finishedCopying.get() || isAlive(process)) { + if ( !isScheduled() ) { + getLogger().info("User stopped processor; will terminate process immediately"); + process.destroy(); + break; + } + FlowFile flowFile = session.create(); + flowFile = session.write(flowFile, new OutputStreamCallback() { + @Override + public void process(final OutputStream flowFileOut) throws IOException { + try (final OutputStream out = new BufferedOutputStream(flowFileOut)) { + proxyOut.setDelegate(out); + + if ( batchNanos == null ) { + // we are not creating batches; wait until process terminates. + Integer exitCode = null; + while (exitCode == null) { + try { + exitCode = process.waitFor(); + } catch (final InterruptedException ie) {} + } + } else { + // wait the allotted amount of time. + try { + TimeUnit.NANOSECONDS.sleep(batchNanos); + } catch (final InterruptedException ie) {} + } + + proxyOut.setDelegate(null); // prevent from writing to this stream + } + } + }); + + if ( flowFile.getSize() == 0L ) { + session.remove(flowFile); + } else if ( failure.get() ) { + session.remove(flowFile); + getLogger().error("Failed to read data from Process, so will not generate FlowFile"); + } else { + session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); + getLogger().info("Created {} and routed to success", new Object[] {flowFile}); + session.transfer(flowFile, REL_SUCCESS); + flowFileCount++; + } + + session.commit(); + } + + final int exitCode; + final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); + try { + exitCode = process.waitFor(); + } catch (final InterruptedException ie) { + getLogger().warn("Process was interrupted before finishing"); + return; + } + + try { + future.get(); + } catch (final ExecutionException e) { + getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[] {e.getCause()}); + } catch (final InterruptedException ie) { + getLogger().error("Interrupted while waiting to copy data form Process to FlowFile"); + return; + } + + getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[] {exitCode, flowFileCount, millis}); + } + + + private boolean isAlive(final Process process) { + // unfortunately, java provides no straight-forward way to test if a Process is alive. + // In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7, + // so we have this solution in the mean time. + try { + process.exitValue(); + return false; + } catch (final IllegalThreadStateException itse) { + return true; + } + } + + + private static class ProxyOutputStream extends OutputStream { + private final ProcessorLog logger; + + private final Lock lock = new ReentrantLock(); + private OutputStream delegate; + + public ProxyOutputStream(final ProcessorLog logger) { + this.logger = logger; + } + + public void setDelegate(final OutputStream delegate) { + lock.lock(); + try { + logger.trace("Switching delegate from {} to {}", new Object[] {this.delegate, delegate}); + + this.delegate = delegate; + } finally { + lock.unlock(); + } + } + + private void sleep(final long millis) { + try { + Thread.sleep(millis); + } catch (final InterruptedException ie) {} + } + + @Override + public void write(final int b) throws IOException { + lock.lock(); + try { + while (true) { + if ( delegate != null ) { + logger.trace("Writing to {}", new Object[] {delegate}); + + delegate.write(b); + return; + } else { + lock.unlock(); + sleep(1L); + lock.lock(); + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void write(final byte[] b, final int off, final int len) throws IOException { + lock.lock(); + try { + while (true) { + if ( delegate != null ) { + logger.trace("Writing to {}", new Object[] {delegate}); + + delegate.write(b, off, len); + return; + } else { + lock.unlock(); + sleep(1L); + lock.lock(); + } + } + } finally { + lock.unlock(); + } + } + + @Override + public void write(final byte[] b) throws IOException { + write(b, 0, b.length); + } + + @Override + public void close() throws IOException { + } + + @Override + public void flush() throws IOException { + lock.lock(); + try { + while (true) { + if ( delegate != null ) { + delegate.flush(); + return; + } else { + lock.unlock(); + sleep(1L); + lock.lock(); + } + } + } finally { + lock.unlock(); + } + } + } +} diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index 2d5855c40b..f81ccec9c4 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -24,6 +24,7 @@ org.apache.nifi.processors.standard.EvaluateRegularExpression org.apache.nifi.processors.standard.EvaluateXPath org.apache.nifi.processors.standard.EvaluateXQuery org.apache.nifi.processors.standard.ExecuteStreamCommand +org.apache.nifi.processors.standard.ExecuteProcess org.apache.nifi.processors.standard.GenerateFlowFile org.apache.nifi.processors.standard.GetFile org.apache.nifi.processors.standard.GetFTP diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java new file mode 100644 index 0000000000..0aa2ee3237 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.standard; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.List; + +import org.apache.nifi.util.MockFlowFile; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Test; + +public class TestExecuteProcess { + + @Test + public void testSplitArgs() { + final List nullArgs = ExecuteProcess.splitArgs(null); + assertNotNull(nullArgs); + assertTrue(nullArgs.isEmpty()); + + final List zeroArgs = ExecuteProcess.splitArgs(" "); + assertNotNull(zeroArgs); + assertTrue(zeroArgs.isEmpty()); + + final List singleArg = ExecuteProcess.splitArgs(" hello "); + assertEquals(1, singleArg.size()); + assertEquals("hello", singleArg.get(0)); + + final List twoArg = ExecuteProcess.splitArgs(" hello good-bye "); + assertEquals(2, twoArg.size()); + assertEquals("hello", twoArg.get(0)); + assertEquals("good-bye", twoArg.get(1)); + + final List singleQuotedArg = ExecuteProcess.splitArgs(" \"hello\" "); + assertEquals(1, singleQuotedArg.size()); + assertEquals("hello", singleQuotedArg.get(0)); + + final List twoQuotedArg = ExecuteProcess.splitArgs(" hello \"good bye\""); + assertEquals(2, twoQuotedArg.size()); + assertEquals("hello", twoQuotedArg.get(0)); + assertEquals("good bye", twoQuotedArg.get(1)); + } + + @Test + public void testPing() { + System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE"); + + final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class); + runner.setProperty(ExecuteProcess.COMMAND, "ping"); + runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "127.0.0.1"); + runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis"); + + runner.run(); + + final List flowFiles = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS); + for ( final MockFlowFile flowFile : flowFiles ) { + System.out.println(flowFile); + System.out.println(new String(flowFile.toByteArray())); + } + } +} From a066d9b601ae1b8910bd7e526d1c631e4e4e3aae Mon Sep 17 00:00:00 2001 From: Mark Payne Date: Wed, 4 Mar 2015 20:09:08 -0500 Subject: [PATCH 2/3] NIFI-380: Added documentation Signed-off-by: joewitt --- .../processors/standard/ExecuteProcess.java | 27 ++++-- .../index.html | 90 +++++++++++++++++++ 2 files changed, 110 insertions(+), 7 deletions(-) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java index 6f68f7d209..c51064163a 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -57,7 +57,6 @@ import org.apache.nifi.processor.Relationship; import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.stream.io.StreamUtils; @Tags({"command", "process", "source", "external", "invoke", "script"}) @@ -66,13 +65,8 @@ import org.apache.nifi.stream.io.StreamUtils; + "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.") public class ExecuteProcess extends AbstractProcessor { - public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All created FlowFiles are routed to this relationship") - .build(); - public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() - .name("Command Path") + .name("Command") .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") .required(true) .expressionLanguageSupported(false) @@ -105,6 +99,12 @@ public class ExecuteProcess extends AbstractProcessor { .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() + .name("success") + .description("All created FlowFiles are routed to this relationship") + .build(); + + private volatile ExecutorService executor; @Override @@ -317,6 +317,10 @@ public class ExecuteProcess extends AbstractProcessor { process.destroy(); break; } + + // Create a FlowFile that we can write to and set the OutputStream for the FlowFile + // as the delegate for the ProxyOuptutStream, then wait until the process finishes + // or until the specified amount of time FlowFile flowFile = session.create(); flowFile = session.write(flowFile, new OutputStreamCallback() { @Override @@ -345,17 +349,22 @@ public class ExecuteProcess extends AbstractProcessor { }); if ( flowFile.getSize() == 0L ) { + // If no data was written to the file, remove it session.remove(flowFile); } else if ( failure.get() ) { + // If there was a failure processing the output of the Process, remove the FlowFile session.remove(flowFile); getLogger().error("Failed to read data from Process, so will not generate FlowFile"); + break; } else { + // All was good. Generate event and transfer FlowFile. session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); getLogger().info("Created {} and routed to success", new Object[] {flowFile}); session.transfer(flowFile, REL_SUCCESS); flowFileCount++; } + // Commit the session so that the FlowFile is transferred to the next processor session.commit(); } @@ -394,6 +403,10 @@ public class ExecuteProcess extends AbstractProcessor { } + /** + * Output stream that is used to wrap another output stream in a way that the + * underlying output stream can be swapped out for a different one when needed + */ private static class ProxyOutputStream extends OutputStream { private final ProcessorLog logger; diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html new file mode 100644 index 0000000000..9129855281 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html @@ -0,0 +1,90 @@ + + + + + + ExecuteProcess + + + + + + +

Description:

+

+ Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected + to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual + format, as it typically does not make sense to split binary data on arbitrary time-based intervals. +

+ +

+ Properties: +

+

In the list below, the names of required properties appear + in bold. Any other properties (not in bold) are considered optional. + If a property has a default value, it is indicated. If a property + supports the use of the NiFi Expression Language (or simply, + "expression language"), that is also indicated.

+
    +
  • Command +
      +
    • Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.
    • +
    • Default value: none
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Command Arguments +
      +
    • The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.
    • +
    • Default value: none
    • +
    • Supports expression language: false
    • +
    +
  • +
  • Working Directory +
      +
    • The directory to use as the current working directory when executing the command
    • +
    • Default value: none (which means whatever NiFi's root installation directory is)
    • +
    • Supports expression language: false
    • +
    +
  • +
  • + Batch Duration> +
      +
    • + If the process is expected to be long-running and produce textual output, a batch duration can be specified so + that the output will be captured for this amount of time and a FlowFile will then be sent out with the results + and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results. + If no value is provided, the process will run to completion and the entire output of the process will be written + to a single FlowFile. +
    • +
    • Default value: none
    • +
    • Supports expression language: false
    • +
    +
  • +
+ +

+ Relationships: +

+
    +
  • success +
      +
    • All FlowFiles that are created are routed to this relationship.
    • +
    +
  • +
+ + From 3533a4a58ee2533b32c7f6cc6f758a4a5be93da4 Mon Sep 17 00:00:00 2001 From: joewitt Date: Thu, 5 Mar 2015 22:56:41 -0500 Subject: [PATCH 3/3] NIFI-380 fixed unit test to use a more os portable command. Modify execute process to enable error stream redirection. --- .../processors/standard/ExecuteProcess.java | 272 +++++++++--------- .../index.html | 42 +-- .../standard/TestExecuteProcess.java | 6 +- 3 files changed, 168 insertions(+), 152 deletions(-) diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java index c51064163a..31efd187d9 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -38,7 +38,6 @@ import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -58,7 +57,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.io.OutputStreamCallback; import org.apache.nifi.processor.util.StandardValidators; - @Tags({"command", "process", "source", "external", "invoke", "script"}) @CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected " + "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual " @@ -66,94 +64,105 @@ import org.apache.nifi.processor.util.StandardValidators; public class ExecuteProcess extends AbstractProcessor { public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() - .name("Command") - .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") - .required(true) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - + .name("Command") + .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") + .required(true) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder() - .name("Command Arguments") - .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") - .required(false) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name("Command Arguments") + .description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder() - .name("Working Directory") - .description("The directory to use as the current working directory when executing the command") - .expressionLanguageSupported(false) - .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) - .required(false) - .build(); - + .name("Working Directory") + .description("The directory to use as the current working directory when executing the command") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.createDirectoryExistsValidator(false, true)) + .required(false) + .build(); + public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder() - .name("Batch Duration") - .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " - + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " - + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") - .required(false) - .expressionLanguageSupported(false) - .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) - .build(); - + .name("Batch Duration") + .description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so " + + "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results " + + "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results") + .required(false) + .expressionLanguageSupported(false) + .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) + .build(); + + public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder() + .name("Redirect Error Stream") + .description("If true will redirect any error stream output of the process to the output stream. " + + "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.") + .required(false) + .allowableValues("true", "false") + .defaultValue("false") + .expressionLanguageSupported(false) + .addValidator(StandardValidators.BOOLEAN_VALIDATOR) + .build(); + public static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("All created FlowFiles are routed to this relationship") - .build(); - + .name("success") + .description("All created FlowFiles are routed to this relationship") + .build(); private volatile ExecutorService executor; - + @Override public Set getRelationships() { return Collections.singleton(REL_SUCCESS); } - + @Override protected List getSupportedPropertyDescriptors() { final List properties = new ArrayList<>(); properties.add(COMMAND); properties.add(COMMAND_ARGUMENTS); properties.add(BATCH_DURATION); + properties.add(REDIRECT_ERROR_STREAM); return properties; } - + @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } - + static List splitArgs(final String input) { - if ( input == null ) { + if (input == null) { return Collections.emptyList(); } - + final List args = new ArrayList<>(); - + final String trimmed = input.trim(); boolean inQuotes = false; - + final StringBuilder sb = new StringBuilder(); - for (int i=0; i < trimmed.length(); i++) { + for (int i = 0; i < trimmed.length(); i++) { final char c = trimmed.charAt(i); switch (c) { case ' ': case '\t': case '\r': case '\n': { - if ( inQuotes ) { + if (inQuotes) { sb.append(c); } else { final String arg = sb.toString().trim(); - if ( !arg.isEmpty() ) { + if (!arg.isEmpty()) { args.add(arg); } sb.setLength(0); @@ -168,20 +177,20 @@ public class ExecuteProcess extends AbstractProcessor { break; } } - + final String finalArg = sb.toString().trim(); - if ( !finalArg.isEmpty() ) { + if (!finalArg.isEmpty()) { args.add(finalArg); } - + return args; } - + @OnScheduled public void setupExecutor(final ProcessContext context) { executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory() { private final ThreadFactory defaultFactory = Executors.defaultThreadFactory(); - + @Override public Thread newThread(final Runnable r) { final Thread t = defaultFactory.newThread(r); @@ -190,73 +199,67 @@ public class ExecuteProcess extends AbstractProcessor { } }); } - + @OnUnscheduled public void shutdownExecutor() { executor.shutdown(); } - + @Override public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { final String command = context.getProperty(COMMAND).getValue(); final List args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue()); - + final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean(); + final List commandStrings = new ArrayList<>(args.size() + 1); commandStrings.add(command); commandStrings.addAll(args); final String commandString = StringUtils.join(commandStrings, " "); - + final ProcessBuilder builder = new ProcessBuilder(commandStrings); final String workingDirName = context.getProperty(WORKING_DIR).getValue(); - if ( workingDirName != null ) { + if (workingDirName != null) { builder.directory(new File(workingDirName)); } - + final Map environment = new HashMap<>(); - for ( final Map.Entry entry : context.getProperties().entrySet() ) { - if ( entry.getKey().isDynamic() ) { + for (final Map.Entry entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { environment.put(entry.getKey().getName(), entry.getValue()); } } - - if ( !environment.isEmpty() ) { + + if (!environment.isEmpty()) { builder.environment().putAll(environment); } - + final long startNanos = System.nanoTime(); final Process process; try { - process = builder.start(); + process = builder.redirectErrorStream(redirectErrorStream).start(); } catch (final IOException ioe) { - getLogger().error("Failed to create process due to {}", new Object[] {ioe}); + getLogger().error("Failed to create process due to {}", new Object[]{ioe}); context.yield(); return; } - + final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); - + // Submit task to read error stream from process - final AtomicReference errorStream = new AtomicReference<>(); - executor.submit(new Runnable() { - @Override - public void run() { - final StringBuilder sb = new StringBuilder(); - try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { - String line; - while ((line = reader.readLine()) != null) { - if ( sb.length() < 4000 ) { - sb.append(line); - sb.append("\n"); + if (!redirectErrorStream) { + executor.submit(new Runnable() { + @Override + public void run() { + try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) { + while (reader.read() >= 0) { } + } catch (final IOException ioe) { } - } catch (final IOException ioe) { } - - errorStream.set(sb.toString()); - } - }); - + }); + } + // Submit task to read output of Process and write to FlowFile. final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger()); final AtomicBoolean failure = new AtomicBoolean(false); @@ -265,16 +268,16 @@ public class ExecuteProcess extends AbstractProcessor { @Override public Object call() throws IOException { try { - if ( batchNanos == null ) { + if (batchNanos == null) { // if we aren't batching, just copy the stream from the process to the flowfile. try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) { final byte[] buffer = new byte[4096]; int len; while ((len = bufferedIn.read(buffer)) > 0) { - if ( !isScheduled() ) { + if (!isScheduled()) { return null; } - + proxyOut.write(buffer, 0, len); } } @@ -286,9 +289,9 @@ public class ExecuteProcess extends AbstractProcessor { // to read lines of text and write them as lines of text. try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { String line; - + while ((line = reader.readLine()) != null) { - if ( !isScheduled() ) { + if (!isScheduled()) { return null; } @@ -302,22 +305,22 @@ public class ExecuteProcess extends AbstractProcessor { } finally { finishedCopying.set(true); } - + return null; } }); - + // continue to do this loop until both the process has finished and we have finished copying // the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(), // there can be data buffered on the InputStream; so we will wait until the stream is empty as well. int flowFileCount = 0; while (!finishedCopying.get() || isAlive(process)) { - if ( !isScheduled() ) { + if (!isScheduled()) { getLogger().info("User stopped processor; will terminate process immediately"); process.destroy(); break; } - + // Create a FlowFile that we can write to and set the OutputStream for the FlowFile // as the delegate for the ProxyOuptutStream, then wait until the process finishes // or until the specified amount of time @@ -327,31 +330,33 @@ public class ExecuteProcess extends AbstractProcessor { public void process(final OutputStream flowFileOut) throws IOException { try (final OutputStream out = new BufferedOutputStream(flowFileOut)) { proxyOut.setDelegate(out); - - if ( batchNanos == null ) { + + if (batchNanos == null) { // we are not creating batches; wait until process terminates. Integer exitCode = null; while (exitCode == null) { try { exitCode = process.waitFor(); - } catch (final InterruptedException ie) {} + } catch (final InterruptedException ie) { + } } } else { // wait the allotted amount of time. try { TimeUnit.NANOSECONDS.sleep(batchNanos); - } catch (final InterruptedException ie) {} + } catch (final InterruptedException ie) { + } } - + proxyOut.setDelegate(null); // prevent from writing to this stream } } }); - - if ( flowFile.getSize() == 0L ) { + + if (flowFile.getSize() == 0L) { // If no data was written to the file, remove it session.remove(flowFile); - } else if ( failure.get() ) { + } else if (failure.get()) { // If there was a failure processing the output of the Process, remove the FlowFile session.remove(flowFile); getLogger().error("Failed to read data from Process, so will not generate FlowFile"); @@ -359,15 +364,15 @@ public class ExecuteProcess extends AbstractProcessor { } else { // All was good. Generate event and transfer FlowFile. session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); - getLogger().info("Created {} and routed to success", new Object[] {flowFile}); + getLogger().info("Created {} and routed to success", new Object[]{flowFile}); session.transfer(flowFile, REL_SUCCESS); flowFileCount++; } - + // Commit the session so that the FlowFile is transferred to the next processor session.commit(); } - + final int exitCode; final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos); try { @@ -376,20 +381,19 @@ public class ExecuteProcess extends AbstractProcessor { getLogger().warn("Process was interrupted before finishing"); return; } - + try { future.get(); } catch (final ExecutionException e) { - getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[] {e.getCause()}); + getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[]{e.getCause()}); } catch (final InterruptedException ie) { getLogger().error("Interrupted while waiting to copy data form Process to FlowFile"); return; } - - getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[] {exitCode, flowFileCount, millis}); + + getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis}); } - private boolean isAlive(final Process process) { // unfortunately, java provides no straight-forward way to test if a Process is alive. // In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7, @@ -401,46 +405,48 @@ public class ExecuteProcess extends AbstractProcessor { return true; } } - - + /** - * Output stream that is used to wrap another output stream in a way that the - * underlying output stream can be swapped out for a different one when needed + * Output stream that is used to wrap another output stream in a way that + * the underlying output stream can be swapped out for a different one when + * needed */ private static class ProxyOutputStream extends OutputStream { + private final ProcessorLog logger; - + private final Lock lock = new ReentrantLock(); private OutputStream delegate; public ProxyOutputStream(final ProcessorLog logger) { this.logger = logger; } - + public void setDelegate(final OutputStream delegate) { lock.lock(); try { - logger.trace("Switching delegate from {} to {}", new Object[] {this.delegate, delegate}); + logger.trace("Switching delegate from {} to {}", new Object[]{this.delegate, delegate}); this.delegate = delegate; } finally { lock.unlock(); } } - + private void sleep(final long millis) { try { Thread.sleep(millis); - } catch (final InterruptedException ie) {} + } catch (final InterruptedException ie) { + } } - + @Override public void write(final int b) throws IOException { lock.lock(); try { while (true) { - if ( delegate != null ) { - logger.trace("Writing to {}", new Object[] {delegate}); + if (delegate != null) { + logger.trace("Writing to {}", new Object[]{delegate}); delegate.write(b); return; @@ -454,15 +460,15 @@ public class ExecuteProcess extends AbstractProcessor { lock.unlock(); } } - + @Override public void write(final byte[] b, final int off, final int len) throws IOException { lock.lock(); try { while (true) { - if ( delegate != null ) { - logger.trace("Writing to {}", new Object[] {delegate}); - + if (delegate != null) { + logger.trace("Writing to {}", new Object[]{delegate}); + delegate.write(b, off, len); return; } else { @@ -475,22 +481,22 @@ public class ExecuteProcess extends AbstractProcessor { lock.unlock(); } } - + @Override public void write(final byte[] b) throws IOException { write(b, 0, b.length); } - + @Override public void close() throws IOException { } - + @Override public void flush() throws IOException { lock.lock(); try { while (true) { - if ( delegate != null ) { + if (delegate != null) { delegate.flush(); return; } else { diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html index 9129855281..3526cd1526 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteProcess/index.html @@ -25,11 +25,11 @@

Description:

- Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected - to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual + Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected + to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual format, as it typically does not make sense to split binary data on arbitrary time-based intervals.

- +

Properties:

@@ -60,20 +60,30 @@
  • Supports expression language: false
  • -
  • - Batch Duration> -
      -
    • - If the process is expected to be long-running and produce textual output, a batch duration can be specified so - that the output will be captured for this amount of time and a FlowFile will then be sent out with the results - and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results. - If no value is provided, the process will run to completion and the entire output of the process will be written - to a single FlowFile. -
    • -
    • Default value: none
    • +
    • Batch Duration +
        +
      • + If the process is expected to be long-running and produce textual output, a batch duration can be specified so + that the output will be captured for this amount of time and a FlowFile will then be sent out with the results + and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results. + If no value is provided, the process will run to completion and the entire output of the process will be written + to a single FlowFile. +
      • +
      • Default value: none
      • Supports expression language: false
      • -
      -
    • +
    +
  • +
  • Redirect Error Stream +
      +
    • + If true will redirect any error stream output of the process to the output stream. + This is particularly helpful for processes which write extensively to the error stream or for troubleshooting. +
    • +
    • Default value: false
    • +
    • Allowed Values: true, false
    • +
    • Supports expression language: false
    • +
    +
  • diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java index 0aa2ee3237..897973c398 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java @@ -59,12 +59,12 @@ public class TestExecuteProcess { } @Test - public void testPing() { + public void testEcho() { System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE"); final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class); - runner.setProperty(ExecuteProcess.COMMAND, "ping"); - runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "127.0.0.1"); + runner.setProperty(ExecuteProcess.COMMAND, "echo"); + runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args"); runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis"); runner.run();