NIFI-10069 Updated multiple components to support Sensitive Dynamic Properties

- ExecuteScript
- ExecuteStreamCommand
- InvokeScriptedProcessor
- HikariCPConnectionPool

This closes #6085.

Signed-off-by: Kevin Doran <kdoran@apache.org>
exceptionfactory authored 2022-05-31 11:00:29 -05:00, committed by Kevin Doran
parent 314232ca6c
commit 45cc3cefe5
4 changed files with 53 additions and 57 deletions
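
Across the listed components the change follows the same opt-in pattern: the class gains the @SupportsSensitiveDynamicProperties annotation so that user-defined dynamic properties may be marked as sensitive, and the @DynamicProperty documentation is adjusted accordingly. The sketch below illustrates that pattern on a hypothetical processor; the class name, property names, and the descriptor-building details are illustrative assumptions and not code from this commit.

// Illustrative sketch only; ExampleDynamicPropertyProcessor and its property handling
// are hypothetical and not part of this commit.
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.StandardValidators;

@SupportsSensitiveDynamicProperties // lets users flag individual dynamic properties as sensitive
@DynamicProperty(
        name = "Binding property name",
        value = "Binding property value",
        expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
        description = "Hypothetical dynamic property forwarded to the underlying engine")
public class ExampleDynamicPropertyProcessor extends AbstractProcessor {

    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        // A descriptor is built for each user-supplied property name; whether its value is
        // treated as sensitive is chosen by the user, which the framework only permits when
        // the component class carries @SupportsSensitiveDynamicProperties.
        return new PropertyDescriptor.Builder()
                .name(propertyDescriptorName)
                .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
                .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
                .dynamic(true)
                .build();
    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        // No-op: the sketch only demonstrates the annotation pattern.
    }
}

The per-file diffs below show this annotation being added to each component.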

ExecuteScript.java

@ -23,6 +23,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -68,9 +69,10 @@ import java.util.Set;
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
+ "the script. If the handling is incomplete or incorrect, the session will be rolled back. Experimental: "
+ "Impact of sustained usage not yet verified.")
+@SupportsSensitiveDynamicProperties
@DynamicProperty(
name = "A script engine property to update",
value = "The value to set it to",
name = "Script Engine Binding property",
value = "Binding property value passed to Script Runner",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Updates a script engine property specified by the Dynamic Property's key with the value "
+ "specified by the Dynamic Property's value")

InvokeScriptedProcessor.java

@ -22,6 +22,7 @@ import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.Stateful;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
@ -76,7 +77,8 @@ import java.util.concurrent.atomic.AtomicReference;
+ "public void onStopped(ProcessContext context) methods to be invoked when the parent InvokeScriptedProcessor is scheduled or stopped, respectively. "
+ "NOTE: The script will be loaded when the processor is populated with property values, see the Restrictions section for more security implications. "
+ "Experimental: Impact of sustained usage not yet verified.")
@DynamicProperty(name = "A script engine property to update", value = "The value to set it to",
@SupportsSensitiveDynamicProperties
@DynamicProperty(name = "Script Engine Binding property", value = "Binding property value passed to Script Runner",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value")
@Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER},
@ -301,7 +303,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
// store the updated validation results
validationResults.set(results);
-// return whether there was any issues loading the configured script
+// return whether there were any issues loading the configured script
return results.isEmpty();
}
@ -332,7 +334,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
// store the updated validation results
validationResults.set(results);
-// return whether there was any issues loading the configured script
+// return whether there were any issues loading the configured script
return results.isEmpty();
}
@ -360,7 +362,7 @@ public class InvokeScriptedProcessor extends AbstractSessionFactoryProcessor {
}
if (scriptRunner == null) {
-throw new ProcessException("No script runner available!");
+throw new ProcessException("No script runner available");
}
// get the engine and ensure its invocable
ScriptEngine scriptEngine = scriptRunner.getScriptEngine();

ExecuteStreamCommand.java

@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperties;
@ -45,6 +46,7 @@ import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
@ -67,7 +69,6 @@ import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.standard.util.ArgumentUtils;
import org.apache.nifi.processors.standard.util.SoftLimitBoundedByteArrayOutputStream;
@ -148,6 +149,7 @@ import org.apache.nifi.stream.io.StreamUtils;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({"command execution", "command", "stream", "execute"})
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
+@SupportsSensitiveDynamicProperties
@DynamicProperties({
@DynamicProperty(name = "An environment variable name", value = "An environment variable value",
description = "These environment variables are passed to the process spawned by this Processor"),
@ -182,7 +184,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
.description("The destination path for the flow file created from the command's output, if the returned status code is non-zero. "
+ "All flow files routed to this relationship will be penalized.")
.build();
-private AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
+private final AtomicReference<Set<Relationship>> relationships = new AtomicReference<>();
private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
@ -352,7 +354,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
// get the number part of the name
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
if (matcher.matches()) {
-final String commandIndex = matcher.group("commandIndex");
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
@ -386,21 +387,19 @@ public class ExecuteStreamCommand extends AbstractProcessor {
if (!useDynamicPropertyArguments) {
commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
if (!StringUtils.isBlank(commandArguments)) {
-for (String arg : ArgumentUtils
-.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
-args.add(arg);
-}
+args.addAll(ArgumentUtils
+.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0)));
}
} else {
-ArrayList<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
+List<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName());
if (matcher.matches()) {
propertyDescriptors.add(entry.getKey());
}
}
-Collections.sort(propertyDescriptors,(p1,p2) -> {
+propertyDescriptors.sort((p1, p2) -> {
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName());
String indexString1 = null;
while (matcher.find()) {
@ -413,7 +412,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
final int index1 = Integer.parseInt(indexString1);
final int index2 = Integer.parseInt(indexString2);
-if ( index1 > index2 ) {
+if (index1 > index2) {
return 1;
} else if (index1 < index2) {
return -1;
@ -439,12 +438,13 @@ public class ExecuteStreamCommand extends AbstractProcessor {
final ProcessBuilder builder = new ProcessBuilder();
logger.debug("Executing and waiting for command {} with arguments {}", new Object[]{executeCommand, commandArguments});
// Avoid logging arguments that could contain sensitive values
logger.debug("Executing and waiting for command: {}", executeCommand);
File dir = null;
if (!StringUtils.isBlank(workingDir)) {
dir = new File(workingDir);
if (!dir.exists() && !dir.mkdirs()) {
logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
logger.warn("Failed to create working directory {}, using current working directory {}", workingDir, System.getProperty("user.dir"));
}
}
final Map<String, String> environment = new HashMap<>();
@ -484,7 +484,6 @@ public class ExecuteStreamCommand extends AbstractProcessor {
try (final OutputStream pos = process.getOutputStream();
final InputStream pis = process.getInputStream();
final BufferedInputStream bis = new BufferedInputStream(pis)) {
-int exitCode = -1;
final BufferedOutputStream bos = new BufferedOutputStream(pos);
FlowFile outputFlowFile = putToAttribute ? inputFlowFile : session.create(inputFlowFile);
@ -497,8 +496,8 @@ public class ExecuteStreamCommand extends AbstractProcessor {
outputFlowFile = session.putAttribute(outputFlowFile, attributeName, new String(callback.outputBuffer, 0, callback.size));
}
-exitCode = callback.exitCode;
-logger.debug("Execution complete for command: {}. Exited with code: {}", new Object[]{executeCommand, exitCode});
+int exitCode = callback.exitCode;
+logger.debug("Execution complete for command: {}. Exited with code: {}", executeCommand, exitCode);
Map<String, String> attributes = new HashMap<>();
@ -511,16 +510,15 @@ public class ExecuteStreamCommand extends AbstractProcessor {
} catch (IOException e) {
strBldr.append("Unknown...could not read Process's Std Error");
}
-int length = strBldr.length() > 4000 ? 4000 : strBldr.length();
+int length = Math.min(strBldr.length(), 4000);
attributes.put("execution.error", strBldr.substring(0, length));
final Relationship outputFlowFileRelationship = putToAttribute ? ORIGINAL_RELATIONSHIP : (exitCode != 0) ? NONZERO_STATUS_RELATIONSHIP : OUTPUT_STREAM_RELATIONSHIP;
if (exitCode == 0) {
logger.info("Transferring flow file {} to {}",
new Object[]{outputFlowFile,outputFlowFileRelationship.getName()});
logger.info("Transferring {} to {}", outputFlowFile, outputFlowFileRelationship.getName());
} else {
logger.error("Transferring flow file {} to {}. Executable command {} ended in an error: {}",
new Object[]{outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString()});
logger.error("Transferring {} to {}. Executable command {} ended in an error: {}",
outputFlowFile,outputFlowFileRelationship.getName(), executeCommand, strBldr.toString());
}
attributes.put("execution.status", Integer.toString(exitCode));
@ -536,16 +534,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
session.transfer(outputFlowFile, outputFlowFileRelationship);
if (!putToAttribute) {
logger.info("Transferring flow file {} to original", new Object[]{inputFlowFile});
logger.info("Transferring {} to original", inputFlowFile);
inputFlowFile = session.putAllAttributes(inputFlowFile, attributes);
session.transfer(inputFlowFile, ORIGINAL_RELATIONSHIP);
}
-} catch (final IOException ex) {
+} catch (final IOException e) {
// could not close Process related streams
logger.warn("Problem terminating Process {}", new Object[]{process}, ex);
logger.warn("Problem terminating Process {}", process, e);
} finally {
-errorOut.delete();
+FileUtils.deleteQuietly(errorOut);
process.destroy(); // last ditch effort to clean up that process.
}
}
@ -606,17 +604,13 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
}
} else {
-outputFlowFile = session.write(outputFlowFile, new OutputStreamCallback() {
-@Override
-public void process(OutputStream out) throws IOException {
-readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
-StreamUtils.copy(stdoutReadable, out);
-try {
-exitCode = process.waitFor();
-} catch (InterruptedException e) {
-logger.warn("Command Execution Process was interrupted", e);
-}
+outputFlowFile = session.write(outputFlowFile, out -> {
+readStdoutReadable(ignoreStdin, stdinWritable, logger, incomingFlowFileIS);
+StreamUtils.copy(stdoutReadable, out);
+try {
+exitCode = process.waitFor();
+} catch (InterruptedException e) {
+logger.warn("Command Execution Process was interrupted", e);
+}
});
}
@ -624,24 +618,20 @@ public class ExecuteStreamCommand extends AbstractProcessor {
}
private static void readStdoutReadable(final boolean ignoreStdin, final OutputStream stdinWritable,
-final ComponentLog logger, final InputStream incomingFlowFileIS) throws IOException {
-Thread writerThread = new Thread(new Runnable() {
-@Override
-public void run() {
-if (!ignoreStdin) {
-try {
-StreamUtils.copy(incomingFlowFileIS, stdinWritable);
-} catch (IOException e) {
-// This is unlikely to occur, and isn't handled at the moment
-// Bug captured in NIFI-1194
-logger.error("Failed to write flow file to stdin due to {}", new Object[]{e}, e);
-}
+final ComponentLog logger, final InputStream incomingFlowFileIS) {
+Thread writerThread = new Thread(() -> {
+if (!ignoreStdin) {
+try {
+StreamUtils.copy(incomingFlowFileIS, stdinWritable);
+} catch (IOException e) {
+// This is unlikely to occur, and isn't handled at the moment
+// Bug captured in NIFI-1194
+logger.error("Failed to write FlowFile to Standard Input Stream", e);
+}
-// MUST close the output stream to the stdin so that whatever is reading knows
-// there is no more data.
-IOUtils.closeQuietly(stdinWritable);
-}
+// MUST close the output stream to the stdin so that whatever is reading knows
+// there is no more data.
+IOUtils.closeQuietly(stdinWritable);
});
writerThread.setDaemon(true);
writerThread.start();

HikariCPConnectionPool.java

@ -20,6 +20,7 @@ import com.zaxxer.hikari.HikariDataSource;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
+import org.apache.nifi.annotation.behavior.SupportsSensitiveDynamicProperties;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
@ -54,6 +55,7 @@ import java.util.stream.Collectors;
@RequiresInstanceClassLoading
@Tags({"dbcp", "hikari", "jdbc", "database", "connection", "pooling", "store"})
@CapabilityDescription("Provides Database Connection Pooling Service based on HikariCP. Connections can be asked from pool and returned after usage.")
+@SupportsSensitiveDynamicProperties
@DynamicProperty(name = "JDBC property name", value = "JDBC property value", expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY,
description = "Specifies a property name and value to be set on the JDBC connection(s). "
+ "If Expression Language is used, evaluation will be performed upon the controller service being enabled. "