NIFI-380 fixed unit test to use a more os portable command. Modify execute process to enable error stream redirection.

This commit is contained in:
joewitt 2015-03-05 22:56:41 -05:00
parent a066d9b601
commit 3533a4a58e
3 changed files with 168 additions and 152 deletions

View File

@ -38,7 +38,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -58,7 +57,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
@Tags({"command", "process", "source", "external", "invoke", "script"})
@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
+ "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual "
@ -99,12 +97,22 @@ public class ExecuteProcess extends AbstractProcessor {
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
.name("Redirect Error Stream")
.description("If true will redirect any error stream output of the process to the output stream. "
+ "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All created FlowFiles are routed to this relationship")
.build();
private volatile ExecutorService executor;
@Override
@ -118,6 +126,7 @@ public class ExecuteProcess extends AbstractProcessor {
properties.add(COMMAND);
properties.add(COMMAND_ARGUMENTS);
properties.add(BATCH_DURATION);
properties.add(REDIRECT_ERROR_STREAM);
return properties;
}
@ -132,7 +141,7 @@ public class ExecuteProcess extends AbstractProcessor {
}
static List<String> splitArgs(final String input) {
if ( input == null ) {
if (input == null) {
return Collections.emptyList();
}
@ -142,18 +151,18 @@ public class ExecuteProcess extends AbstractProcessor {
boolean inQuotes = false;
final StringBuilder sb = new StringBuilder();
for (int i=0; i < trimmed.length(); i++) {
for (int i = 0; i < trimmed.length(); i++) {
final char c = trimmed.charAt(i);
switch (c) {
case ' ':
case '\t':
case '\r':
case '\n': {
if ( inQuotes ) {
if (inQuotes) {
sb.append(c);
} else {
final String arg = sb.toString().trim();
if ( !arg.isEmpty() ) {
if (!arg.isEmpty()) {
args.add(arg);
}
sb.setLength(0);
@ -170,7 +179,7 @@ public class ExecuteProcess extends AbstractProcessor {
}
final String finalArg = sb.toString().trim();
if ( !finalArg.isEmpty() ) {
if (!finalArg.isEmpty()) {
args.add(finalArg);
}
@ -200,6 +209,7 @@ public class ExecuteProcess extends AbstractProcessor {
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final String command = context.getProperty(COMMAND).getValue();
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
commandStrings.add(command);
@ -209,27 +219,27 @@ public class ExecuteProcess extends AbstractProcessor {
final ProcessBuilder builder = new ProcessBuilder(commandStrings);
final String workingDirName = context.getProperty(WORKING_DIR).getValue();
if ( workingDirName != null ) {
if (workingDirName != null) {
builder.directory(new File(workingDirName));
}
final Map<String, String> environment = new HashMap<>();
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
if ( entry.getKey().isDynamic() ) {
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
if (entry.getKey().isDynamic()) {
environment.put(entry.getKey().getName(), entry.getValue());
}
}
if ( !environment.isEmpty() ) {
if (!environment.isEmpty()) {
builder.environment().putAll(environment);
}
final long startNanos = System.nanoTime();
final Process process;
try {
process = builder.start();
process = builder.redirectErrorStream(redirectErrorStream).start();
} catch (final IOException ioe) {
getLogger().error("Failed to create process due to {}", new Object[] {ioe});
getLogger().error("Failed to create process due to {}", new Object[]{ioe});
context.yield();
return;
}
@ -237,25 +247,18 @@ public class ExecuteProcess extends AbstractProcessor {
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
// Submit task to read error stream from process
final AtomicReference<String> errorStream = new AtomicReference<>();
if (!redirectErrorStream) {
executor.submit(new Runnable() {
@Override
public void run() {
final StringBuilder sb = new StringBuilder();
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
String line;
while ((line = reader.readLine()) != null) {
if ( sb.length() < 4000 ) {
sb.append(line);
sb.append("\n");
}
while (reader.read() >= 0) {
}
} catch (final IOException ioe) {
}
errorStream.set(sb.toString());
}
});
}
// Submit task to read output of Process and write to FlowFile.
final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger());
@ -265,13 +268,13 @@ public class ExecuteProcess extends AbstractProcessor {
@Override
public Object call() throws IOException {
try {
if ( batchNanos == null ) {
if (batchNanos == null) {
// if we aren't batching, just copy the stream from the process to the flowfile.
try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) {
final byte[] buffer = new byte[4096];
int len;
while ((len = bufferedIn.read(buffer)) > 0) {
if ( !isScheduled() ) {
if (!isScheduled()) {
return null;
}
@ -288,7 +291,7 @@ public class ExecuteProcess extends AbstractProcessor {
String line;
while ((line = reader.readLine()) != null) {
if ( !isScheduled() ) {
if (!isScheduled()) {
return null;
}
@ -312,7 +315,7 @@ public class ExecuteProcess extends AbstractProcessor {
// there can be data buffered on the InputStream; so we will wait until the stream is empty as well.
int flowFileCount = 0;
while (!finishedCopying.get() || isAlive(process)) {
if ( !isScheduled() ) {
if (!isScheduled()) {
getLogger().info("User stopped processor; will terminate process immediately");
process.destroy();
break;
@ -328,19 +331,21 @@ public class ExecuteProcess extends AbstractProcessor {
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
proxyOut.setDelegate(out);
if ( batchNanos == null ) {
if (batchNanos == null) {
// we are not creating batches; wait until process terminates.
Integer exitCode = null;
while (exitCode == null) {
try {
exitCode = process.waitFor();
} catch (final InterruptedException ie) {}
} catch (final InterruptedException ie) {
}
}
} else {
// wait the allotted amount of time.
try {
TimeUnit.NANOSECONDS.sleep(batchNanos);
} catch (final InterruptedException ie) {}
} catch (final InterruptedException ie) {
}
}
proxyOut.setDelegate(null); // prevent from writing to this stream
@ -348,10 +353,10 @@ public class ExecuteProcess extends AbstractProcessor {
}
});
if ( flowFile.getSize() == 0L ) {
if (flowFile.getSize() == 0L) {
// If no data was written to the file, remove it
session.remove(flowFile);
} else if ( failure.get() ) {
} else if (failure.get()) {
// If there was a failure processing the output of the Process, remove the FlowFile
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
@ -359,7 +364,7 @@ public class ExecuteProcess extends AbstractProcessor {
} else {
// All was good. Generate event and transfer FlowFile.
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
getLogger().info("Created {} and routed to success", new Object[] {flowFile});
getLogger().info("Created {} and routed to success", new Object[]{flowFile});
session.transfer(flowFile, REL_SUCCESS);
flowFileCount++;
}
@ -380,16 +385,15 @@ public class ExecuteProcess extends AbstractProcessor {
try {
future.get();
} catch (final ExecutionException e) {
getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[] {e.getCause()});
getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[]{e.getCause()});
} catch (final InterruptedException ie) {
getLogger().error("Interrupted while waiting to copy data form Process to FlowFile");
return;
}
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[] {exitCode, flowFileCount, millis});
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis});
}
private boolean isAlive(final Process process) {
// unfortunately, java provides no straight-forward way to test if a Process is alive.
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
@ -402,12 +406,13 @@ public class ExecuteProcess extends AbstractProcessor {
}
}
/**
* Output stream that is used to wrap another output stream in a way that the
* underlying output stream can be swapped out for a different one when needed
* Output stream that is used to wrap another output stream in a way that
* the underlying output stream can be swapped out for a different one when
* needed
*/
private static class ProxyOutputStream extends OutputStream {
private final ProcessorLog logger;
private final Lock lock = new ReentrantLock();
@ -420,7 +425,7 @@ public class ExecuteProcess extends AbstractProcessor {
public void setDelegate(final OutputStream delegate) {
lock.lock();
try {
logger.trace("Switching delegate from {} to {}", new Object[] {this.delegate, delegate});
logger.trace("Switching delegate from {} to {}", new Object[]{this.delegate, delegate});
this.delegate = delegate;
} finally {
@ -431,7 +436,8 @@ public class ExecuteProcess extends AbstractProcessor {
private void sleep(final long millis) {
try {
Thread.sleep(millis);
} catch (final InterruptedException ie) {}
} catch (final InterruptedException ie) {
}
}
@Override
@ -439,8 +445,8 @@ public class ExecuteProcess extends AbstractProcessor {
lock.lock();
try {
while (true) {
if ( delegate != null ) {
logger.trace("Writing to {}", new Object[] {delegate});
if (delegate != null) {
logger.trace("Writing to {}", new Object[]{delegate});
delegate.write(b);
return;
@ -460,8 +466,8 @@ public class ExecuteProcess extends AbstractProcessor {
lock.lock();
try {
while (true) {
if ( delegate != null ) {
logger.trace("Writing to {}", new Object[] {delegate});
if (delegate != null) {
logger.trace("Writing to {}", new Object[]{delegate});
delegate.write(b, off, len);
return;
@ -490,7 +496,7 @@ public class ExecuteProcess extends AbstractProcessor {
lock.lock();
try {
while (true) {
if ( delegate != null ) {
if (delegate != null) {
delegate.flush();
return;
} else {

View File

@ -60,8 +60,7 @@
<li>Supports expression language: false</li>
</ul>
</li>
<li>
Batch Duration>
<li>Batch Duration
<ul>
<li>
If the process is expected to be long-running and produce textual output, a batch duration can be specified so
@ -74,6 +73,17 @@
<li>Supports expression language: false</li>
</ul>
</li>
<li>Redirect Error Stream
<ul>
<li>
If true will redirect any error stream output of the process to the output stream.
This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.
</li>
<li>Default value: false</li>
<li>Allowed Values: true, false</li>
<li>Supports expression language: false</li>
</ul>
</li>
</ul>
<p>

View File

@ -59,12 +59,12 @@ public class TestExecuteProcess {
}
@Test
public void testPing() {
public void testEcho() {
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
runner.setProperty(ExecuteProcess.COMMAND, "ping");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "127.0.0.1");
runner.setProperty(ExecuteProcess.COMMAND, "echo");
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args");
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
runner.run();