mirror of https://github.com/apache/nifi.git
NIFI-380 fixed unit test to use a more os portable command. Modify execute process to enable error stream redirection.
This commit is contained in:
parent
a066d9b601
commit
3533a4a58e
|
@ -38,7 +38,6 @@ import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
@ -58,7 +57,6 @@ import org.apache.nifi.processor.exception.ProcessException;
|
||||||
import org.apache.nifi.processor.io.OutputStreamCallback;
|
import org.apache.nifi.processor.io.OutputStreamCallback;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
|
||||||
|
|
||||||
@Tags({"command", "process", "source", "external", "invoke", "script"})
|
@Tags({"command", "process", "source", "external", "invoke", "script"})
|
||||||
@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
|
@CapabilityDescription("Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected "
|
||||||
+ "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual "
|
+ "to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual "
|
||||||
|
@ -66,94 +64,105 @@ import org.apache.nifi.processor.util.StandardValidators;
|
||||||
public class ExecuteProcess extends AbstractProcessor {
|
public class ExecuteProcess extends AbstractProcessor {
|
||||||
|
|
||||||
public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
|
||||||
.name("Command")
|
.name("Command")
|
||||||
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
|
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
|
||||||
.required(true)
|
.required(true)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
|
||||||
.name("Command Arguments")
|
.name("Command Arguments")
|
||||||
.description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
|
.description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
|
||||||
.required(false)
|
.required(false)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
|
||||||
.name("Working Directory")
|
.name("Working Directory")
|
||||||
.description("The directory to use as the current working directory when executing the command")
|
.description("The directory to use as the current working directory when executing the command")
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
|
.addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
|
||||||
.required(false)
|
.required(false)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
|
||||||
.name("Batch Duration")
|
.name("Batch Duration")
|
||||||
.description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
|
.description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
|
||||||
+ "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
|
+ "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
|
||||||
+ "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
|
+ "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
|
||||||
.required(false)
|
.required(false)
|
||||||
.expressionLanguageSupported(false)
|
.expressionLanguageSupported(false)
|
||||||
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
|
||||||
|
.name("Redirect Error Stream")
|
||||||
|
.description("If true will redirect any error stream output of the process to the output stream. "
|
||||||
|
+ "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
|
||||||
|
.required(false)
|
||||||
|
.allowableValues("true", "false")
|
||||||
|
.defaultValue("false")
|
||||||
|
.expressionLanguageSupported(false)
|
||||||
|
.addValidator(StandardValidators.BOOLEAN_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
|
||||||
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
public static final Relationship REL_SUCCESS = new Relationship.Builder()
|
||||||
.name("success")
|
.name("success")
|
||||||
.description("All created FlowFiles are routed to this relationship")
|
.description("All created FlowFiles are routed to this relationship")
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
|
|
||||||
private volatile ExecutorService executor;
|
private volatile ExecutorService executor;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<Relationship> getRelationships() {
|
public Set<Relationship> getRelationships() {
|
||||||
return Collections.singleton(REL_SUCCESS);
|
return Collections.singleton(REL_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||||
final List<PropertyDescriptor> properties = new ArrayList<>();
|
final List<PropertyDescriptor> properties = new ArrayList<>();
|
||||||
properties.add(COMMAND);
|
properties.add(COMMAND);
|
||||||
properties.add(COMMAND_ARGUMENTS);
|
properties.add(COMMAND_ARGUMENTS);
|
||||||
properties.add(BATCH_DURATION);
|
properties.add(BATCH_DURATION);
|
||||||
|
properties.add(REDIRECT_ERROR_STREAM);
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
return new PropertyDescriptor.Builder()
|
return new PropertyDescriptor.Builder()
|
||||||
.name(propertyDescriptorName)
|
.name(propertyDescriptorName)
|
||||||
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
|
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
|
||||||
.dynamic(true)
|
.dynamic(true)
|
||||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
static List<String> splitArgs(final String input) {
|
static List<String> splitArgs(final String input) {
|
||||||
if ( input == null ) {
|
if (input == null) {
|
||||||
return Collections.emptyList();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
final List<String> args = new ArrayList<>();
|
final List<String> args = new ArrayList<>();
|
||||||
|
|
||||||
final String trimmed = input.trim();
|
final String trimmed = input.trim();
|
||||||
boolean inQuotes = false;
|
boolean inQuotes = false;
|
||||||
|
|
||||||
final StringBuilder sb = new StringBuilder();
|
final StringBuilder sb = new StringBuilder();
|
||||||
for (int i=0; i < trimmed.length(); i++) {
|
for (int i = 0; i < trimmed.length(); i++) {
|
||||||
final char c = trimmed.charAt(i);
|
final char c = trimmed.charAt(i);
|
||||||
switch (c) {
|
switch (c) {
|
||||||
case ' ':
|
case ' ':
|
||||||
case '\t':
|
case '\t':
|
||||||
case '\r':
|
case '\r':
|
||||||
case '\n': {
|
case '\n': {
|
||||||
if ( inQuotes ) {
|
if (inQuotes) {
|
||||||
sb.append(c);
|
sb.append(c);
|
||||||
} else {
|
} else {
|
||||||
final String arg = sb.toString().trim();
|
final String arg = sb.toString().trim();
|
||||||
if ( !arg.isEmpty() ) {
|
if (!arg.isEmpty()) {
|
||||||
args.add(arg);
|
args.add(arg);
|
||||||
}
|
}
|
||||||
sb.setLength(0);
|
sb.setLength(0);
|
||||||
|
@ -168,20 +177,20 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
final String finalArg = sb.toString().trim();
|
final String finalArg = sb.toString().trim();
|
||||||
if ( !finalArg.isEmpty() ) {
|
if (!finalArg.isEmpty()) {
|
||||||
args.add(finalArg);
|
args.add(finalArg);
|
||||||
}
|
}
|
||||||
|
|
||||||
return args;
|
return args;
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnScheduled
|
@OnScheduled
|
||||||
public void setupExecutor(final ProcessContext context) {
|
public void setupExecutor(final ProcessContext context) {
|
||||||
executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory() {
|
executor = Executors.newFixedThreadPool(context.getMaxConcurrentTasks() * 2, new ThreadFactory() {
|
||||||
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
|
private final ThreadFactory defaultFactory = Executors.defaultThreadFactory();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Thread newThread(final Runnable r) {
|
public Thread newThread(final Runnable r) {
|
||||||
final Thread t = defaultFactory.newThread(r);
|
final Thread t = defaultFactory.newThread(r);
|
||||||
|
@ -190,73 +199,67 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@OnUnscheduled
|
@OnUnscheduled
|
||||||
public void shutdownExecutor() {
|
public void shutdownExecutor() {
|
||||||
executor.shutdown();
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
final String command = context.getProperty(COMMAND).getValue();
|
final String command = context.getProperty(COMMAND).getValue();
|
||||||
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
|
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
|
||||||
|
final Boolean redirectErrorStream = context.getProperty(REDIRECT_ERROR_STREAM).asBoolean();
|
||||||
|
|
||||||
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
|
final List<String> commandStrings = new ArrayList<>(args.size() + 1);
|
||||||
commandStrings.add(command);
|
commandStrings.add(command);
|
||||||
commandStrings.addAll(args);
|
commandStrings.addAll(args);
|
||||||
|
|
||||||
final String commandString = StringUtils.join(commandStrings, " ");
|
final String commandString = StringUtils.join(commandStrings, " ");
|
||||||
|
|
||||||
final ProcessBuilder builder = new ProcessBuilder(commandStrings);
|
final ProcessBuilder builder = new ProcessBuilder(commandStrings);
|
||||||
final String workingDirName = context.getProperty(WORKING_DIR).getValue();
|
final String workingDirName = context.getProperty(WORKING_DIR).getValue();
|
||||||
if ( workingDirName != null ) {
|
if (workingDirName != null) {
|
||||||
builder.directory(new File(workingDirName));
|
builder.directory(new File(workingDirName));
|
||||||
}
|
}
|
||||||
|
|
||||||
final Map<String, String> environment = new HashMap<>();
|
final Map<String, String> environment = new HashMap<>();
|
||||||
for ( final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet() ) {
|
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||||
if ( entry.getKey().isDynamic() ) {
|
if (entry.getKey().isDynamic()) {
|
||||||
environment.put(entry.getKey().getName(), entry.getValue());
|
environment.put(entry.getKey().getName(), entry.getValue());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( !environment.isEmpty() ) {
|
if (!environment.isEmpty()) {
|
||||||
builder.environment().putAll(environment);
|
builder.environment().putAll(environment);
|
||||||
}
|
}
|
||||||
|
|
||||||
final long startNanos = System.nanoTime();
|
final long startNanos = System.nanoTime();
|
||||||
final Process process;
|
final Process process;
|
||||||
try {
|
try {
|
||||||
process = builder.start();
|
process = builder.redirectErrorStream(redirectErrorStream).start();
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Failed to create process due to {}", new Object[] {ioe});
|
getLogger().error("Failed to create process due to {}", new Object[]{ioe});
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
|
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
|
||||||
|
|
||||||
// Submit task to read error stream from process
|
// Submit task to read error stream from process
|
||||||
final AtomicReference<String> errorStream = new AtomicReference<>();
|
if (!redirectErrorStream) {
|
||||||
executor.submit(new Runnable() {
|
executor.submit(new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
final StringBuilder sb = new StringBuilder();
|
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
||||||
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getErrorStream()))) {
|
while (reader.read() >= 0) {
|
||||||
String line;
|
|
||||||
while ((line = reader.readLine()) != null) {
|
|
||||||
if ( sb.length() < 4000 ) {
|
|
||||||
sb.append(line);
|
|
||||||
sb.append("\n");
|
|
||||||
}
|
}
|
||||||
|
} catch (final IOException ioe) {
|
||||||
}
|
}
|
||||||
} catch (final IOException ioe) {
|
|
||||||
}
|
}
|
||||||
|
});
|
||||||
errorStream.set(sb.toString());
|
}
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Submit task to read output of Process and write to FlowFile.
|
// Submit task to read output of Process and write to FlowFile.
|
||||||
final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger());
|
final ProxyOutputStream proxyOut = new ProxyOutputStream(getLogger());
|
||||||
final AtomicBoolean failure = new AtomicBoolean(false);
|
final AtomicBoolean failure = new AtomicBoolean(false);
|
||||||
|
@ -265,16 +268,16 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
@Override
|
@Override
|
||||||
public Object call() throws IOException {
|
public Object call() throws IOException {
|
||||||
try {
|
try {
|
||||||
if ( batchNanos == null ) {
|
if (batchNanos == null) {
|
||||||
// if we aren't batching, just copy the stream from the process to the flowfile.
|
// if we aren't batching, just copy the stream from the process to the flowfile.
|
||||||
try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) {
|
try (final BufferedInputStream bufferedIn = new BufferedInputStream(process.getInputStream())) {
|
||||||
final byte[] buffer = new byte[4096];
|
final byte[] buffer = new byte[4096];
|
||||||
int len;
|
int len;
|
||||||
while ((len = bufferedIn.read(buffer)) > 0) {
|
while ((len = bufferedIn.read(buffer)) > 0) {
|
||||||
if ( !isScheduled() ) {
|
if (!isScheduled()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyOut.write(buffer, 0, len);
|
proxyOut.write(buffer, 0, len);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -286,9 +289,9 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
// to read lines of text and write them as lines of text.
|
// to read lines of text and write them as lines of text.
|
||||||
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
|
||||||
String line;
|
String line;
|
||||||
|
|
||||||
while ((line = reader.readLine()) != null) {
|
while ((line = reader.readLine()) != null) {
|
||||||
if ( !isScheduled() ) {
|
if (!isScheduled()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -302,22 +305,22 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
} finally {
|
} finally {
|
||||||
finishedCopying.set(true);
|
finishedCopying.set(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// continue to do this loop until both the process has finished and we have finished copying
|
// continue to do this loop until both the process has finished and we have finished copying
|
||||||
// the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(),
|
// the output from the process to the FlowFile. Unfortunately, even after calling Process.exitValue(),
|
||||||
// there can be data buffered on the InputStream; so we will wait until the stream is empty as well.
|
// there can be data buffered on the InputStream; so we will wait until the stream is empty as well.
|
||||||
int flowFileCount = 0;
|
int flowFileCount = 0;
|
||||||
while (!finishedCopying.get() || isAlive(process)) {
|
while (!finishedCopying.get() || isAlive(process)) {
|
||||||
if ( !isScheduled() ) {
|
if (!isScheduled()) {
|
||||||
getLogger().info("User stopped processor; will terminate process immediately");
|
getLogger().info("User stopped processor; will terminate process immediately");
|
||||||
process.destroy();
|
process.destroy();
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a FlowFile that we can write to and set the OutputStream for the FlowFile
|
// Create a FlowFile that we can write to and set the OutputStream for the FlowFile
|
||||||
// as the delegate for the ProxyOuptutStream, then wait until the process finishes
|
// as the delegate for the ProxyOuptutStream, then wait until the process finishes
|
||||||
// or until the specified amount of time
|
// or until the specified amount of time
|
||||||
|
@ -327,31 +330,33 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
public void process(final OutputStream flowFileOut) throws IOException {
|
public void process(final OutputStream flowFileOut) throws IOException {
|
||||||
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
|
try (final OutputStream out = new BufferedOutputStream(flowFileOut)) {
|
||||||
proxyOut.setDelegate(out);
|
proxyOut.setDelegate(out);
|
||||||
|
|
||||||
if ( batchNanos == null ) {
|
if (batchNanos == null) {
|
||||||
// we are not creating batches; wait until process terminates.
|
// we are not creating batches; wait until process terminates.
|
||||||
Integer exitCode = null;
|
Integer exitCode = null;
|
||||||
while (exitCode == null) {
|
while (exitCode == null) {
|
||||||
try {
|
try {
|
||||||
exitCode = process.waitFor();
|
exitCode = process.waitFor();
|
||||||
} catch (final InterruptedException ie) {}
|
} catch (final InterruptedException ie) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// wait the allotted amount of time.
|
// wait the allotted amount of time.
|
||||||
try {
|
try {
|
||||||
TimeUnit.NANOSECONDS.sleep(batchNanos);
|
TimeUnit.NANOSECONDS.sleep(batchNanos);
|
||||||
} catch (final InterruptedException ie) {}
|
} catch (final InterruptedException ie) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyOut.setDelegate(null); // prevent from writing to this stream
|
proxyOut.setDelegate(null); // prevent from writing to this stream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
if ( flowFile.getSize() == 0L ) {
|
if (flowFile.getSize() == 0L) {
|
||||||
// If no data was written to the file, remove it
|
// If no data was written to the file, remove it
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
} else if ( failure.get() ) {
|
} else if (failure.get()) {
|
||||||
// If there was a failure processing the output of the Process, remove the FlowFile
|
// If there was a failure processing the output of the Process, remove the FlowFile
|
||||||
session.remove(flowFile);
|
session.remove(flowFile);
|
||||||
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
|
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
|
||||||
|
@ -359,15 +364,15 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
} else {
|
} else {
|
||||||
// All was good. Generate event and transfer FlowFile.
|
// All was good. Generate event and transfer FlowFile.
|
||||||
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
|
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
|
||||||
getLogger().info("Created {} and routed to success", new Object[] {flowFile});
|
getLogger().info("Created {} and routed to success", new Object[]{flowFile});
|
||||||
session.transfer(flowFile, REL_SUCCESS);
|
session.transfer(flowFile, REL_SUCCESS);
|
||||||
flowFileCount++;
|
flowFileCount++;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit the session so that the FlowFile is transferred to the next processor
|
// Commit the session so that the FlowFile is transferred to the next processor
|
||||||
session.commit();
|
session.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
final int exitCode;
|
final int exitCode;
|
||||||
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
|
||||||
try {
|
try {
|
||||||
|
@ -376,20 +381,19 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
getLogger().warn("Process was interrupted before finishing");
|
getLogger().warn("Process was interrupted before finishing");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
future.get();
|
future.get();
|
||||||
} catch (final ExecutionException e) {
|
} catch (final ExecutionException e) {
|
||||||
getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[] {e.getCause()});
|
getLogger().error("Failed to copy output from Process to FlowFile due to {}", new Object[]{e.getCause()});
|
||||||
} catch (final InterruptedException ie) {
|
} catch (final InterruptedException ie) {
|
||||||
getLogger().error("Interrupted while waiting to copy data form Process to FlowFile");
|
getLogger().error("Interrupted while waiting to copy data form Process to FlowFile");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[] {exitCode, flowFileCount, millis});
|
getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis", new Object[]{exitCode, flowFileCount, millis});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private boolean isAlive(final Process process) {
|
private boolean isAlive(final Process process) {
|
||||||
// unfortunately, java provides no straight-forward way to test if a Process is alive.
|
// unfortunately, java provides no straight-forward way to test if a Process is alive.
|
||||||
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
|
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
|
||||||
|
@ -401,46 +405,48 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Output stream that is used to wrap another output stream in a way that the
|
* Output stream that is used to wrap another output stream in a way that
|
||||||
* underlying output stream can be swapped out for a different one when needed
|
* the underlying output stream can be swapped out for a different one when
|
||||||
|
* needed
|
||||||
*/
|
*/
|
||||||
private static class ProxyOutputStream extends OutputStream {
|
private static class ProxyOutputStream extends OutputStream {
|
||||||
|
|
||||||
private final ProcessorLog logger;
|
private final ProcessorLog logger;
|
||||||
|
|
||||||
private final Lock lock = new ReentrantLock();
|
private final Lock lock = new ReentrantLock();
|
||||||
private OutputStream delegate;
|
private OutputStream delegate;
|
||||||
|
|
||||||
public ProxyOutputStream(final ProcessorLog logger) {
|
public ProxyOutputStream(final ProcessorLog logger) {
|
||||||
this.logger = logger;
|
this.logger = logger;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setDelegate(final OutputStream delegate) {
|
public void setDelegate(final OutputStream delegate) {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
logger.trace("Switching delegate from {} to {}", new Object[] {this.delegate, delegate});
|
logger.trace("Switching delegate from {} to {}", new Object[]{this.delegate, delegate});
|
||||||
|
|
||||||
this.delegate = delegate;
|
this.delegate = delegate;
|
||||||
} finally {
|
} finally {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sleep(final long millis) {
|
private void sleep(final long millis) {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(millis);
|
Thread.sleep(millis);
|
||||||
} catch (final InterruptedException ie) {}
|
} catch (final InterruptedException ie) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(final int b) throws IOException {
|
public void write(final int b) throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
if ( delegate != null ) {
|
if (delegate != null) {
|
||||||
logger.trace("Writing to {}", new Object[] {delegate});
|
logger.trace("Writing to {}", new Object[]{delegate});
|
||||||
|
|
||||||
delegate.write(b);
|
delegate.write(b);
|
||||||
return;
|
return;
|
||||||
|
@ -454,15 +460,15 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(final byte[] b, final int off, final int len) throws IOException {
|
public void write(final byte[] b, final int off, final int len) throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
if ( delegate != null ) {
|
if (delegate != null) {
|
||||||
logger.trace("Writing to {}", new Object[] {delegate});
|
logger.trace("Writing to {}", new Object[]{delegate});
|
||||||
|
|
||||||
delegate.write(b, off, len);
|
delegate.write(b, off, len);
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
@ -475,22 +481,22 @@ public class ExecuteProcess extends AbstractProcessor {
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(final byte[] b) throws IOException {
|
public void write(final byte[] b) throws IOException {
|
||||||
write(b, 0, b.length);
|
write(b, 0, b.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void flush() throws IOException {
|
public void flush() throws IOException {
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
while (true) {
|
while (true) {
|
||||||
if ( delegate != null ) {
|
if (delegate != null) {
|
||||||
delegate.flush();
|
delegate.flush();
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -25,11 +25,11 @@
|
||||||
<!-- Processor Documentation ================================================== -->
|
<!-- Processor Documentation ================================================== -->
|
||||||
<h2> Description:</h2>
|
<h2> Description:</h2>
|
||||||
<p>
|
<p>
|
||||||
Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected
|
Runs an operating system command specified by the user and writes the output of that command to a FlowFile. If the command is expected
|
||||||
to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual
|
to be long-running, the Processor can output the partial data on a specified interval. When this option is used, the output is expected to be in textual
|
||||||
format, as it typically does not make sense to split binary data on arbitrary time-based intervals.
|
format, as it typically does not make sense to split binary data on arbitrary time-based intervals.
|
||||||
</p>
|
</p>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
<strong>Properties:</strong>
|
<strong>Properties:</strong>
|
||||||
</p>
|
</p>
|
||||||
|
@ -60,20 +60,30 @@
|
||||||
<li>Supports expression language: false</li>
|
<li>Supports expression language: false</li>
|
||||||
</ul>
|
</ul>
|
||||||
</li>
|
</li>
|
||||||
<li>
|
<li>Batch Duration
|
||||||
Batch Duration>
|
<ul>
|
||||||
<ul>
|
<li>
|
||||||
<li>
|
If the process is expected to be long-running and produce textual output, a batch duration can be specified so
|
||||||
If the process is expected to be long-running and produce textual output, a batch duration can be specified so
|
that the output will be captured for this amount of time and a FlowFile will then be sent out with the results
|
||||||
that the output will be captured for this amount of time and a FlowFile will then be sent out with the results
|
and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results.
|
||||||
and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results.
|
If no value is provided, the process will run to completion and the entire output of the process will be written
|
||||||
If no value is provided, the process will run to completion and the entire output of the process will be written
|
to a single FlowFile.
|
||||||
to a single FlowFile.
|
</li>
|
||||||
</li>
|
<li>Default value: none</li>
|
||||||
<li>Default value: none</li>
|
|
||||||
<li>Supports expression language: false</li>
|
<li>Supports expression language: false</li>
|
||||||
</ul>
|
</ul>
|
||||||
</li>
|
</li>
|
||||||
|
<li>Redirect Error Stream
|
||||||
|
<ul>
|
||||||
|
<li>
|
||||||
|
If true will redirect any error stream output of the process to the output stream.
|
||||||
|
This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.
|
||||||
|
</li>
|
||||||
|
<li>Default value: false</li>
|
||||||
|
<li>Allowed Values: true, false</li>
|
||||||
|
<li>Supports expression language: false</li>
|
||||||
|
</ul>
|
||||||
|
</li>
|
||||||
</ul>
|
</ul>
|
||||||
|
|
||||||
<p>
|
<p>
|
||||||
|
|
|
@ -59,12 +59,12 @@ public class TestExecuteProcess {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPing() {
|
public void testEcho() {
|
||||||
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi", "TRACE");
|
||||||
|
|
||||||
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
|
final TestRunner runner = TestRunners.newTestRunner(ExecuteProcess.class);
|
||||||
runner.setProperty(ExecuteProcess.COMMAND, "ping");
|
runner.setProperty(ExecuteProcess.COMMAND, "echo");
|
||||||
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "127.0.0.1");
|
runner.setProperty(ExecuteProcess.COMMAND_ARGUMENTS, "test-args");
|
||||||
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
|
runner.setProperty(ExecuteProcess.BATCH_DURATION, "500 millis");
|
||||||
|
|
||||||
runner.run();
|
runner.run();
|
||||||
|
|
Loading…
Reference in New Issue