mirror of https://github.com/apache/nifi.git
NIFI-3221 This closes #3396. Add a new property for setting the argument passing strategy, either the existing parameter, or by adding new dynamic parameters, along with implementation and tests
This allows for passing arguments with quotes. Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
parent
d1fd1f5092
commit
4b509aa5a5
|
@ -32,9 +32,12 @@ import java.util.List;
|
|||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperties;
|
||||
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||
import org.apache.nifi.annotation.behavior.InputRequirement;
|
||||
|
@ -46,7 +49,9 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
|
|||
import org.apache.nifi.annotation.behavior.WritesAttributes;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.components.AllowableValue;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.RequiredPermission;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
|
@ -83,6 +88,15 @@ import org.apache.nifi.stream.io.StreamUtils;
|
|||
* <li>Supports expression language: true</li>
|
||||
* </ul>
|
||||
* </li>
|
||||
* <li>Arguments Strategy
|
||||
* <ul>
|
||||
* <li>Selects the strategy to use for arguments to the executable</li>
|
||||
* <ul>
|
||||
* <li>Command Arguments Property: Use the delimited list of arguments from the Command Arguments Property. Does not support quotations in parameters.</li>
|
||||
* <li>Dynamic Property Arguments: Use Dynamic Properties, with each property a separate argument. Does support quotes.</li>
|
||||
* </ul>
|
||||
* </ul>
|
||||
* </li>
|
||||
* <li>Command Arguments
|
||||
* <ul>
|
||||
* <li>The arguments to supply to the executable delimited by the ';' character. Each argument may be an Expression Language statement.</li>
|
||||
|
@ -134,7 +148,13 @@ import org.apache.nifi.stream.io.StreamUtils;
|
|||
@InputRequirement(Requirement.INPUT_REQUIRED)
|
||||
@Tags({"command execution", "command", "stream", "execute"})
|
||||
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
|
||||
@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
|
||||
@DynamicProperties({
|
||||
@DynamicProperty(name = "An environment variable name", value = "An environment variable value",
|
||||
description = "These environment variables are passed to the process spawned by this Processor"),
|
||||
@DynamicProperty(name = "command.argument.<NUMBER>", value = "Argument to be supplied to the command",
|
||||
description = "These arguments are supplied to the process spawned by this Processor when using the "
|
||||
+ "Command Arguments Strategy : Dynamic Property Arguments. The NUMBER will determine the order.")
|
||||
})
|
||||
@WritesAttributes({
|
||||
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed"),
|
||||
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
|
||||
|
@ -167,6 +187,17 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
|||
private final static Set<Relationship> OUTPUT_STREAM_RELATIONSHIP_SET;
|
||||
private final static Set<Relationship> ATTRIBUTE_RELATIONSHIP_SET;
|
||||
|
||||
private static final Pattern DYNAMIC_PARAMETER_NAME = Pattern.compile("command\\.argument\\.(?<commandIndex>[0-9]+)$");
|
||||
public static final String executionArguments = "Command Arguments Property";
|
||||
public static final String dynamicArguements = "Dynamic Property Arguments";
|
||||
|
||||
static final AllowableValue EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY = new AllowableValue(executionArguments, executionArguments,
|
||||
"Arguments to be supplied to the executable are taken from the Command Arguments property");
|
||||
|
||||
static final AllowableValue DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY = new AllowableValue(dynamicArguements,dynamicArguements,
|
||||
"Arguments to be supplied to the executable are taken from dynamic properties");
|
||||
|
||||
|
||||
private static final Validator ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR = StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true);
|
||||
static final PropertyDescriptor EXECUTION_COMMAND = new PropertyDescriptor.Builder()
|
||||
.name("Command Path")
|
||||
|
@ -176,6 +207,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
|||
.required(true)
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor ARGUMENTS_STRATEGY = new PropertyDescriptor.Builder()
|
||||
.name("argumentsStrategy")
|
||||
.displayName("Command Arguments Strategy")
|
||||
.description("Strategy for configuring arguments to be supplied to the command.")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.NONE)
|
||||
.required(false)
|
||||
.allowableValues(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue(),DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue())
|
||||
.defaultValue(EXECUTION_ARGUMENTS_PROPERTY_STRATEGEY.getValue())
|
||||
.build();
|
||||
|
||||
static final PropertyDescriptor EXECUTION_ARGUMENTS = new PropertyDescriptor.Builder()
|
||||
.name("Command Arguments")
|
||||
.description("The arguments to supply to the executable delimited by the ';' character.")
|
||||
|
@ -245,6 +286,7 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
|||
|
||||
static {
|
||||
List<PropertyDescriptor> props = new ArrayList<>();
|
||||
props.add(ARGUMENTS_STRATEGY);
|
||||
props.add(EXECUTION_ARGUMENTS);
|
||||
props.add(EXECUTION_COMMAND);
|
||||
props.add(IGNORE_STDIN);
|
||||
|
@ -298,12 +340,29 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
|||
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
if (!propertyDescriptorName.startsWith("command.argument.")) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
|
||||
.description(
|
||||
"Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
|
||||
.dynamic(true)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.build();
|
||||
}
|
||||
// get the number part of the name
|
||||
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(propertyDescriptorName);
|
||||
if (matcher.matches()) {
|
||||
final String commandIndex = matcher.group("commandIndex");
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.displayName(propertyDescriptorName)
|
||||
.description("Argument passed to command")
|
||||
.dynamic(true)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.addValidator(ATTRIBUTE_EXPRESSION_LANGUAGE_VALIDATOR)
|
||||
.build();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -315,18 +374,67 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
|||
|
||||
final ArrayList<String> args = new ArrayList<>();
|
||||
final boolean putToAttribute = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).isSet();
|
||||
final PropertyValue argumentsStrategyPropertyValue = context.getProperty(ARGUMENTS_STRATEGY);
|
||||
final boolean useDynamicPropertyArguments = argumentsStrategyPropertyValue.isSet() && argumentsStrategyPropertyValue.getValue().equals(DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
final Integer attributeSize = context.getProperty(PUT_ATTRIBUTE_MAX_LENGTH).asInteger();
|
||||
final String attributeName = context.getProperty(PUT_OUTPUT_IN_ATTRIBUTE).getValue();
|
||||
|
||||
final String executeCommand = context.getProperty(EXECUTION_COMMAND).evaluateAttributeExpressions(inputFlowFile).getValue();
|
||||
args.add(executeCommand);
|
||||
final String commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
|
||||
final boolean ignoreStdin = Boolean.parseBoolean(context.getProperty(IGNORE_STDIN).getValue());
|
||||
if (!StringUtils.isBlank(commandArguments)) {
|
||||
for (String arg : ArgumentUtils.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
|
||||
args.add(arg);
|
||||
final String commandArguments;
|
||||
if (!useDynamicPropertyArguments) {
|
||||
commandArguments = context.getProperty(EXECUTION_ARGUMENTS).evaluateAttributeExpressions(inputFlowFile).getValue();
|
||||
if (!StringUtils.isBlank(commandArguments)) {
|
||||
for (String arg : ArgumentUtils
|
||||
.splitArgs(commandArguments, context.getProperty(ARG_DELIMITER).getValue().charAt(0))) {
|
||||
args.add(arg);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
||||
ArrayList<PropertyDescriptor> propertyDescriptors = new ArrayList<>();
|
||||
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(entry.getKey().getName());
|
||||
if (matcher.matches()) {
|
||||
propertyDescriptors.add(entry.getKey());
|
||||
}
|
||||
}
|
||||
Collections.sort(propertyDescriptors,(p1,p2) -> {
|
||||
Matcher matcher = DYNAMIC_PARAMETER_NAME.matcher(p1.getName());
|
||||
String indexString1 = null;
|
||||
while (matcher.find()) {
|
||||
indexString1 = matcher.group("commandIndex");
|
||||
}
|
||||
matcher = DYNAMIC_PARAMETER_NAME.matcher(p2.getName());
|
||||
String indexString2 = null;
|
||||
while (matcher.find()) {
|
||||
indexString2 = matcher.group("commandIndex");
|
||||
}
|
||||
final int index1 = Integer.parseInt(indexString1);
|
||||
final int index2 = Integer.parseInt(indexString2);
|
||||
if ( index1 > index2 ) {
|
||||
return 1;
|
||||
} else if (index1 < index2) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
});
|
||||
for ( final PropertyDescriptor descriptor : propertyDescriptors) {
|
||||
args.add(context.getProperty(descriptor.getName()).evaluateAttributeExpressions(inputFlowFile).getValue());
|
||||
}
|
||||
if (args.size() > 0) {
|
||||
final StringBuilder builder = new StringBuilder();
|
||||
|
||||
for ( int i = 1; i < args.size(); i++) {
|
||||
builder.append(args.get(i)).append("\t");
|
||||
}
|
||||
commandArguments = builder.toString().trim();
|
||||
} else {
|
||||
commandArguments = "";
|
||||
}
|
||||
}
|
||||
|
||||
final String workingDir = context.getProperty(WORKING_DIR).evaluateAttributeExpressions(inputFlowFile).getValue();
|
||||
|
||||
final ProcessBuilder builder = new ProcessBuilder();
|
||||
|
|
|
@ -23,13 +23,21 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.processors.standard.util.ArgumentUtils;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
|
@ -79,6 +87,51 @@ public class TestExecuteStreamCommand {
|
|||
assertEquals(outputFlowFile.getAttribute("execution.command.args"), originalFlowFile.getAttribute("execution.command.args"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarDynamicPropArgs() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
MockFlowFile outputFlowFile = flowFiles.get(0);
|
||||
byte[] byteArray = outputFlowFile.toByteArray();
|
||||
String result = new String(byteArray);
|
||||
assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find());
|
||||
assertEquals("0", outputFlowFile.getAttribute("execution.status"));
|
||||
assertEquals("java", outputFlowFile.getAttribute("execution.command"));
|
||||
assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0, 4).trim());
|
||||
String attribute = outputFlowFile.getAttribute("execution.command.args");
|
||||
String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
|
||||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
|
||||
MockFlowFile originalFlowFile = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP).get(0);
|
||||
assertEquals(outputFlowFile.getAttribute("execution.status"), originalFlowFile.getAttribute("execution.status"));
|
||||
assertEquals(outputFlowFile.getAttribute("execution.command"), originalFlowFile.getAttribute("execution.command"));
|
||||
assertEquals(outputFlowFile.getAttribute("execution.command.args"), originalFlowFile.getAttribute("execution.command.args"));
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testExecuteJarWithBadPath() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar");
|
||||
|
@ -100,6 +153,39 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue(flowFile.isPenalized());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarWithBadPathDynamicProperties() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP);
|
||||
MockFlowFile flowFile = flowFiles.get(0);
|
||||
assertEquals(0, flowFile.getSize());
|
||||
assertEquals("Error: Unable to access jarfile", flowFile.getAttribute("execution.error").substring(0, 31));
|
||||
assertTrue(flowFile.isPenalized());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdate() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
|
@ -128,6 +214,46 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdateDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
File dummy10MBytes = new File("target/10MB.txt");
|
||||
try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) {
|
||||
byte[] bytes = Files.readAllBytes(dummy.toPath());
|
||||
assertEquals(1000, bytes.length);
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
fos.write(bytes, 0, 1000);
|
||||
}
|
||||
}
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy10MBytes.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||
String result = new String(byteArray);
|
||||
|
||||
assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoggingToStdErr() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestLogStdErr.jar");
|
||||
|
@ -148,6 +274,38 @@ public class TestExecuteStreamCommand {
|
|||
assertEquals("fffffffffffffffffffffffffffffff", flowFile.getAttribute("execution.error").substring(0, 31));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLoggingToStdErrDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestLogStdErr.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1mb.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.setValidateExpressionUsage(false);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
MockFlowFile flowFile = flowFiles.get(0);
|
||||
assertEquals(0, flowFile.getSize());
|
||||
assertEquals("fffffffffffffffffffffffffffffff", flowFile.getAttribute("execution.error").substring(0, 31));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdateWithWorkingDir() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
|
@ -170,6 +328,40 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdateWithWorkingDirDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||
String result = new String(byteArray);
|
||||
|
||||
final String quotedSeparator = Pattern.quote(File.separator);
|
||||
assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoredStdin() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
|
@ -192,6 +384,40 @@ public class TestExecuteStreamCommand {
|
|||
Pattern.compile("target:ModifiedResult\r?\n$").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoredStdinDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||
String result = new String(byteArray);
|
||||
assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
|
||||
Pattern.compile("target:ModifiedResult\r?\n$").matcher(result).find());
|
||||
}
|
||||
|
||||
// this is dependent on window with cygwin...so it's not enabled
|
||||
@Ignore
|
||||
@Test
|
||||
|
@ -242,6 +468,43 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicEnvironmentDynamicProperties() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.setProperty("NIFI_TEST_1", "testvalue1");
|
||||
controller.setProperty("NIFI_TEST_2", "testvalue2");
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||
String result = new String(byteArray);
|
||||
Set<String> dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n")));
|
||||
assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2);
|
||||
assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1"));
|
||||
assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallEchoPutToAttribute() throws Exception {
|
||||
File dummy = new File("src/test/resources/hello.txt");
|
||||
|
@ -273,6 +536,108 @@ public class TestExecuteStreamCommand {
|
|||
assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSmallEchoPutToAttributeDynamicProperties() throws Exception {
|
||||
File dummy = new File("src/test/resources/hello.txt");
|
||||
assertTrue(dummy.exists());
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue("".getBytes());
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
|
||||
if(isWindows()) {
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "/c");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, "echo Hello");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "/c;echo Hello");
|
||||
} else{
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo");
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "Hello");
|
||||
}
|
||||
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
|
||||
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
MockFlowFile outputFlowFile = flowFiles.get(0);
|
||||
outputFlowFile.assertContentEquals("");
|
||||
String ouput = outputFlowFile.getAttribute("executeStreamCommand.output");
|
||||
assertTrue(ouput.startsWith("Hello"));
|
||||
assertEquals("0", outputFlowFile.getAttribute("execution.status"));
|
||||
assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testArgumentsWithQuotesFromAttributeDynamicProperties() throws Exception {
|
||||
File dummy = new File("src/test/resources/TestJson/json-sample.json");
|
||||
assertTrue(dummy.exists());
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
|
||||
Map<String,String> attrs = new HashMap<>();
|
||||
|
||||
String json = FileUtils.readFileToString(dummy, StandardCharsets.UTF_8);
|
||||
attrs.put("json.attribute",json);
|
||||
controller.enqueue("".getBytes(),attrs);
|
||||
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
|
||||
if(isWindows()) {
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "/c");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, "echo");
|
||||
} else{
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "echo");
|
||||
}
|
||||
PropertyDescriptor dynamicProp3 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.3")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp3, "${json.attribute}");
|
||||
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
|
||||
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||
MockFlowFile outputFlowFile = flowFiles.get(0);
|
||||
String output = new String(outputFlowFile.toByteArray());
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
JsonNode tree1 = mapper.readTree(json);
|
||||
JsonNode tree2 = mapper.readTree(output);
|
||||
assertEquals(tree1,tree2);
|
||||
assertEquals("0", outputFlowFile.getAttribute("execution.status"));
|
||||
assertEquals(isWindows() ? "cmd.exe" : "echo", outputFlowFile.getAttribute("execution.command"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarPutToAttribute() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
|
||||
|
@ -301,6 +666,46 @@ public class TestExecuteStreamCommand {
|
|||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarPutToAttributeDynamicProperties() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
MockFlowFile outputFlowFile = flowFiles.get(0);
|
||||
String result = outputFlowFile.getAttribute("executeStreamCommand.output");
|
||||
outputFlowFile.assertContentEquals(dummy);
|
||||
assertTrue(Pattern.compile("Test was a success\r?\n").matcher(result).find());
|
||||
assertEquals("0", outputFlowFile.getAttribute("execution.status"));
|
||||
assertEquals("java", outputFlowFile.getAttribute("execution.command"));
|
||||
assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0, 4));
|
||||
String attribute = outputFlowFile.getAttribute("execution.command.args");
|
||||
String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
|
||||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarToAttributeConfiguration() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
|
||||
|
@ -331,6 +736,48 @@ public class TestExecuteStreamCommand {
|
|||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarToAttributeConfigurationDyanmicProperties() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestSuccess.jar");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue("small test".getBytes());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "10");
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
|
||||
assertEquals(1, controller.getProcessContext().getAvailableRelationships().size());
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
|
||||
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
MockFlowFile outputFlowFile = flowFiles.get(0);
|
||||
outputFlowFile.assertContentEquals("small test".getBytes());
|
||||
String result = outputFlowFile.getAttribute("outputDest");
|
||||
assertTrue(Pattern.compile("Test was a").matcher(result).find());
|
||||
assertEquals("0", outputFlowFile.getAttribute("execution.status"));
|
||||
assertEquals("java", outputFlowFile.getAttribute("execution.command"));
|
||||
assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0,4));
|
||||
String attribute = outputFlowFile.getAttribute("execution.command.args");
|
||||
String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "TestSuccess.jar";
|
||||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdatePutToAttribute() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
|
@ -358,6 +805,45 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdatePutToAttributeDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
File dummy10MBytes = new File("target/10MB.txt");
|
||||
byte[] bytes = Files.readAllBytes(dummy.toPath());
|
||||
try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) {
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
fos.write(bytes, 0, 1000);
|
||||
}
|
||||
}
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy10MBytes.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "outputDest");
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
String result = flowFiles.get(0).getAttribute("outputDest");
|
||||
|
||||
assertTrue(Pattern.compile("nifi-standard-processors:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargePutToAttribute() throws IOException {
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
|
@ -393,6 +879,57 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue(Pattern.compile("a{256}").matcher(result).matches());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testLargePutToAttributeDynamicProperties() throws IOException {
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
File dummy10MBytes = new File("target/10MB.txt");
|
||||
byte[] bytes = Files.readAllBytes(dummy.toPath());
|
||||
try (FileOutputStream fos = new FileOutputStream(dummy10MBytes)) {
|
||||
for (int i = 0; i < 10000; i++) {
|
||||
fos.write(bytes, 0, 1000);
|
||||
}
|
||||
}
|
||||
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue("".getBytes());
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
if(isWindows()) {
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cmd.exe");
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "/c");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, "type " + dummy10MBytes.getAbsolutePath());
|
||||
} else{
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "cat");
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, dummy10MBytes.getAbsolutePath());
|
||||
}
|
||||
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_ATTRIBUTE_MAX_LENGTH, "256");
|
||||
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
|
||||
flowFiles.get(0).assertAttributeEquals("execution.status", "0");
|
||||
String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
|
||||
assertTrue(Pattern.compile("a{256}").matcher(result).matches());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdateWithWorkingDirPutToAttribute() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
|
@ -414,6 +951,39 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteIngestAndUpdateWithWorkingDirPutToAttributeDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "streamOutput");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
String result = flowFiles.get(0).getAttribute("streamOutput");
|
||||
|
||||
final String quotedSeparator = Pattern.quote(File.separator);
|
||||
assertTrue(Pattern.compile(quotedSeparator + "nifi-standard-processors" + quotedSeparator + "target:ModifiedResult\r?\n").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoredStdinPutToAttribute() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
|
@ -435,6 +1005,39 @@ public class TestExecuteStreamCommand {
|
|||
Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoredStdinPutToAttributeDynamicProperties() throws IOException {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.IGNORE_STDIN, "true");
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
|
||||
assertTrue("TestIngestAndUpdate.jar should not have received anything to modify",
|
||||
Pattern.compile("target:ModifiedResult\r?\n?").matcher(result).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicEnvironmentPutToAttribute() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
|
||||
|
@ -459,6 +1062,42 @@ public class TestExecuteStreamCommand {
|
|||
assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDynamicEnvironmentPutToAttributeDynamicProperties() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.setProperty("NIFI_TEST_1", "testvalue1");
|
||||
controller.setProperty("NIFI_TEST_2", "testvalue2");
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
String result = flowFiles.get(0).getAttribute("executeStreamCommand.output");
|
||||
Set<String> dynamicEnvironmentVariables = new HashSet<>(Arrays.asList(result.split("\r?\n")));
|
||||
assertFalse("Should contain at least two environment variables starting with NIFI", dynamicEnvironmentVariables.size() < 2);
|
||||
assertTrue("NIFI_TEST_1 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_1=testvalue1"));
|
||||
assertTrue("NIFI_TEST_2 environment variable is missing", dynamicEnvironmentVariables.contains("NIFI_TEST_2=testvalue2"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQuotedArguments() throws Exception {
|
||||
List<String> args = ArgumentUtils.splitArgs("echo -n \"arg1 arg2 arg3\"", ' ');
|
||||
|
@ -507,6 +1146,47 @@ public class TestExecuteStreamCommand {
|
|||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExecuteJarPutToAttributeBadPathDynamicProperties() throws Exception {
|
||||
File exJar = new File("src/test/resources/ExecuteCommand/noSuchFile.jar");
|
||||
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||
String jarPath = exJar.getAbsolutePath();
|
||||
exJar.setExecutable(true);
|
||||
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||
controller.enqueue(dummy.toPath());
|
||||
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||
controller.setProperty(ExecuteStreamCommand.ARGUMENTS_STRATEGY, ExecuteStreamCommand.DYNAMIC_PROPERTY_ARGUMENTS_STRATEGY.getValue());
|
||||
PropertyDescriptor dynamicProp1 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.1")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp1, "-jar");
|
||||
PropertyDescriptor dynamicProp2 = new PropertyDescriptor.Builder()
|
||||
.dynamic(true)
|
||||
.name("command.argument.2")
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
|
||||
.build();
|
||||
controller.setProperty(dynamicProp2, jarPath);
|
||||
controller.setProperty(ExecuteStreamCommand.PUT_OUTPUT_IN_ATTRIBUTE, "executeStreamCommand.output");
|
||||
controller.run(1);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 0);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.NONZERO_STATUS_RELATIONSHIP, 0);
|
||||
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||
|
||||
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP);
|
||||
MockFlowFile outputFlowFile = flowFiles.get(0);
|
||||
String result = outputFlowFile.getAttribute("executeStreamCommand.output");
|
||||
outputFlowFile.assertContentEquals(dummy);
|
||||
assertTrue(result.isEmpty()); // java -jar with bad path only prints to standard error not standard out
|
||||
assertEquals("1", outputFlowFile.getAttribute("execution.status")); // java -jar with bad path exits with code 1
|
||||
assertEquals("java", outputFlowFile.getAttribute("execution.command"));
|
||||
assertEquals("-jar", outputFlowFile.getAttribute("execution.command.args").substring(0, 4));
|
||||
String attribute = outputFlowFile.getAttribute("execution.command.args");
|
||||
String expected = "src" + File.separator + "test" + File.separator + "resources" + File.separator + "ExecuteCommand" + File.separator + "noSuchFile.jar";
|
||||
assertEquals(expected, attribute.substring(attribute.length() - expected.length()));
|
||||
}
|
||||
|
||||
private static boolean isWindows() {
|
||||
return System.getProperty("os.name").toLowerCase().startsWith("windows");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue