From b53948a6ba1b3d1615d402eb49238dd5fd0f316b Mon Sep 17 00:00:00 2001 From: ricky Date: Fri, 8 May 2015 13:37:37 -0400 Subject: [PATCH] NIFI-601: Add Dynamic Props - ExecuteStreamCommand - Added dynamic property support to ExecuteStreamCommand. All properties added by user will be inserted into the command's runtime environment. Signed-off-by: Aldrin Piri --- .../nifi-standard-processors/pom.xml | 1 + .../standard/ExecuteStreamCommand.java | 19 ++++++++++++ .../src/test/java/TestDynamicEnvironment.java | 29 ++++++++++++++++++ .../standard/TestExecuteStreamCommand.java | 26 ++++++++++++++++ .../ExecuteCommand/TestDynamicEnvironment.jar | Bin 0 -> 1200 bytes 5 files changed, 75 insertions(+) create mode 100644 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java create mode 100755 nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml index f8c272ca56..0d56bbbea2 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml @@ -248,6 +248,7 @@ src/test/resources/CompressedData/SampleFileConcat.txt.bz2 src/test/resources/ExecuteCommand/TestIngestAndUpdate.jar src/test/resources/ExecuteCommand/TestSuccess.jar + src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar src/test/resources/TestIdentifyMimeType/1.jar src/test/resources/TestIdentifyMimeType/1.tar src/test/resources/TestIdentifyMimeType/1.tar.gz diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java index 76512dc1a2..4c4288aad0 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ExecuteStreamCommand.java @@ -33,6 +33,7 @@ import java.util.Set; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.nifi.annotation.behavior.DynamicProperty; import org.apache.nifi.annotation.behavior.EventDriven; import org.apache.nifi.annotation.behavior.SupportsBatching; import org.apache.nifi.annotation.documentation.CapabilityDescription; @@ -113,6 +114,7 @@ import org.apache.nifi.stream.io.StreamUtils; @SupportsBatching @Tags({"command execution", "command", "stream", "execute"}) @CapabilityDescription("Executes an external command on the contents of a flow file, and creates a new flow file with the results of the command.") +@DynamicProperty(name = "An environment variable name", value = "An environment variable value", description = "These environment variables are passed to the process spawned by this Processor") @WritesAttributes({ @WritesAttribute(attribute = "execution.command", description = "The name of the command executed to create the new FlowFile"), @WritesAttribute(attribute = "execution.command.args", description = "The semi-colon delimited list of arguments"), @@ -202,6 +204,16 @@ public class ExecuteStreamCommand extends AbstractProcessor { return PROPERTIES; } + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .description("Sets the environment variable '" + propertyDescriptorName + "' for the process' environment") + .dynamic(true) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .build(); + } + @Override public void onTrigger(ProcessContext context, final ProcessSession session) throws ProcessException { FlowFile flowFile = session.get(); @@ -230,6 +242,13 @@ public class ExecuteStreamCommand extends AbstractProcessor { logger.warn("Failed to create working directory {}, using current working directory {}", new Object[]{workingDir, System.getProperty("user.dir")}); } } + final Map environment = new HashMap<>(); + for (final Map.Entry entry : context.getProperties().entrySet()) { + if (entry.getKey().isDynamic()) { + environment.put(entry.getKey().getName(), entry.getValue()); + } + } + builder.environment().putAll(environment); builder.command(args); builder.directory(dir); builder.redirectInput(Redirect.PIPE); diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java new file mode 100644 index 0000000000..3e6cad2fc9 --- /dev/null +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/TestDynamicEnvironment.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import java.util.Map; + +public class TestDynamicEnvironment { + public static void main(String[] args) { + // iterate through current environment and print out all properties starting with NIFI + for (Map.Entry env: System.getenv().entrySet()) { + if (env.getKey().startsWith("NIFI")) { + System.out.println(env.getKey() + "=" + env.getValue()); + } + } + } +} \ No newline at end of file diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java index 66a93e0a5c..555c3e4350 100644 --- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java +++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestExecuteStreamCommand.java @@ -182,4 +182,30 @@ public class TestExecuteStreamCommand { assertEquals(0, flowFiles.get(0).getSize()); } + + @Test + public void testDynamicEnvironment() throws Exception { + File exJar = new File("src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar"); + File dummy = new File("src/test/resources/ExecuteCommand/1000bytes.txt"); + String jarPath = exJar.getAbsolutePath(); + exJar.setExecutable(true); + final TestRunner controller = TestRunners.newTestRunner(ExecuteStreamCommand.class); + controller.setProperty("NIFI_TEST_1", "testvalue1"); + controller.setProperty("NIFI_TEST_2", "testvalue2"); + controller.setValidateExpressionUsage(false); + controller.enqueue(dummy.toPath()); + controller.setProperty(ExecuteStreamCommand.WORKING_DIR, "target"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_COMMAND, "java"); + controller.setProperty(ExecuteStreamCommand.EXECUTION_ARGUMENTS, "-jar;" + jarPath); + controller.run(1); + controller.assertTransferCount(ExecuteStreamCommand.ORIGINAL_RELATIONSHIP, 1); + controller.assertTransferCount(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP, 1); + List flowFiles = controller.getFlowFilesForRelationship(ExecuteStreamCommand.OUTPUT_STREAM_RELATIONSHIP); + byte[] byteArray = flowFiles.get(0).toByteArray(); + String result = new String(byteArray); + String[] dynamicEnvironment = result.split("\n"); + assertEquals("Should contain two environment variables starting with NIFI", 2, dynamicEnvironment.length); + assertEquals("NIFI_TEST_2 environment variable is missing", "NIFI_TEST_2=testvalue2", dynamicEnvironment[0]); + assertEquals("NIFI_TEST_1 environment variable is missing", "NIFI_TEST_1=testvalue1", dynamicEnvironment[1]); + } } diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/ExecuteCommand/TestDynamicEnvironment.jar new file mode 100755 index 0000000000000000000000000000000000000000..8ff95a1798326a5cd9d2f4aba451bfb5d786779a GIT binary patch literal 1200 zcmWIWW@Zs#;Nak3_>s86jR6U8GO#fCx`sIFdiuHP|2xINz|0Wf&CUT*!30$nfK#&w zPz7AGucM!*n`>~0p0Ate8AGlH10IKqdoNg*Tv-^zE-16M?~&3vArmKuV9)LcZ?As+ z{Daxxt;FQHfl91-K9Lq{_b8~`$@%?fq28UKP&H}RYs|g;MXZfqKE|wO>{x5BaO{}R z{gu@ceo@u?JelU7@YG%#2Xf}8JHbAwKu2T%F~XV4(EKO^lnqHOE^(>MOU%tocFilx zEXvQzP0cIOOU_9wE-nqZ9VF~1VC(E*b~9%O3x~!dQPY^_2eKay`6?+cI2iG&*_1PQ z!wjFG-NIgWt?Kf%TRtdCtld=eZSs|%>gg=8M{+{XpP702?(+1iPhX!OKhK~x;p)ZK z%~t0g)<}q7kU!=c!}ZwcQJ>@m1BtdaLEgoVDkUujVoL8sc^_LT^(C_iJI0iHT%U8a zRMxy8_^>YTc1fn-WhPf;+2U&yPG7KzjuPits~Ox?vUG;E|KBrBT)k8FWOrI!T;A+3 zPqEgJ#eN3Nh9%iEj0}3MrQS*HX#Xvh#&agA&q;E|@uo)hh?Exvo5QC>dIW6xSGjVH zEL(l8$)q5z>}>~fqi?WH>T|yxxHrVdYUc!#4+7q7K@n*$>`u*GIis~SQPLy1wb55P z;^?AA_lh-&$qBsvFLe6!PfjjW-LCqH#r6LQKdr>V&`omvaqc{OK~CPI z(rYPmZv{i!f`V1&I7+AZO_=_!sOeeavl;F80$l8FscpafXlv2$hn91vl!(ve{TJ`* zE4Lz6sN~(QwL6x+S-$L6%8M@f%Ss}r!&e2YSl8^kc}m1mS=Zl9;&UVR#E2EmO?+bU zy?((j)``gzTv93*I{Au)EIlS#EqeX1$*tqF=d9ARUfuMk|C`R<<$hHy=hkzcc^S6J zcFU{96ED7yzr%ieG3Q~t<&^Yc z@>{jPwi4=3R8O9KC{pXdU1NV?(ZcuEVfzL5b1skFZ!EDaQ2nOV-@81lPWu@_$=&CW zan({_w(Md7C3i+95eC#u4a>2hOpOZQxfGPe(X}GyB2bn_09zmvt`#YRBbxxqZO9G- c