From 6279fd41844205a14cf1f2fb225ead8881b197eb Mon Sep 17 00:00:00 2001 From: Pierre Villard Date: Wed, 28 Dec 2016 18:30:24 +0100 Subject: [PATCH] NIFI-3183 This closes #3183. Added command and arguments as FF attributes in ExecuteProcess processor --- .../processors/standard/ExecuteProcess.java | 29 ++++++++++++++----- .../standard/TestExecuteProcess.java | 2 ++ 2 files changed, 24 insertions(+), 7 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java index 4cbe3a14ef..287c39c1d2 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteProcess.java @@ -21,6 +21,8 @@ import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.InputRequirement; import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; import org.apache.nifi.annotation.lifecycle.OnScheduled; @@ -70,8 +72,15 @@ import java.util.concurrent.locks.ReentrantLock; + "format, as it typically does not make sense to split binary data on arbitrary time-based intervals.") @DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor") @Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +@WritesAttributes({ + @WritesAttribute(attribute = "command", description = "Executed command"), + @WritesAttribute(attribute = "command.arguments", description = "Arguments of the command") +}) public class ExecuteProcess extends AbstractProcessor { + final static String ATTRIBUTE_COMMAND = "command"; + final static String ATTRIBUTE_COMMAND_ARGS = "command.arguments"; + public static final PropertyDescriptor COMMAND = new PropertyDescriptor.Builder() .name("Command") .description("Specifies the command to be executed; if just the name of an executable is provided, it must be in the user's environment PATH.") @@ -203,7 +212,12 @@ public class ExecuteProcess extends AbstractProcessor { final Long batchNanos = context.getProperty(BATCH_DURATION).asTimePeriod(TimeUnit.NANOSECONDS); - final List commandStrings = createCommandStrings(context); + final String command = context.getProperty(COMMAND).getValue(); + final String arguments = context.getProperty(COMMAND_ARGUMENTS).isSet() + ? context.getProperty(COMMAND_ARGUMENTS).evaluateAttributeExpressions().getValue() + : null; + + final List commandStrings = createCommandStrings(context, command, arguments); final String commandString = StringUtils.join(commandStrings, " "); if (longRunningProcess == null || longRunningProcess.isDone()) { @@ -266,6 +280,12 @@ public class ExecuteProcess extends AbstractProcessor { session.remove(flowFile); getLogger().error("Failed to read data from Process, so will not generate FlowFile"); } else { + // add command and arguments as attribute + flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND, command); + if(arguments != null) { + flowFile = session.putAttribute(flowFile, ATTRIBUTE_COMMAND_ARGS, arguments); + } + // All was good. Generate event and transfer FlowFile. session.getProvenanceReporter().create(flowFile, "Created from command: " + commandString); getLogger().info("Created {} and routed to success", new Object[] { flowFile }); @@ -276,13 +296,8 @@ public class ExecuteProcess extends AbstractProcessor { session.commit(); } - protected List createCommandStrings(final ProcessContext context) { - final String command = context.getProperty(COMMAND).getValue(); - final String arguments = context.getProperty(COMMAND_ARGUMENTS).isSet() - ? context.getProperty(COMMAND_ARGUMENTS).evaluateAttributeExpressions().getValue() - : null; + protected List createCommandStrings(final ProcessContext context, final String command, final String arguments) { final List args = ArgumentUtils.splitArgs(arguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)); - final List commandStrings = new ArrayList<>(args.size() + 1); commandStrings.add(command); commandStrings.addAll(args); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java index a5d965a73a..0d2ed48b15 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteProcess.java @@ -291,6 +291,8 @@ public class TestExecuteProcess { final List succeeded = runner.getFlowFilesForRelationship(ExecuteProcess.REL_SUCCESS); assertEquals(1, succeeded.size()); assertTrue(new String(succeeded.get(0).toByteArray()).contains("DOES-NOT-EXIST")); + assertEquals(succeeded.get(0).getAttribute(ExecuteProcess.ATTRIBUTE_COMMAND), "ls"); + assertEquals(succeeded.get(0).getAttribute(ExecuteProcess.ATTRIBUTE_COMMAND_ARGS), "DOES-NOT-EXIST"); } /**