mirror of https://github.com/apache/nifi.git
NIFI-380 fixed unit test to use a more os portable command. Modify execute process to enable error stream redirection.
This commit is contained in:
parent
bd066cd567
commit
87c666fadb
|
@ -38,7 +38,6 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -58,7 +57,6 @@ import org.apache.nifi.processor.exception.ProcessException;
|
|||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
|
||||
|
||||
@Tags({"command", "process", "source", "external", "invoke", "script"})
|
||||
@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
|
||||
+ "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual "
|
||||
|
@ -99,12 +97,22 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
|
||||
.name("Redirect Error Stream")
|
||||
.description("If true will redirect any error stream output of the process to the output stream. "
|
||||
+ "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
|
||||
.required(false)
|
||||
.allowableValues("true", "false")
|
||||
.defaultValue("false")
|
||||
.expressionLanguageSupported(false)
|
||||
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||
.build();
|
||||
|
||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||
.name("success")
|
||||
.description("All created FlowFiles are routed to this relationship")
|
||||
.build();
|
||||
|
||||
|
||||
private volatile ExecutorService executor;
|
||||
|
||||
@Override
|
||||
|
@ -118,6 +126,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
properties.add(COMMAND);
|
||||
properties.add(COMMAND_ARGUMENTS);
|
||||
properties.add(BATCH_DURATION);
|
||||
properties.add(REDIRECT_ERROR_STREAM);
|
||||
return properties;
|
||||
}
|
||||
|
||||
|
@ -200,6 +209,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
final String command = context.getProperty(COMMAND).getValue();
|
||||
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
|
||||
final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
|
||||
|
||||
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
|
||||
commandStrings.add(command);
|
||||
|
@ -227,7 +237,7 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
final long startNanos = System.nanoTime();
|
||||
final Process process;
|
||||
try {
|
||||
process = builder.start();
|
||||
process = builder.redirectErrorStream(redirectErrorStream).start();
|
||||
} catch (final IOException ioe) {
|
||||
getLogger().error("Failed to create process due to {}", new Object[]{ioe});
|
||||
context.yield();
|
||||
|
@ -237,25 +247,18 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
|
||||
|
||||
// Submit task to read error stream from process
|
||||
final AtomicReference<String> errorStream = new AtomicReference<>();
|
||||
if (!redirectErrorStream) {
|
||||
executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
final StringBuilder sb = new StringBuilder();
|
||||
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
if ( sb.length() < 4000 ) {
|
||||
sb.append(line);
|
||||
sb.append("\n");
|
||||
}
|
||||
while (reader.read() >= 0) {
|
||||
}
|
||||
} catch (final IOException ioe) {
|
||||
}
|
||||
|
||||
errorStream.set(sb.toString());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Submit task to read output of Process and write to FlowFile.
|
||||
final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger());
|
||||
|
@ -334,13 +337,15 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
while (exitCode == null) {
|
||||
try {
|
||||
exitCode = process.waitFor();
|
||||
} catch (final InterruptedException ie) {}
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// wait the allotted amount of time.
|
||||
try {
|
||||
TimeUnit.NANOSECONDS.sleep(batchNanos);
|
||||
} catch (final InterruptedException ie) {}
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
|
||||
proxyOut.setDelegate(null); // prevent from writing to this stream
|
||||
|
@ -389,7 +394,6 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis});
|
||||
}
|
||||
|
||||
|
||||
private boolean isAlive(final Process process) {
|
||||
// unfortunately, java provides no straight-forward way to test if a Process is alive.
|
||||
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
|
||||
|
@ -402,12 +406,13 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Output stream that is used to wrap another output stream in a way that the
|
||||
* underlying output stream can be swapped out for a different one when needed
|
||||
* Output stream that is used to wrap another output stream in a way that
|
||||
* the underlying output stream can be swapped out for a different one when
|
||||
* needed
|
||||
*/
|
||||
private static class ProxyOutputStream extends OutputStream {
|
||||
|
||||
private final ProcessorLog logger;
|
||||
|
||||
private final Lock lock = new ReentrantLock();
|
||||
|
@ -431,7 +436,8 @@ public class ExecuteProcess extends AbstractProcessor {
|
|||
private void sleep(final long millis) {
|
||||
try {
|
||||
Thread.sleep(millis);
|
||||
} catch (final InterruptedException ie) {}
|
||||
} catch (final InterruptedException ie) {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -60,8 +60,7 @@
|
|||
<li>Supports expression language: false</li>
|
||||
</ul>
|
||||
</li>
|
||||
<li>
|
||||
Batch Duration>
|
||||
<li>Batch Duration
|
||||
<ul>
|
||||
<li>
|
||||
If the process is expected to be long-running and produce textual output, a batch duration can be specified so
|
||||
|
@ -74,6 +73,17 @@
|
|||
<li>Supports expression language: false</li>
|
||||
</ul>
|
||||
</li>
|
||||
<li>Redirect Error Stream
|
||||
<ul>
|
||||
<li>
|
||||
If true will redirect any error stream output of the process to the output stream.
|
||||
This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.
|
||||
</li>
|
||||
<li>Default value: false</li>
|
||||
<li>Allowed Values: true, false</li>
|
||||
<li>Supports expression language: false</li>
|
||||
</ul>
|
||||
</li>
|
||||
</ul>
|
||||
|
||||
<p>
|
||||
|
|
|
@ -59,12 +59,12 @@ public class TestExecuteProcess {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testPing() {
|
||||
public void testEcho() {
|
||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
||||
|
||||
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
|
||||
runner.setProperty(ExecuteProcess.COMMAND, "ping");
|
||||
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "127.0.0.1");
|
||||
runner.setProperty(ExecuteProcess.COMMAND, "echo");
|
||||
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args");
|
||||
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
|
||||
|
||||
runner.run();
|
||||
|
|
Loading…
Reference in New Issue