NIFI-11524 Improved ExecuteStreamCommand documentation and configuration

This closes #7228

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Nandor Soma Abonyi 2023-05-04 10:52:11 +02:00 committed by exceptionfactory
parent e302f2aff7
commit 2049d5068b
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
2 changed files with 236 additions and 57 deletions

View File

@ -65,6 +65,7 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ProcessBuilder.Redirect;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@ -148,14 +149,15 @@ import java.util.regex.Pattern;
@SupportsBatching
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"command execution", "command", "stream", "execute"})
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@CapabilityDescription("The ExecuteStreamCommand processor provides a flexible way to integrate external commands and scripts into NiFi data flows."
+ " ExecuteStreamCommand can pass the incoming FlowFile's content to the command that it executes similarly how piping works.")
@SupportsSensitiveDynamicProperties
@DynamicProperties({
@DynamicProperty(name = "An environment variable name", value = "An environment variable value",
description = "These environment variables are passed to the process spawned by this Processor"),
@DynamicProperty(name = "command.argument.<NUMBER>", value = "Argument to be supplied to the command",
@DynamicProperty(name = "command.argument.<commandIndex>", value = "Argument to be supplied to the command",
description = "These arguments are supplied to the process spawned by this Processor when using the "
+ "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER will determine the order.")
+ "Command Arguments Strategy : Dynamic Property Arguments. <commandIndex> is a number and it will determine the order.")
})
@WritesAttributes({
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed"),
@ -189,7 +191,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
private static final Pattern DYNAMIC_PARAMETER_NAME = Pattern.compile("command\\.argument\\.(?<commandIndex>[0-9]+)$");
private static final Pattern COMMAND_ARGUMENT_PATTERN = Pattern.compile("command\\.argument\\.(?<commandIndex>[0-9]+)$");
public static final String executionArguments = "Command Arguments Property";
public static final String dynamicArguements = "Dynamic Property Arguments";
@ -197,8 +199,15 @@ public class ExecuteStreamCommand extends AbstractProcessor {
"Arguments to be supplied to the executable are taken from the Command Arguments property");
static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new AllowableValue(dynamicArguements, dynamicArguements,
"Arguments to be supplied to the executable are taken from dynamic properties");
"Arguments to be supplied to the executable are taken from dynamic properties with pattern of 'command.argument.<commandIndex>'");
static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
.name("Working Directory")
.description("The directory to use as the current working directory when executing the command")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
.required(false)
.build();
private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true);
static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
@ -215,38 +224,36 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.description("Strategy for configuring arguments to be supplied to the command.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue(), DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue())
.allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY, DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY)
.defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue())
.build();
static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder()
.name("Command Arguments")
.description("The arguments to supply to the executable delimited by the ';' character.")
.dependsOn(ARGUMENTS_STRATEGY, EXECUTION_ARGUMENTS_PROPERTY_STRATEGY)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(new Validator() {
@Override
public ValidationResult validate(String subject, String input, ValidationContext context) {
ValidationResult result = new ValidationResult.Builder()
.subject(subject).valid(true).input(input).build();
List<String> args = ArgumentUtils.splitArgs(input, context.getProperty(ARG_DELIMITER).getValue().charAt(0));
for (String arg : args) {
ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
if (!valResult.isValid()) {
result = valResult;
break;
}
.addValidator((subject, input, context) -> {
ValidationResult result = new ValidationResult.Builder()
.subject(subject).valid(true).input(input).build();
List<String> args = ArgumentUtils.splitArgs(input, context.getProperty(ExecuteStreamCommand.ARG_DELIMITER).getValue().charAt(0));
for (String arg : args) {
ValidationResult valResult = ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR.validate(subject, arg, context);
if (!valResult.isValid()) {
result = valResult;
break;
}
return result;
}
return result;
}).build();
static final PropertyDescriptor WORKING_DIR = new PropertyDescriptor.Builder()
.name("Working Directory")
.description("The directory to use as the current working directory when executing the command")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.createDirectoryExistsValidator(true, true))
.required(false)
static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
.name("Argument Delimiter")
.description("Delimiter to use to separate arguments for a command [default: ;]. Must be a single character")
.dependsOn(ARGUMENTS_STRATEGY, EXECUTION_ARGUMENTS_PROPERTY_STRATEGY)
.addValidator(StandardValidators.SINGLE_CHAR_VALIDATOR)
.required(true)
.defaultValue(";")
.build();
static final PropertyDescriptor IGNORE_STDIN = new PropertyDescriptor.Builder()
@ -273,33 +280,21 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.defaultValue("256")
.build();
private static final Validator characterValidator = new StandardValidators.StringLengthValidator(1, 1);
static final PropertyDescriptor ARG_DELIMITER = new PropertyDescriptor.Builder()
.name("Argument Delimiter")
.description("Delimiter to use to separate arguments for a command [default: ;]. Must be a single character")
.addValidator(Validator.VALID)
.addValidator(characterValidator)
.required(true)
.defaultValue(";")
.build();
private static final List<PropertyDescriptor> PROPERTIES;
private static final String MASKED_ARGUMENT = "********";
static {
List<PropertyDescriptor> props = new ArrayList<>();
props.add(WORKING_DIR);
props.add(EXECUTION_COMMAND);
props.add(ARGUMENTS_STRATEGY);
props.add(EXECUTION_ARGUMENTS);
props.add(EXECUTION_COMMAND);
props.add(IGNORE_STDIN);
props.add(WORKING_DIR);
props.add(ARG_DELIMITER);
props.add(IGNORE_STDIN);
props.add(PUT_OUTPUT_IN_ATTRIBUTE);
props.add(PUT_ATTRIBUTE_MAX_LENGTH);
PROPERTIES = Collections.unmodifiableList(props);
Set<Relationship> outputStreamRelationships = new HashSet<>();
outputStreamRelationships.add(OUTPUT_STREAM_RELATIONSHIP);
outputStreamRelationships.add(ORIGINAL_RELATIONSHIP);
@ -343,17 +338,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
if (!propertyDescriptorName.startsWith("command.argument.")) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description(
"Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
// get the number part of the name
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
final Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(propertyDescriptorName);
if (matcher.matches()) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
@ -363,8 +348,37 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
} else {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
return null;
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
final List<ValidationResult> validationResults = new ArrayList<>(super.customValidate(validationContext));
final String argumentStrategy = validationContext.getProperty(ARGUMENTS_STRATEGY).getValue();
if (DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue() != argumentStrategy) {
for (final PropertyDescriptor propertyDescriptor : validationContext.getProperties().keySet()) {
if (!propertyDescriptor.isDynamic()) {
continue;
}
final String propertyName = propertyDescriptor.getName();
final Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(propertyName);
if (matcher.matches()) {
logger.warn("[{}] should be set to [{}] when command arguments are supplied as Dynamic Properties. The property [{}] will be ignored.",
ARGUMENTS_STRATEGY.getDisplayName(), DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getDisplayName(), propertyName);
}
}
}
return validationResults;
}
@Override
@ -393,21 +407,20 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)));
}
} else {
List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName());
Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.add(entry.getKey());
}
}
propertyDescriptors.sort((p1, p2) -> {
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName());
Matcher matcher = COMMAND_ARGUMENT_PATTERN.matcher(p1.getName());
String indexString1 = null;
while (matcher.find()) {
indexString1 = matcher.group("commandIndex");
}
matcher = DYNAMIC_PARAMETER_NAME.matcher(p2.getName());
matcher = COMMAND_ARGUMENT_PATTERN.matcher(p2.getName());
String indexString2 = null;
while (matcher.find()) {
indexString2 = matcher.group("commandIndex");

View File

@ -0,0 +1,166 @@
<!DOCTYPE html>
<html lang="en">
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head>
<meta charset="utf-8" />
<title>ExecuteStreamCommand</title>
<link rel="stylesheet" href="../../../../../css/component-usage.css" type="text/css" />
</head>
<body>
<h2>Description</h2>
<p>The ExecuteStreamCommand processor provides a flexible way to integrate external commands and scripts into NiFi data flows.
ExecuteStreamCommand can pass the incoming FlowFile's content to the command that it executes similarly how piping works.</p>
<h2>Configuration options</h2>
<h3>Working Directory</h3>
<p>If not specified, NiFi root will be the default working directory.</p>
<h3>Configuring command arguments</h3>
<p>The ExecuteStreamCommand processor provides two ways to specify command arguments: using Dynamic Properties and the Command Arguments field.</p>
<h4>Command Arguments field</h4>
<p>This is the default. If there are multiple arguments, they need to be separated by a character specified in the Argument Delimiter field.
When needed, '-' and '--' can be provided, but in these cases Argument Delimiter should be a different character.</p>
<p>Consider that we want to list all files in a directory which is different from the working directory:</p>
<table>
<tr>
<th>Command Path</th>
<th>Command Arguments</th>
</tr>
<tr>
<td>ls</td>
<td>-lah;/path/to/dir</td>
</tr>
</table>
<p><b>NOTE:</b> the command should be on <code>$PATH</code> or it should be in the working directory, otherwise path also should be specified.</p>
<h4>Dynamic Properties</h4>
<p>Arguments can be specified with Dynamic Properties. Dynamic Properties with the pattern of 'command.arguments.&lt;commandIndex&gt;' will be appended
to the command in ascending order.</p>
<p>The above example with dynamic properties would look like this:</p>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>command.arguments.0</td>
<td>-lah</td>
</tr>
<tr>
<td>command.arguments.1</td>
<td>/path/to/dir</td>
</tr>
</table>
<h3>Configuring environment variables</h3>
<p>In addition to specifying command arguments using the Command Argument field or Dynamic Properties, users can also use environment variables with
the ExecuteStreamCommand processor. Environment variables are a set of key-value pairs that can be accessed by processes running on the system.
ExecuteStreamCommand will treat every Dynamic Property as an environment variable that doesn't match the pattern 'command.arguments.&lt;commandIndex&gt;'.</p>
<p>Consider that we want to execute a Maven command with the processor. If there are multiple Java versions installed on the system, you can specify
which version will be used by setting the <code>JAVA_HOME</code> environment variable. The output FlowFile will looke like this if we run
<code>mvn</code> command with <code>--version</code> argument:</p>
<code>
<pre style="overflow:scroll">
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /path/to/maven/home
Java version: 11.0.18, vendor: Eclipse Adoptium, runtime: /path/to/default/java/home
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "13.1", arch: "x86_64", family: "mac"
</pre>
</code>
<table>
<tr>
<th>Property Name</th>
<th>Property Value</th>
</tr>
<tr>
<td>JAVA_HOME</td>
<td>path/to/another/java/home</td>
</tr>
</table>
<p>After specifying the <code>JAVA_HOME</code> property, you can notice that maven is using a different runtime:</p>
<code>
<pre>
Apache Maven 3.8.6 (84538c9988a25aec085021c365c560670ad80f63)
Maven home: /path/to/maven/home
Java version: 11.0.18, vendor: Eclipse Adoptium, runtime: /path/to/another/java/home
Default locale: en_US, platform encoding: UTF-8
OS name: "mac os x", version: "13.1", arch: "x86_64", family: "mac"
</pre>
</code>
<h3>Streaming input to the command</h3>
<p>ExecuteStreamCommand passes the incoming FlowFile's content to the command that it executes similarly how piping works.
It is possible to disable this behavior with the Ignore STDIN property. In the above examples we didn't use the
incoming FlowFile's content, so in this case we could leverage this property for additional performance gain.</p>
<p>To utilize the streaming capability, consider that we want to use <code>grep</code> on the FlowFile. Let's presume that
we need to list all <code>POST</code> requests from an Apache HTTPD log:</p>
<code>
<pre style="overflow:scroll">
127.0.0.1 - - [03/May/2023:13:54:26 +0000] "GET /example-page HTTP/1.1" 200 4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
127.0.0.1 - - [03/May/2023:14:05:32 +0000] "POST /submit-form HTTP/1.1" 302 0 "http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
127.0.0.1 - - [03/May/2023:14:10:48 +0000] "GET /image.jpg HTTP/1.1" 200 35785 "http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
127.0.0.1 - - [03/May/2023:14:20:15 +0000] "GET /example-page HTTP/1.1" 200 4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
127.0.0.1 - - [03/May/2023:14:30:42 +0000] "GET /example-page HTTP/1.1" 200 4825 "-" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
</pre>
</code>
<p>Processor configuration:</p>
<table>
<tr>
<th>Working Directory</th>
<th>Command Path</th>
<th>Command Arguments Strategy</th>
<th>Command Arguments</th>
<th>Argument Delimiter</th>
<th>Ignore STDIN</th>
<th>Output Destination Attribute</th>
<th>Max Attribute Length</th>
</tr>
<tr>
<td></td>
<td>grep</td>
<td>Command Arguments Property</td>
<td>POST</td>
<td>;</td>
<td>false</td>
<td></td>
<td>256</td>
</tr>
</table>
<p>With this the emitted FlowFile on the "output stream" relationship should be:</p>
<code>
<pre style="overflow:scroll">
127.0.0.1 - - [03/May/2023:14:05:32 +0000] "POST /submit-form HTTP/1.1" 302 0 "http://localhost/example-page" "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:88.0) Gecko/20100101 Firefox/88.0"
</pre>
</code>
</body>
</html>