mirror of https://github.com/apache/nifi.git
NIFI-601: Add Dynamic Props - ExecuteStreamCommand
- Added dynamic property support to ExecuteStreamCommand. All properties added by user will be inserted into the command's runtime environment. Signed-off-by: Aldrin Piri <aldrin@apache.org>
This commit is contained in:
parent
aeefe73f0b
commit
b53948a6ba
|
@ -248,6 +248,7 @@
|
||||||
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude>
|
<exclude>src/test/resources/CompressedData/SampleFileConcat.txt.bz2</exclude>
|
||||||
<exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude>
|
<exclude>src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar</exclude>
|
||||||
<exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude>
|
<exclude>src/test/resources/ExecuteCommand/TestSuccess.jar</exclude>
|
||||||
|
<exclude>src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar</exclude>
|
||||||
<exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude>
|
<exclude>src/test/resources/TestIdentifyMimeType/1.jar</exclude>
|
||||||
<exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude>
|
<exclude>src/test/resources/TestIdentifyMimeType/1.tar</exclude>
|
||||||
<exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude>
|
<exclude>src/test/resources/TestIdentifyMimeType/1.tar.gz</exclude>
|
||||||
|
|
|
@ -33,6 +33,7 @@ import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.nifi.annotation.behavior.DynamicProperty;
|
||||||
import org.apache.nifi.annotation.behavior.EventDriven;
|
import org.apache.nifi.annotation.behavior.EventDriven;
|
||||||
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
import org.apache.nifi.annotation.behavior.SupportsBatching;
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
@ -113,6 +114,7 @@ import org.apache.nifi.stream.io.StreamUtils;
|
||||||
@SupportsBatching
|
@SupportsBatching
|
||||||
@Tags({"command execution", "command", "stream", "execute"})
|
@Tags({"command execution", "command", "stream", "execute"})
|
||||||
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
|
@CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.")
|
||||||
|
@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor")
|
||||||
@WritesAttributes({
|
@WritesAttributes({
|
||||||
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"),
|
@WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"),
|
||||||
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
|
@WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"),
|
||||||
|
@ -202,6 +204,16 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
||||||
return PROPERTIES;
|
return PROPERTIES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||||
|
return new PropertyDescriptor.Builder()
|
||||||
|
.name(propertyDescriptorName)
|
||||||
|
.description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment")
|
||||||
|
.dynamic(true)
|
||||||
|
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
|
public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||||
FlowFile flowFile = session.get();
|
FlowFile flowFile = session.get();
|
||||||
|
@ -230,6 +242,13 @@ public class ExecuteStreamCommand extends AbstractProcessor {
|
||||||
logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
|
logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
final Map<String, String> environment = new HashMap<>();
|
||||||
|
for (final Map.Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
|
||||||
|
if (entry.getKey().isDynamic()) {
|
||||||
|
environment.put(entry.getKey().getName(), entry.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
builder.environment().putAll(environment);
|
||||||
builder.command(args);
|
builder.command(args);
|
||||||
builder.directory(dir);
|
builder.directory(dir);
|
||||||
builder.redirectInput(Redirect.PIPE);
|
builder.redirectInput(Redirect.PIPE);
|
||||||
|
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class TestDynamicEnvironment {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
// iterate through current environment and print out all properties starting with NIFI
|
||||||
|
for (Map.Entry<String, String> env: System.getenv().entrySet()) {
|
||||||
|
if (env.getKey().startsWith("NIFI")) {
|
||||||
|
System.out.println(env.getKey() + "=" + env.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -182,4 +182,30 @@ public class TestExecuteStreamCommand {
|
||||||
assertEquals(0, flowFiles.get(0).getSize());
|
assertEquals(0, flowFiles.get(0).getSize());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDynamicEnvironment() throws Exception {
|
||||||
|
File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar");
|
||||||
|
File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt");
|
||||||
|
String jarPath = exJar.getAbsolutePath();
|
||||||
|
exJar.setExecutable(true);
|
||||||
|
final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class);
|
||||||
|
controller.setProperty("NIFI_TEST_1", "testvalue1");
|
||||||
|
controller.setProperty("NIFI_TEST_2", "testvalue2");
|
||||||
|
controller.setValidateExpressionUsage(false);
|
||||||
|
controller.enqueue(dummy.toPath());
|
||||||
|
controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target");
|
||||||
|
controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java");
|
||||||
|
controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath);
|
||||||
|
controller.run(1);
|
||||||
|
controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1);
|
||||||
|
controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1);
|
||||||
|
List<MockFlowFile> flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP);
|
||||||
|
byte[] byteArray = flowFiles.get(0).toByteArray();
|
||||||
|
String result = new String(byteArray);
|
||||||
|
String[] dynamicEnvironment = result.split("\n");
|
||||||
|
assertEquals("Should contain two environment variables starting with NIFI", 2, dynamicEnvironment.length);
|
||||||
|
assertEquals("NIFI_TEST_2 environment variable is missing", "NIFI_TEST_2=testvalue2", dynamicEnvironment[0]);
|
||||||
|
assertEquals("NIFI_TEST_1 environment variable is missing", "NIFI_TEST_1=testvalue1", dynamicEnvironment[1]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Binary file not shown.
Loading…
Reference in New Issue