NIFI-421: Deleted unused method; formatted whitespace

This commit is contained in:
Mark Payne 2015-04-29 14:52:20 -04:00
parent ad98ac50ca
commit fb8984cfa5
1 changed files with 46 additions and 67 deletions

View File

@ -66,43 +66,43 @@ import org.apache.nifi.processor.util.StandardValidators;
public class ExecuteProcess extends AbstractProcessor {
public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
.name("Command")
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Command")
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
.required(true)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor COMMAND_ARGUMENTS = new PropertyDescriptor.Builder()
.name("Command Arguments")
.description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name("Command Arguments")
.description("The arguments to supply to the executable delimited by white space. White space can be escaped by enclosing it in double-quotes.")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
.name("Working Directory")
.description("The directory to use as the current working directory when executing the command")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
.required(false)
.build();
.name("Working Directory")
.description("The directory to use as the current working directory when executing the command")
.expressionLanguageSupported(false)
.addValidator(StandardValidators.createDirectoryExistsValidator(false, true))
.required(false)
.build();
public static final PropertyDescriptor BATCH_DURATION = new PropertyDescriptor.Builder()
.name("Batch Duration")
.description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
+ "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
+ "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
.name("Batch Duration")
.description("If the process is expected to be long-running and produce textual output, a batch duration can be specified so "
+ "that the output will be captured for this amount of time and a FlowFile will then be sent out with the results "
+ "and a new FlowFile will be started, rather than waiting for the process to finish before sending out the results")
.required(false)
.expressionLanguageSupported(false)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
.build();
public static final PropertyDescriptor REDIRECT_ERROR_STREAM = new PropertyDescriptor.Builder()
.name("Redirect Error Stream")
.description("If true will redirect any error stream output of the process to the output stream. "
+ "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
.name("Redirect Error Stream")
.description("If true will redirect any error stream output of the process to the output stream. "
+ "This is particularly helpful for processes which write extensively to the error stream or for troubleshooting.")
.required(false)
.allowableValues("true", "false")
.defaultValue("false")
@ -111,9 +111,9 @@ public class ExecuteProcess extends AbstractProcessor {
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("All created FlowFiles are routed to this relationship")
.build();
.name("success")
.description("All created FlowFiles are routed to this relationship")
.build();
private volatile ExecutorService executor;
private Future<?> longRunningProcess;
@ -138,11 +138,11 @@ public class ExecuteProcess extends AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name(propertyDescriptorName)
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
static List<String> splitArgs(final String input) {
@ -212,17 +212,16 @@ public class ExecuteProcess extends AbstractProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final long startNanos = System.nanoTime();
if (proxyOut==null)
if (proxyOut==null) {
proxyOut = new ProxyOutputStream(getLogger());
}
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
final List<String> commandStrings = createCommandStrings(context);
final String commandString = StringUtils.join(commandStrings, " ");
if (longRunningProcess == null || longRunningProcess.isDone())
if (longRunningProcess == null || longRunningProcess.isDone()) {
try {
longRunningProcess = launchProcess(context, commandStrings, batchNanos, proxyOut);
} catch (final IOException ioe) {
@ -230,8 +229,9 @@ public class ExecuteProcess extends AbstractProcessor {
context.yield();
return;
}
else
} else {
getLogger().info("Read from long running process");
}
if (!isScheduled()) {
getLogger().info("User stopped processor; will terminate process immediately");
@ -239,10 +239,8 @@ public class ExecuteProcess extends AbstractProcessor {
return;
}
// Create a FlowFile that we can write to and set the OutputStream for
// the FlowFile
// as the delegate for the ProxyOuptutStream, then wait until the
// process finishes
// Create a FlowFile that we can write to and set the OutputStream for the FlowFile
// as the delegate for the ProxyOuptutStream, then wait until the process finishes
// or until the specified amount of time
FlowFile flowFile = session.create();
flowFile = session.write(flowFile, new OutputStreamCallback() {
@ -252,8 +250,7 @@ public class ExecuteProcess extends AbstractProcessor {
proxyOut.setDelegate(out);
if (batchNanos == null) {
// we are not creating batches; wait until process
// terminates.
// we are not creating batches; wait until process terminates.
// NB!!! Maybe get(long timeout, TimeUnit unit) should
// be used to avoid waiting forever.
try {
@ -271,7 +268,7 @@ public class ExecuteProcess extends AbstractProcessor {
}
proxyOut.setDelegate(null); // prevent from writing to this
// stream
// stream
}
}
});
@ -280,8 +277,7 @@ public class ExecuteProcess extends AbstractProcessor {
// If no data was written to the file, remove it
session.remove(flowFile);
} else if (failure.get()) {
// If there was a failure processing the output of the Process,
// remove the FlowFile
// If there was a failure processing the output of the Process, remove the FlowFile
session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile");
} else {
@ -291,13 +287,11 @@ public class ExecuteProcess extends AbstractProcessor {
session.transfer(flowFile, REL_SUCCESS);
}
// Commit the session so that the FlowFile is transferred to the next
// processor
// Commit the session so that the FlowFile is transferred to the next processor
session.commit();
}
protected List<String> createCommandStrings(final ProcessContext context) {
final String command = context.getProperty(COMMAND).getValue();
final List<String> args = splitArgs(context.getProperty(COMMAND_ARGUMENTS).getValue());
@ -381,8 +375,7 @@ public class ExecuteProcess extends AbstractProcessor {
// setting a batch during means text.
// Also, we don't want that text to get split up in the
// middle of a line, so we use BufferedReader
// to read lines of text and write them as lines of
// text.
// to read lines of text and write them as lines of text.
try (final BufferedReader reader = new BufferedReader(new InputStreamReader(newProcess.getInputStream()))) {
String line;
@ -402,12 +395,10 @@ public class ExecuteProcess extends AbstractProcessor {
int exitCode;
try {
exitCode = newProcess.exitValue();
} catch (Exception e) {
} catch (final Exception e) {
exitCode = -99999;
}
getLogger().info("Process finished with exit code {} ", new Object[] { exitCode });
// getLogger().info("Process finished with exit code {} after creating {} FlowFiles in {} millis",
// new Object[]{exitCode, flowFileCount, millis});
}
return null;
@ -417,18 +408,6 @@ public class ExecuteProcess extends AbstractProcessor {
return future;
}
// NB!!! Currently not used, Future<?> longRunningProcess is used to check whether process is done or not.
private boolean isAlive(final Process process) {
// unfortunately, java provides no straight-forward way to test if a Process is alive.
// In Java 8, Process.isAlive() is introduced, but NiFi needs to run against Java 7,
// so we have this solution in the mean time.
try {
process.exitValue();
return false;
} catch (final IllegalThreadStateException itse) {
return true;
}
}
/**
* Output stream that is used to wrap another output stream in a way that the underlying output stream can be swapped out for a different one when needed