From 4bea7f5f79ae8ff46e4c57e649e2f10999601709 Mon Sep 17 00:00:00 2001 From: Nandor Soma Abonyi Date: Thu, 4 May 2023 10:52:11 +0200 Subject: [PATCH] NIFI-11524 Improved ExecuteStreamCommand documentation and configuration This closes #7228 Signed-off-by: David Handermann (cherry picked from commit 2049d5068b8238fc9251d3e735f1d80e0c86558b) --- .../standard/ExecuteStreamCommand.java | 127 ++++++++------ .../additionalDetails.html | 166 ++++++++++++++++++ 2 files changed, 236 insertions(+), 57 deletions(-) create mode 100644 nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index ca7dc62971..01a2fb90d0 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -65,6 +65,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.lang.ProcessBuilder.Redirect; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -148,14 +149,15 @@ import java.util.regex.Pattern; @SupportsBatching @InputRequirement(Requirement.INPUT_REQUIRED) @Tags({"command execution", "command", "stream", "execute"}) -@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.") +@CapabilityDescription("The ExecuteStreamCommand processor provides a flexible way to integrate external commands and scripts into NiFi data flows." + + " ExecuteStreamCommand can pass the incoming FlowFile's content to the command that it executes similarly how piping works.") @SupportsSensitiveDynamicProperties @DynamicProperties({ @DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor"), - @DynamicProperty(name = "command.argument.", value = "Argument to be supplied to the command", + @DynamicProperty(name = "command.argument.", value = "Argument to be supplied to the command", description = "These arguments are supplied to the process spawned by this Processor when using the " - + "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER will determine the order.") + + "Command Arguments Strategy : Dynamic Property Arguments. is a number and it will determine the order.") }) @WritesAttributes({ @WritesAttribute(attribute = "execution.command", description = "The name of the command executed"), @@ -189,7 +191,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { private final static Set OUTPUT_STREAM_RELATIONSHIP_SET; private final static Set ATTRIBUTE_RELATIONSHIP_SET; - private static final Pattern DYNAMIC_PARAMETER_NAME = Pattern.compile("command\\.argument\\.(?[0-9]+)$"); + private static final Pattern COMMAND_ARGUMENT_PATTERN = Pattern.compile("command\\.argument\\.(?[0-9]+)$"); public static final String executionArguments = "Command Arguments Property"; public static final String dynamicArguements = "Dynamic Property Arguments"; @@ -197,8 +199,15 @@ public class ExecuteStreamCommand extends AbstractProcessor { "Arguments to be supplied to the executable are taken from the Command Arguments property"); static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new AllowableValue(dynamicArguements, dynamicArguements, - "Arguments to be supplied to the executable are taken from dynamic properties"); + "Arguments to be supplied to the executable are taken from dynamic properties with pattern of 'command.argument.'"); + static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder() + .name("Working Directory") + .description("The directory to use as the current working directory when executing the command") + .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) + .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) + .required(false) + .build(); private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true); static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder() @@ -215,38 +224,36 @@ public class ExecuteStreamCommand extends AbstractProcessor { .description("Strategy for configuring arguments to be supplied to the command.") .expressionLanguageSupported(ExpressionLanguageScope.NONE) .required(false) - .allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue(), DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue()) + .allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY, DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY) .defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue()) .build(); static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder() .name("Command Arguments") .description("The arguments to supply to the executable delimited by the ';' character.") + .dependsOn(ARGUMENTS_STRATEGY, EXECUTION_ARGUMENTS_PROPERTY_STRATEGY) .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(new Validator() { - - @Override - public ValidationResult validate(String subject, String input, ValidationContext context) { - ValidationResult result = new ValidationResult.Builder() - .subject(subject).valid(true).input(input).build(); - List args = ArgumentUtils.splitArgs(input, context.getProperty(ARG_DELIMITER).getValue().charAt(0)); - for (String arg : args) { - ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context); - if (!valResult.isValid()) { - result = valResult; - break; - } + .addValidator((subject, input, context) -> { + ValidationResult result = new ValidationResult.Builder() + .subject(subject).valid(true).input(input).build(); + List args = ArgumentUtils.splitArgs(input, context.getProperty(ExecuteStreamCommand.ARG_DELIMITER).getValue().charAt(0)); + for (String arg : args) { + ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context); + if (!valResult.isValid()) { + result = valResult; + break; } - return result; } + return result; }).build(); - static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder() - .name("Working Directory") - .description("The directory to use as the current working directory when executing the command") - .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) - .addValidator(StandardValidators.createDirectoryExistsValidator(true, true)) - .required(false) + static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder() + .name("Argument Delimiter") + .description("Delimiter to use to separate arguments for a command [default: ;]. Must be a single character") + .dependsOn(ARGUMENTS_STRATEGY, EXECUTION_ARGUMENTS_PROPERTY_STRATEGY) + .addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR) + .required(true) + .defaultValue(";") .build(); static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder() @@ -273,33 +280,21 @@ public class ExecuteStreamCommand extends AbstractProcessor { .defaultValue("256") .build(); - private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1); - - static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder() - .name("Argument Delimiter") - .description("Delimiter to use to separate arguments for a command [default: ;]. Must be a single character") - .addValidator(Validator.VALID) - .addValidator(characterValidator) - .required(true) - .defaultValue(";") - .build(); - private static final List PROPERTIES; private static final String MASKED_ARGUMENT = "********"; static { List props = new ArrayList<>(); + props.add(WORKING_DIR); + props.add(EXECUTION_COMMAND); props.add(ARGUMENTS_STRATEGY); props.add(EXECUTION_ARGUMENTS); - props.add(EXECUTION_COMMAND); - props.add(IGNORE_STDIN); - props.add(WORKING_DIR); props.add(ARG_DELIMITER); + props.add(IGNORE_STDIN); props.add(PUT_OUTPUT_IN_ATTRIBUTE); props.add(PUT_ATTRIBUTE_MAX_LENGTH); PROPERTIES = Collections.unmodifiableList(props); - Set outputStreamRelationships = new HashSet<>(); outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP); outputStreamRelationships.add(ORIGINAL_RELATIONSHIP); @@ -343,17 +338,7 @@ public class ExecuteStreamCommand extends AbstractProcessor { @Override protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { - if (!propertyDescriptorName.startsWith("command.argument.")) { - return new PropertyDescriptor.Builder() - .name(propertyDescriptorName) - .description( - "Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") - .dynamic(true) - .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) - .build(); - } - // get the number part of the name - Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName); + final Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(propertyDescriptorName); if (matcher.matches()) { return new PropertyDescriptor.Builder() .name(propertyDescriptorName) @@ -363,8 +348,37 @@ public class ExecuteStreamCommand extends AbstractProcessor { .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES) .addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR) .build(); + } else { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); } - return null; + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + final List validationResults = new ArrayList<>(super.customValidate(validationContext)); + + final String argumentStrategy = validationContext.getProperty(ARGUMENTS_STRATEGY).getValue(); + if (DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue() != argumentStrategy) { + for (final PropertyDescriptor propertyDescriptor : validationContext.getProperties().keySet()) { + if (!propertyDescriptor.isDynamic()) { + continue; + } + + final String propertyName = propertyDescriptor.getName(); + final Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(propertyName); + if (matcher.matches()) { + logger.warn("[{}] should be set to [{}] when command arguments are supplied as Dynamic Properties. The property [{}] will be ignored.", + ARGUMENTS_STRATEGY.getDisplayName(), DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getDisplayName(), propertyName); + } + } + } + + return validationResults; } @Override @@ -393,21 +407,20 @@ public class ExecuteStreamCommand extends AbstractProcessor { .splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))); } } else { - List propertyDescriptors = new ArrayList<>(); for (final Map.Entry entry : context.getProperties().entrySet()) { - Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName()); + Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(entry.getKey().getName()); if (matcher.matches()) { propertyDescriptors.add(entry.getKey()); } } propertyDescriptors.sort((p1, p2) -> { - Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName()); + Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(p1.getName()); String indexString1 = null; while (matcher.find()) { indexString1 = matcher.group("commandIndex"); } - matcher = DYNAMIC_PARAMETER_NAME.matcher(p2.getName()); + matcher = COMMAND_ARGUMENT_PATTERN.matcher(p2.getName()); String indexString2 = null; while (matcher.find()) { indexString2 = matcher.group("commandIndex"); diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html new file mode 100644 index 0000000000..23fd22f0da --- /dev/null +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.ExecuteStreamCommand/additionalDetails.html @@ -0,0 +1,166 @@ + + + + + + ExecuteStreamCommand + + + + +

Description

+

The ExecuteStreamCommand processor provides a flexible way to integrate external commands and scripts into NiFi data flows. + ExecuteStreamCommand can pass the incoming FlowFile's content to the command that it executes similarly how piping works.

+ +

Configuration options

+ +

Working Directory

+

If not specified, NiFi root will be the default working directory.

+ +

Configuring command arguments

+

The ExecuteStreamCommand processor provides two ways to specify command arguments: using Dynamic Properties and the Command Arguments field.

+ +

Command Arguments field

+

This is the default. If there are multiple arguments, they need to be separated by a character specified in the Argument Delimiter field. + When needed, '-' and '--' can be provided, but in these cases Argument Delimiter should be a different character.

+ +

Consider that we want to list all files in a directory which is different from the working directory:

+ + + + + + + + + + +
Command PathCommand Arguments
ls-lah;/path/to/dir
+ +

NOTE: the command should be on $PATH or it should be in the working directory, otherwise path also should be specified.

+ +

Dynamic Properties

+

Arguments can be specified with Dynamic Properties. Dynamic Properties with the pattern of 'command.arguments.<commandIndex>' will be appended + to the command in ascending order.

+ +

The above example with dynamic properties would look like this:

+ + + + + + + + + + + + + + +
Property NameProperty Value
command.arguments.0-lah
command.arguments.1/path/to/dir
+ +

Configuring environment variables

+

In addition to specifying command arguments using the Command Argument field or Dynamic Properties, users can also use environment variables with + the ExecuteStreamCommand processor. Environment variables are a set of key-value pairs that can be accessed by processes running on the system. + ExecuteStreamCommand will treat every Dynamic Property as an environment variable that doesn't match the pattern 'command.arguments.<commandIndex>'.

+ +

Consider that we want to execute a Maven command with the processor. If there are multiple Java versions installed on the system, you can specify + which version will be used by setting the JAVA_HOME environment variable. The output FlowFile will looke like this if we run + mvn command with --version argument:

+ + +
+Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
+Maven home: /path/to/maven/home
+Java version: 11.0.18, vendor: Eclipse Adoptium, runtime: /path/to/default/java/home
+Default locale: en_US, platform encoding: UTF-8
+OS name: "mac os x", version: "13.1", arch: "x86_64", family: "mac"
+            
+
+ + + + + + + + + + +
Property NameProperty Value
JAVA_HOMEpath/to/another/java/home
+ +

After specifying the JAVA_HOME property, you can notice that maven is using a different runtime:

+ +
+Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
+Maven home: /path/to/maven/home
+Java version: 11.0.18, vendor: Eclipse Adoptium, runtime: /path/to/another/java/home
+Default locale: en_US, platform encoding: UTF-8
+OS name: "mac os x", version: "13.1", arch: "x86_64", family: "mac"
+            
+
+ +

Streaming input to the command

+

ExecuteStreamCommand passes the incoming FlowFile's content to the command that it executes similarly how piping works. + It is possible to disable this behavior with the Ignore STDIN property. In the above examples we didn't use the + incoming FlowFile's content, so in this case we could leverage this property for additional performance gain.

+ +

To utilize the streaming capability, consider that we want to use grep on the FlowFile. Let's presume that + we need to list all POST requests from an Apache HTTPD log:

+ +
+127.0.0.1 - - [03/May/2023:13:54:26 +0000] "GET /example-page HTTP/1.1" 200 4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:05:32 +0000] "POST /submit-form HTTP/1.1" 302 0 "http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:10:48 +0000] "GET /image.jpg HTTP/1.1" 200 35785 "http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:20:15 +0000] "GET /example-page HTTP/1.1" 200 4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
+127.0.0.1 - - [03/May/2023:14:30:42 +0000] "GET /example-page HTTP/1.1" 200 4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
+            
+
+ +

Processor configuration:

+ + + + + + + + + + + + + + + + + + + + + +
Working DirectoryCommand PathCommand Arguments StrategyCommand ArgumentsArgument DelimiterIgnore STDINOutput Destination AttributeMax Attribute Length
grepCommand Arguments PropertyPOST;false256
+ +

With this the emitted FlowFile on the "output stream" relationship should be:

+ +
+127.0.0.1 - - [03/May/2023:14:05:32 +0000] "POST /submit-form HTTP/1.1" 302 0 "http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
+            
+
+ + +