NIFI-3183 This closes #3183. Added command and arguments as FF attributes in ExecuteProcess processor

This commit is contained in:
Pierre Villard 2016-12-28 18:30:24 +01:00 committed by joewitt
parent bd88e4335a
commit 6279fd4184
2 changed files with 24 additions and 7 deletions

View File

@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted; import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnScheduled;
@ -70,8 +72,15 @@ import java.util.concurrent.locks.ReentrantLock;
+ "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.") + "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.")
@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor") @DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") @Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
@WritesAttributes({
@WritesAttribute(attribute = "command", description = "Executed command"),
@WritesAttribute(attribute = "command.arguments", description = "Arguments of the command")
})
public class ExecuteProcess extends AbstractProcessor { public class ExecuteProcess extends AbstractProcessor {
final static String ATTRIBUTE_COMMAND = "command";
final static String ATTRIBUTE_COMMAND_ARGS = "command.arguments";
public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder()
.name("Command") .name("Command")
.description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.")
@ -203,7 +212,12 @@ public class ExecuteProcess extends AbstractProcessor {
final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS);
final List<String> commandStrings = createCommandStrings(context); final String command = context.getProperty(COMMAND).getValue();
final String arguments = context.getProperty(COMMAND_ARGUMENTS).isSet()
? context.getProperty(COMMAND_ARGUMENTS).evaluateAttributeExpressions().getValue()
: null;
final List<String> commandStrings = createCommandStrings(context, command, arguments);
final String commandString = StringUtils.join(commandStrings, " "); final String commandString = StringUtils.join(commandStrings, " ");
if (longRunningProcess == null || longRunningProcess.isDone()) { if (longRunningProcess == null || longRunningProcess.isDone()) {
@ -266,6 +280,12 @@ public class ExecuteProcess extends AbstractProcessor {
session.remove(flowFile); session.remove(flowFile);
getLogger().error("Failed to read data from Process, so will not generate FlowFile"); getLogger().error("Failed to read data from Process, so will not generate FlowFile");
} else { } else {
// add command and arguments as attribute
flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND, command);
if(arguments != null) {
flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND_ARGS, arguments);
}
// All was good. Generate event and transfer FlowFile. // All was good. Generate event and transfer FlowFile.
session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString);
getLogger().info("Created {} and routed to success", new Object[] { flowFile }); getLogger().info("Created {} and routed to success", new Object[] { flowFile });
@ -276,13 +296,8 @@ public class ExecuteProcess extends AbstractProcessor {
session.commit(); session.commit();
} }
protected List<String> createCommandStrings(final ProcessContext context) { protected List<String> createCommandStrings(final ProcessContext context, final String command, final String arguments) {
final String command = context.getProperty(COMMAND).getValue();
final String arguments = context.getProperty(COMMAND_ARGUMENTS).isSet()
? context.getProperty(COMMAND_ARGUMENTS).evaluateAttributeExpressions().getValue()
: null;
final List<String> args = ArgumentUtils.splitArgs(arguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)); final List<String> args = ArgumentUtils.splitArgs(arguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0));
final List<String> commandStrings = new ArrayList<>(args.size() + 1); final List<String> commandStrings = new ArrayList<>(args.size() + 1);
commandStrings.add(command); commandStrings.add(command);
commandStrings.addAll(args); commandStrings.addAll(args);

View File

@ -291,6 +291,8 @@ public class TestExecuteProcess {
final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS); final List<MockFlowFile> succeeded = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS);
assertEquals(1, succeeded.size()); assertEquals(1, succeeded.size());
assertTrue(new String(succeeded.get(0).toByteArray()).contains("DOES-NOT-EXIST")); assertTrue(new String(succeeded.get(0).toByteArray()).contains("DOES-NOT-EXIST"));
assertEquals(succeeded.get(0).getAttribute(ExecuteProcess.ATTRIBUTE_COMMAND), "ls");
assertEquals(succeeded.get(0).getAttribute(ExecuteProcess.ATTRIBUTE_COMMAND_ARGS), "DOES-NOT-EXIST");
} }
/** /**