NIFI-10435 Masked sensitive arguments in ExecuteStreamCommand

This closes #6367

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
ravinarayansingh 2022-09-06 00:06:14 -05:00 committed by exceptionfactory
parent 5b565679df
commit 97b8f0e205
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
1 changed files with 70 additions and 62 deletions

View File

@ -16,39 +16,21 @@
*/
package org.apache.nifi.processors.standard;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ProcessBuilder.Redirect;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
@ -74,6 +56,25 @@ import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.ProcessBuilder.Redirect;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* <p>
* This processor executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.
@ -142,7 +143,6 @@ import org.apache.nifi.stream.io.StreamUtils;
* </li>
* </ul>
* <p>
*
*/
@EventDriven
@SupportsBatching
@ -151,15 +151,15 @@ import org.apache.nifi.stream.io.StreamUtils;
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
@SupportsSensitiveDynamicProperties
@DynamicProperties({
@DynamicProperty(name = "An environment variable name", value = "An environment variable value",
description = "These environment variables are passed to the process spawned by this Processor"),
@DynamicProperty(name = "command.argument.<NUMBER>", value = "Argument to be supplied to the command",
description = "These arguments are supplied to the process spawned by this Processor when using the "
+ "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER will determine the order.")
@DynamicProperty(name = "An environment variable name", value = "An environment variable value",
description = "These environment variables are passed to the process spawned by this Processor"),
@DynamicProperty(name = "command.argument.<NUMBER>", value = "Argument to be supplied to the command",
description = "These arguments are supplied to the process spawned by this Processor when using the "
+ "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER will determine the order.")
})
@WritesAttributes({
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed"),
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments. Sensitive properties will be masked"),
@WritesAttribute(attribute = "execution.status", description = "The exit status code returned from executing the command"),
@WritesAttribute(attribute = "execution.error", description = "Any error messages returned from executing the command")})
@Restricted(
@ -193,10 +193,10 @@ public class ExecuteStreamCommand extends AbstractProcessor {
public static final String executionArguments = "Command Arguments Property";
public static final String dynamicArguements = "Dynamic Property Arguments";
static final AllowableValue EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY = new AllowableValue(executionArguments, executionArguments,
static final AllowableValue EXECUTION_ARGUMENTS_PROPERTY_STRATEGY = new AllowableValue(executionArguments, executionArguments,
"Arguments to be supplied to the executable are taken from the Command Arguments property");
static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new AllowableValue(dynamicArguements,dynamicArguements,
static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new AllowableValue(dynamicArguements, dynamicArguements,
"Arguments to be supplied to the executable are taken from dynamic properties");
@ -215,8 +215,8 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.description("Strategy for configuring arguments to be supplied to the command.")
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue(),DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue())
.defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue())
.allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue(), DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue())
.defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGY.getValue())
.build();
static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder()
@ -285,6 +285,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.build();
private static final List<PropertyDescriptor> PROPERTIES;
private static final String MASKED_ARGUMENT = "********";
static {
List<PropertyDescriptor> props = new ArrayList<>();
@ -344,24 +345,24 @@ public class ExecuteStreamCommand extends AbstractProcessor {
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
if (!propertyDescriptorName.startsWith("command.argument.")) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description(
"Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
.name(propertyDescriptorName)
.description(
"Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
.dynamic(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
}
// get the number part of the name
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
if (matcher.matches()) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.description("Argument passed to command")
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.description("Argument passed to command")
.dynamic(true)
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
.build();
}
return null;
}
@ -374,6 +375,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
final ArrayList<String> args = new ArrayList<>();
final ArrayList<String> argumentAttributeValue = new ArrayList<>();
final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
final PropertyValue argumentsStrategyPropertyValue = context.getProperty(ARGUMENTS_STRATEGY);
final boolean useDynamicPropertyArguments = argumentsStrategyPropertyValue.isSet() && argumentsStrategyPropertyValue.getValue().equals(DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
@ -419,21 +421,27 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
return 0;
});
for ( final PropertyDescriptor descriptor : propertyDescriptors) {
args.add(context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue());
}
if (args.size() > 0) {
final StringBuilder builder = new StringBuilder();
for ( int i = 1; i < args.size(); i++) {
builder.append(args.get(i)).append("\t");
for (final PropertyDescriptor descriptor : propertyDescriptors) {
String argValue = context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue();
if (descriptor.isSensitive()) {
argumentAttributeValue.add(MASKED_ARGUMENT);
} else {
argumentAttributeValue.add(argValue);
}
args.add(argValue);
}
if (argumentAttributeValue.size() > 0) {
final StringBuilder builder = new StringBuilder();
for (String s : argumentAttributeValue) {
builder.append(s).append("\t");
}
commandArguments = builder.toString().trim();
} else {
commandArguments = "";
}
}
final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
final ProcessBuilder builder = new ProcessBuilder();
@ -488,7 +496,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
ProcessStreamWriterCallback callback = new ProcessStreamWriterCallback(ignoreStdin, bos, bis, logger,
attributeName, session, outputFlowFile, process,putToAttribute,attributeSize);
attributeName, session, outputFlowFile, process, putToAttribute, attributeSize);
session.read(inputFlowFile, callback);
outputFlowFile = callback.outputFlowFile;
@ -518,7 +526,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
logger.info("Transferring {} to {}", outputFlowFile, outputFlowFileRelationship.getName());
} else {
logger.error("Transferring {} to {}. Executable command {} ended in an error: {}",
outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString());
outputFlowFile, outputFlowFileRelationship.getName(), executeCommand, strBldr.toString());
}
attributes.put("execution.status", Integer.toString(exitCode));
@ -530,7 +538,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
outputFlowFile = session.penalize(outputFlowFile);
}
// This will transfer the FlowFile that received the stream output to its destined relationship.
// In the event the stream is put to the an attribute of the original, it will be transferred here.
// In the event the stream is put to an attribute of the original, it will be transferred here.
session.transfer(outputFlowFile, outputFlowFileRelationship);
if (!putToAttribute) {
@ -565,7 +573,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
byte[] outputBuffer;
int size;
public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdinWritable, InputStream stdoutReadable,ComponentLog logger, String attributeName,
public ProcessStreamWriterCallback(boolean ignoreStdin, OutputStream stdinWritable, InputStream stdoutReadable, ComponentLog logger, String attributeName,
ProcessSession session, FlowFile outputFlowFile, Process process, boolean putToAttribute, int attributeSize) {
this.ignoreStdin = ignoreStdin;
this.stdinWritable = stdinWritable;
@ -586,11 +594,11 @@ public class ExecuteStreamCommand extends AbstractProcessor {
readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
final long longSize = StreamUtils.copy(stdoutReadable, softLimitBoundedBAOS);
// Because the outputstream has a cap that the copy doesn't know about, adjust
// Because the outputStream has a cap that the copy doesn't know about, adjust
// the actual size
if (longSize > attributeSize) { // Explicit cast for readability
size = attributeSize;
} else{
} else {
size = (int) longSize; // Note: safe cast, longSize is limited by attributeSize
}