NIFI-12105: ExecuteStateless processor accepts additional NAR directories

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes #8129.
This commit is contained in:
Peter Gyori 2023-12-05 15:06:06 +01:00 committed by Pierre Villard
parent 5c1b0a1140
commit 5675d37bed
No known key found for this signature in database
GPG Key ID: F92A93B30C07C6D5
8 changed files with 940 additions and 11 deletions

View File

@ -128,32 +128,93 @@
<version>2.0.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-property-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
<type>jar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-jslt-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
<type>nar</type>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-compress-nar</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>compile</scope>
<type>nar</type>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<finalName>nifi-stateless-processors-test-assembly</finalName>
<attach>true</attach>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
<executions>
<execution>
<id>make shared resource</id>
<id>stateless-core-dependencies</id>
<goals>
<goal>single</goal>
</goals>
<phase>process-test-resources</phase>
<configuration>
<finalName>nifi-stateless-processors-test-assembly</finalName>
<archiverConfig>
<defaultDirectoryMode>0775</defaultDirectoryMode>
<directoryMode>0775</directoryMode>
<fileMode>0664</fileMode>
</archiverConfig>
<descriptors>
<descriptor>src/test/assembly/dependencies.xml</descriptor>
<descriptor>src/test/assembly/dependencies-core.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
<formats>
<format>dir</format>
</formats>
</configuration>
</execution>
<execution>
<id>jslt-dependency</id>
<goals>
<goal>single</goal>
</goals>
<phase>process-test-resources</phase>
<configuration>
<finalName>nifi-stateless-processors-test-assembly</finalName>
<archiverConfig>
<defaultDirectoryMode>0775</defaultDirectoryMode>
<directoryMode>0775</directoryMode>
<fileMode>0664</fileMode>
</archiverConfig>
<descriptors>
<descriptor>src/test/assembly/dependencies-jslt.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
<formats>
<format>dir</format>
</formats>
</configuration>
</execution>
<execution>
<id>compression-dependency</id>
<goals>
<goal>single</goal>
</goals>
<phase>process-test-resources</phase>
<configuration>
<finalName>nifi-stateless-processors-test-assembly</finalName>
<archiverConfig>
<defaultDirectoryMode>0775</defaultDirectoryMode>
<directoryMode>0775</directoryMode>
<fileMode>0664</fileMode>
</archiverConfig>
<descriptors>
<descriptor>src/test/assembly/dependencies-compress.xml</descriptor>
</descriptors>
<tarLongFileMode>posix</tarLongFileMode>
<formats>
@ -176,6 +237,8 @@
<exclude>src/test/resources/sleep.json</exclude>
<exclude>src/test/resources/throw-exception.json</exclude>
<exclude>src/test/resources/log-message.json</exclude>
<exclude>src/test/resources/jslt-flow.json</exclude>
<exclude>src/test/resources/jslt-compress-flow.json</exclude>
</excludes>
</configuration>
</plugin>

View File

@ -0,0 +1,37 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly>
<id>compress</id>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<!-- Write out dependency artifacts to directory -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>.</outputDirectory>
<directoryMode>0770</directoryMode>
<fileMode>0664</fileMode>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>*:nifi-compress-nar</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -14,7 +14,7 @@
limitations under the License.
-->
<assembly>
<id>bin</id>
<id>core</id>
<includeBaseDirectory>false</includeBaseDirectory>
<baseDirectory>nifi-execute-stateless-processor-assembly-${project.version}</baseDirectory>
@ -41,6 +41,7 @@
<include>*:nifi-runtime</include>
<include>*:nifi-nar-utils</include>
<include>*:nifi-stateless-api</include>
<include>*:nifi-property-utils</include>
<include>*:slf4j-api</include>
</includes>
</dependencySet>

View File

@ -0,0 +1,37 @@
<?xml version="1.0"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<assembly>
<id>jslt</id>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<!-- Write out dependency artifacts to directory -->
<dependencySet>
<scope>runtime</scope>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>.</outputDirectory>
<directoryMode>0770</directoryMode>
<fileMode>0664</fileMode>
<useTransitiveFiltering>true</useTransitiveFiltering>
<includes>
<include>*:nifi-jslt-nar</include>
</includes>
</dependencySet>
</dependencySets>
</assembly>

View File

@ -29,12 +29,16 @@ import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestExecuteStateless {
private static final String HELLO_WORLD = "Hello World";
private static final String LIB_DIR = "target/nifi-stateless-processors-test-assembly";
private static final String JSON_OBJECT = "{\"id\": 1,\"name\": \"John\"}";
private static final String LIB_DIR = "target/nifi-stateless-processors-test-assembly-core";
private static final String JSLT_DIR = "target/nifi-stateless-processors-test-assembly-jslt";
private static final String COMPRESS_DIR = "target/nifi-stateless-processors-test-assembly-compress";
private static final String WORK_DIR = "target/work";
private TestRunner runner;
@ -180,6 +184,60 @@ public class TestExecuteStateless {
testBulletinSurfaced("ERROR", true, MockComponentLog::getErrorMessages);
}
@Test
public void testAdditionalNarDirectoryNotSpecified() {
/* The test flow (jslt-flow.json) requires a nar dependency deliberately not found in LIB_DIR.
* Since ExecuteStateless' "Additional NAR Directories" property is purposefully not set,
* Stateless NiFi does not find the dependency and throws an error. */
runner.setProperty(ExecuteStateless.DATAFLOW_FILE, "src/test/resources/jslt-flow.json");
runner.setProperty(ExecuteStateless.INPUT_PORT, "input");
runner.setProperty(ExecuteStateless.FAILURE_PORTS, "failure");
runner.enqueue(JSON_OBJECT.getBytes(), Collections.singletonMap("abc", "xyz"));
runner.run();
final List<LogMessage> logMessages = runner.getLogger().getErrorMessages();
assertTrue(logMessages.get(0).getMsg().contains("Could not create dataflow from snapshot"));
}
@Test
public void testAdditionalNarDirectorySpecified() {
/* The test flow (jslt-flow.json) requires a nar dependency not found in LIB_DIR.
* ExecuteStateless' "Additional NAR Directories" property points to the directory containing the dependency.
* Stateless loads the dependency from that location. */
runner.setProperty(ExecuteStateless.DATAFLOW_FILE, "src/test/resources/jslt-flow.json");
runner.setProperty(ExecuteStateless.ADDITIONAL_LIB_DIRECTORIES, JSLT_DIR);
runner.setProperty(ExecuteStateless.INPUT_PORT, "input");
runner.setProperty(ExecuteStateless.FAILURE_PORTS, "failure");
runner.enqueue(JSON_OBJECT.getBytes(), Collections.singletonMap("abc", "xyz"));
runner.run();
runner.assertTransferCount(ExecuteStateless.REL_OUTPUT, 1);
final List<MockFlowFile> output = runner.getFlowFilesForRelationship(ExecuteStateless.REL_OUTPUT);
output.get(0).assertContentEquals("\"John\"");
}
@Test
public void testAdditionalNarDirectoriesSpecified() {
/* The test flow (jslt-compress-flow.json) requires nar dependencies not found in LIB_DIR.
* ExecuteStateless' "Additional NAR Directories" property points to two separate directories containing the dependencies.
* Stateless loads the dependencies from those locations. */
runner.setProperty(ExecuteStateless.DATAFLOW_FILE, "src/test/resources/jslt-compress-flow.json");
runner.setProperty(ExecuteStateless.ADDITIONAL_LIB_DIRECTORIES, JSLT_DIR + "," + COMPRESS_DIR);
runner.setProperty(ExecuteStateless.INPUT_PORT, "input");
runner.setProperty(ExecuteStateless.FAILURE_PORTS, "failure");
byte[] expectedContents = new byte[] {31, -117, 8, 0, 0, 0, 0, 0, 0, -1, 83, -14, -54, -49, -56, 83, 2, 0, 118, 63, 122, -30, 6, 0, 0, 0};
runner.enqueue(JSON_OBJECT.getBytes(), Collections.singletonMap("abc", "xyz"));
runner.run();
runner.assertTransferCount(ExecuteStateless.REL_OUTPUT, 1);
final List<MockFlowFile> output = runner.getFlowFilesForRelationship(ExecuteStateless.REL_OUTPUT);
output.get(0).assertAttributeEquals("mime.type", "application/gzip");
assertArrayEquals(output.get(0).getData(), expectedContents);
}
private void testBulletinSurfaced(final String logLevel, final boolean shouldBeSurfaced, final Function<MockComponentLog, List<LogMessage>> getMessageFunction) {
final String logMessage = "Unit Test Message";

View File

@ -0,0 +1,423 @@
{
"flowContents": {
"identifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"instanceIdentifier": "fc646a49-018b-1000-5cb2-37fc54983e37",
"name": "jslt-compress-flow",
"comments": "",
"position": {
"x": 1389.0,
"y": 408.0
},
"processGroups": [],
"remoteProcessGroups": [],
"processors": [
{
"identifier": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f",
"name": "JSLTTransformJSON",
"comments": "",
"position": {
"x": 1208.0,
"y": 280.0
},
"type": "org.apache.nifi.processors.jslt.JSLTTransformJSON",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-jslt-nar",
"version": "2.0.0-SNAPSHOT"
},
"properties": {
"jslt-transform-pretty_print": "false",
"jslt-transform-result-filter": ". != null and . != {} and . != []",
"jslt-transform-transformation": ".name",
"jslt-transform-cache-size": "1",
"jslt-transform-transformation-strategy": "EACH_OBJECT"
},
"propertyDescriptors": {
"jslt-transform-pretty_print": {
"name": "jslt-transform-pretty_print",
"displayName": "Pretty Print",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"jslt-transform-result-filter": {
"name": "jslt-transform-result-filter",
"displayName": "Transform Result Filter",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"jslt-transform-transformation": {
"name": "jslt-transform-transformation",
"displayName": "JSLT Transformation",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false,
"resourceDefinition": {
"cardinality": "SINGLE",
"resourceTypes": [
"FILE",
"TEXT"
]
}
},
"jslt-transform-cache-size": {
"name": "jslt-transform-cache-size",
"displayName": "Transform Cache Size",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"jslt-transform-transformation-strategy": {
"name": "jslt-transform-transformation-strategy",
"displayName": "Transformation Strategy",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
}
},
"style": {},
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"executionNode": "ALL",
"penaltyDuration": "30 sec",
"yieldDuration": "1 sec",
"bulletinLevel": "WARN",
"runDurationMillis": 0,
"concurrentlySchedulableTaskCount": 1,
"autoTerminatedRelationships": [],
"scheduledState": "ENABLED",
"retryCount": 10,
"retriedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"maxBackoffPeriod": "10 mins",
"componentType": "PROCESSOR",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "52b14610-18a4-334c-ac38-5e2cea125e9a",
"instanceIdentifier": "fca0d278-018b-1000-a210-fd4d738ea0e7",
"name": "ModifyCompression",
"comments": "",
"position": {
"x": 1208.0,
"y": 472.0
},
"type": "org.apache.nifi.processors.compress.ModifyCompression",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-compress-nar",
"version": "2.0.0-SNAPSHOT"
},
"properties": {
"Output Compression Level": "1",
"Input Compression Strategy": "no compression",
"Output Compression Strategy": "gzip",
"Output Filename Strategy": "UPDATED"
},
"propertyDescriptors": {
"Output Compression Level": {
"name": "Output Compression Level",
"displayName": "Output Compression Level",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"Input Compression Strategy": {
"name": "Input Compression Strategy",
"displayName": "Input Compression Strategy",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"Output Compression Strategy": {
"name": "Output Compression Strategy",
"displayName": "Output Compression Strategy",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"Output Filename Strategy": {
"name": "Output Filename Strategy",
"displayName": "Output Filename Strategy",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
}
},
"style": {},
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"executionNode": "ALL",
"penaltyDuration": "30 sec",
"yieldDuration": "1 sec",
"bulletinLevel": "WARN",
"runDurationMillis": 0,
"concurrentlySchedulableTaskCount": 1,
"autoTerminatedRelationships": [],
"scheduledState": "ENABLED",
"retryCount": 10,
"retriedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"maxBackoffPeriod": "10 mins",
"componentType": "PROCESSOR",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"inputPorts": [
{
"identifier": "05506a5d-3d47-309e-a2f9-ccdf620fc7bb",
"instanceIdentifier": "fc647f65-018b-1000-a0c0-693b42308743",
"name": "input",
"position": {
"x": 1265.0,
"y": 128.0
},
"type": "INPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"scheduledState": "ENABLED",
"allowRemoteAccess": false,
"portFunction": "STANDARD",
"componentType": "INPUT_PORT",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"outputPorts": [
{
"identifier": "a21e1073-b62e-3f2c-af10-0df16221e97e",
"instanceIdentifier": "fc64eadc-018b-1000-549e-85726a49e708",
"name": "failure",
"position": {
"x": 1600.0,
"y": 696.0
},
"type": "OUTPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"scheduledState": "ENABLED",
"allowRemoteAccess": false,
"portFunction": "STANDARD",
"componentType": "OUTPUT_PORT",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "a0a4d4cb-5e8c-313b-8d72-322c238f7d94",
"instanceIdentifier": "fc64d691-018b-1000-e8e3-16301be6f952",
"name": "success",
"position": {
"x": 1128.0,
"y": 704.0
},
"type": "OUTPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"scheduledState": "ENABLED",
"allowRemoteAccess": false,
"portFunction": "STANDARD",
"componentType": "OUTPUT_PORT",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"connections": [
{
"identifier": "e81b2c0b-479e-36ef-adc2-aa2daa60db55",
"instanceIdentifier": "fca12acd-018b-1000-7451-7989cee33fd1",
"name": "",
"source": {
"id": "52b14610-18a4-334c-ac38-5e2cea125e9a",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "ModifyCompression",
"comments": "",
"instanceIdentifier": "fca0d278-018b-1000-a210-fd4d738ea0e7"
},
"destination": {
"id": "a0a4d4cb-5e8c-313b-8d72-322c238f7d94",
"type": "OUTPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "success",
"instanceIdentifier": "fc64d691-018b-1000-e8e3-16301be6f952"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
"success"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "7588b92d-ff24-3c32-b48f-dbefc67b0373",
"instanceIdentifier": "fca13b80-018b-1000-86a2-7b666001ec00",
"name": "",
"source": {
"id": "52b14610-18a4-334c-ac38-5e2cea125e9a",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "ModifyCompression",
"comments": "",
"instanceIdentifier": "fca0d278-018b-1000-a210-fd4d738ea0e7"
},
"destination": {
"id": "a21e1073-b62e-3f2c-af10-0df16221e97e",
"type": "OUTPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "failure",
"instanceIdentifier": "fc64eadc-018b-1000-549e-85726a49e708"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
"failure"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "ca1b97b0-b158-3e74-afc9-8b1ce3f9a52e",
"instanceIdentifier": "fc6508de-018b-1000-b1a0-a664e9eced65",
"name": "",
"source": {
"id": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "JSLTTransformJSON",
"comments": "",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f"
},
"destination": {
"id": "52b14610-18a4-334c-ac38-5e2cea125e9a",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "ModifyCompression",
"comments": "",
"instanceIdentifier": "fca0d278-018b-1000-a210-fd4d738ea0e7"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
"success"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "52f6fd29-6c71-3bf6-a4f8-6bc1790cf014",
"instanceIdentifier": "fc64a5ba-018b-1000-86f3-d0035c35ba1b",
"name": "",
"source": {
"id": "05506a5d-3d47-309e-a2f9-ccdf620fc7bb",
"type": "INPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "input",
"instanceIdentifier": "fc647f65-018b-1000-a0c0-693b42308743"
},
"destination": {
"id": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "JSLTTransformJSON",
"comments": "",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
""
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "79321aac-35cb-3600-8364-099dba9014f0",
"instanceIdentifier": "fc6518f9-018b-1000-a271-819fb66c5dc8",
"name": "",
"source": {
"id": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "JSLTTransformJSON",
"comments": "",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f"
},
"destination": {
"id": "a21e1073-b62e-3f2c-af10-0df16221e97e",
"type": "OUTPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "failure",
"instanceIdentifier": "fc64eadc-018b-1000-549e-85726a49e708"
},
"labelIndex": 0,
"zIndex": 0,
"selectedRelationships": [
"failure"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [
{
"x": 1720.0,
"y": 344.0
}
],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"labels": [],
"funnels": [],
"controllerServices": [],
"defaultFlowFileExpiration": "0 sec",
"defaultBackPressureObjectThreshold": 10000,
"defaultBackPressureDataSizeThreshold": "1 GB",
"scheduledState": "ENABLED",
"executionEngine": "INHERITED",
"maxConcurrentTasks": 1,
"statelessFlowTimeout": "1 min",
"logFileSuffix": "",
"componentType": "PROCESS_GROUP",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"flowFileConcurrency": "UNBOUNDED"
},
"externalControllerServices": {},
"parameterContexts": {},
"flowEncodingVersion": "1.0",
"parameterProviders": {},
"latest": false
}

View File

@ -0,0 +1,277 @@
{
"flowContents": {
"identifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"instanceIdentifier": "fc646a49-018b-1000-5cb2-37fc54983e37",
"name": "jslt-flow",
"comments": "",
"position": {
"x": 1389.0,
"y": 408.0
},
"processGroups": [],
"remoteProcessGroups": [],
"processors": [
{
"identifier": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f",
"name": "JSLTTransformJSON",
"comments": "",
"position": {
"x": 1208.0,
"y": 280.0
},
"type": "org.apache.nifi.processors.jslt.JSLTTransformJSON",
"bundle": {
"group": "org.apache.nifi",
"artifact": "nifi-jslt-nar",
"version": "2.0.0-SNAPSHOT"
},
"properties": {
"jslt-transform-pretty_print": "false",
"jslt-transform-result-filter": ". != null and . != {} and . != []",
"jslt-transform-transformation": ".name",
"jslt-transform-cache-size": "1",
"jslt-transform-transformation-strategy": "EACH_OBJECT"
},
"propertyDescriptors": {
"jslt-transform-pretty_print": {
"name": "jslt-transform-pretty_print",
"displayName": "Pretty Print",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"jslt-transform-result-filter": {
"name": "jslt-transform-result-filter",
"displayName": "Transform Result Filter",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"jslt-transform-transformation": {
"name": "jslt-transform-transformation",
"displayName": "JSLT Transformation",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false,
"resourceDefinition": {
"cardinality": "SINGLE",
"resourceTypes": [
"FILE",
"TEXT"
]
}
},
"jslt-transform-cache-size": {
"name": "jslt-transform-cache-size",
"displayName": "Transform Cache Size",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
},
"jslt-transform-transformation-strategy": {
"name": "jslt-transform-transformation-strategy",
"displayName": "Transformation Strategy",
"identifiesControllerService": false,
"sensitive": false,
"dynamic": false
}
},
"style": {},
"schedulingPeriod": "0 sec",
"schedulingStrategy": "TIMER_DRIVEN",
"executionNode": "ALL",
"penaltyDuration": "30 sec",
"yieldDuration": "1 sec",
"bulletinLevel": "WARN",
"runDurationMillis": 0,
"concurrentlySchedulableTaskCount": 1,
"autoTerminatedRelationships": [],
"scheduledState": "ENABLED",
"retryCount": 10,
"retriedRelationships": [],
"backoffMechanism": "PENALIZE_FLOWFILE",
"maxBackoffPeriod": "10 mins",
"componentType": "PROCESSOR",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"inputPorts": [
{
"identifier": "05506a5d-3d47-309e-a2f9-ccdf620fc7bb",
"instanceIdentifier": "fc647f65-018b-1000-a0c0-693b42308743",
"name": "input",
"position": {
"x": 1265.0,
"y": 128.0
},
"type": "INPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"scheduledState": "ENABLED",
"allowRemoteAccess": false,
"portFunction": "STANDARD",
"componentType": "INPUT_PORT",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"outputPorts": [
{
"identifier": "a21e1073-b62e-3f2c-af10-0df16221e97e",
"instanceIdentifier": "fc64eadc-018b-1000-549e-85726a49e708",
"name": "failure",
"position": {
"x": 1424.0,
"y": 512.0
},
"type": "OUTPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"scheduledState": "ENABLED",
"allowRemoteAccess": false,
"portFunction": "STANDARD",
"componentType": "OUTPUT_PORT",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "a0a4d4cb-5e8c-313b-8d72-322c238f7d94",
"instanceIdentifier": "fc64d691-018b-1000-e8e3-16301be6f952",
"name": "success",
"position": {
"x": 1104.0,
"y": 520.0
},
"type": "OUTPUT_PORT",
"concurrentlySchedulableTaskCount": 1,
"scheduledState": "ENABLED",
"allowRemoteAccess": false,
"portFunction": "STANDARD",
"componentType": "OUTPUT_PORT",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"connections": [
{
"identifier": "ca1b97b0-b158-3e74-afc9-8b1ce3f9a52e",
"instanceIdentifier": "fc6508de-018b-1000-b1a0-a664e9eced65",
"name": "",
"source": {
"id": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "JSLTTransformJSON",
"comments": "",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f"
},
"destination": {
"id": "a0a4d4cb-5e8c-313b-8d72-322c238f7d94",
"type": "OUTPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "success",
"instanceIdentifier": "fc64d691-018b-1000-e8e3-16301be6f952"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
"success"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "52f6fd29-6c71-3bf6-a4f8-6bc1790cf014",
"instanceIdentifier": "fc64a5ba-018b-1000-86f3-d0035c35ba1b",
"name": "",
"source": {
"id": "05506a5d-3d47-309e-a2f9-ccdf620fc7bb",
"type": "INPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "input",
"instanceIdentifier": "fc647f65-018b-1000-a0c0-693b42308743"
},
"destination": {
"id": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "JSLTTransformJSON",
"comments": "",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
""
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
},
{
"identifier": "79321aac-35cb-3600-8364-099dba9014f0",
"instanceIdentifier": "fc6518f9-018b-1000-a271-819fb66c5dc8",
"name": "",
"source": {
"id": "1d143bd9-51a5-3e7a-906a-7ac9ec497672",
"type": "PROCESSOR",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "JSLTTransformJSON",
"comments": "",
"instanceIdentifier": "fc648ca8-018b-1000-a8dd-fc45b5f9db3f"
},
"destination": {
"id": "a21e1073-b62e-3f2c-af10-0df16221e97e",
"type": "OUTPUT_PORT",
"groupId": "c680fa77-5e72-334f-b7c1-e2d6918ce73a",
"name": "failure",
"instanceIdentifier": "fc64eadc-018b-1000-549e-85726a49e708"
},
"labelIndex": 1,
"zIndex": 0,
"selectedRelationships": [
"failure"
],
"backPressureObjectThreshold": 10000,
"backPressureDataSizeThreshold": "1 GB",
"flowFileExpiration": "0 sec",
"prioritizers": [],
"bends": [],
"loadBalanceStrategy": "DO_NOT_LOAD_BALANCE",
"partitioningAttribute": "",
"loadBalanceCompression": "DO_NOT_COMPRESS",
"componentType": "CONNECTION",
"groupIdentifier": "c680fa77-5e72-334f-b7c1-e2d6918ce73a"
}
],
"labels": [],
"funnels": [],
"controllerServices": [],
"defaultFlowFileExpiration": "0 sec",
"defaultBackPressureObjectThreshold": 10000,
"defaultBackPressureDataSizeThreshold": "1 GB",
"scheduledState": "ENABLED",
"executionEngine": "INHERITED",
"maxConcurrentTasks": 1,
"statelessFlowTimeout": "1 min",
"componentType": "PROCESS_GROUP",
"flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE",
"flowFileConcurrency": "UNBOUNDED"
},
"externalControllerServices": {},
"parameterContexts": {},
"flowEncodingVersion": "1.0",
"parameterProviders": {},
"latest": false
}

View File

@ -258,12 +258,28 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
public static final PropertyDescriptor LIB_DIRECTORY = new Builder()
.name("NAR Directory")
.displayName("NAR Directory")
.description("The directory to retrieve NAR's from")
.description("This directory has three roles: " +
"1) it contains the NiFi Stateless NAR and other necessary libraries required for the Stateless engine to be bootstrapped, " +
"2) it can contain extensions that should be loaded by the Stateless engine, " +
"3) it is used by the Stateless engine to download extensions into.")
.required(true)
.addValidator(createDirectoryExistsValidator(false, false))
.defaultValue("./lib")
.build();
public static final PropertyDescriptor ADDITIONAL_LIB_DIRECTORIES = new Builder()
.name("additional-nar-directories")
.displayName("Additional NAR Directories")
.description("A comma-separated list of paths for directories that contain extensions that " +
"should be loaded by the stateless engine. The engine will not download any " +
"extensions into these directories or write to them but will read any NAR files " +
"that are found within these directories. The engine will not recurse into " +
"subdirectories of these directories.")
.required(false)
.addValidator(StandardValidators.createListValidator(true, true,
StandardValidators.createDirectoryExistsValidator(false, false)))
.build();
public static final PropertyDescriptor WORKING_DIRECTORY = new Builder()
.name("Work Directory")
.displayName("Work Directory")
@ -386,6 +402,7 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
MAX_INPUT_FLOWFILE_SIZE,
DATAFLOW_TIMEOUT,
LIB_DIRECTORY,
ADDITIONAL_LIB_DIRECTORIES,
WORKING_DIRECTORY,
MAX_INGEST_FLOWFILES,
MAX_INGEST_DATA_SIZE,
@ -838,6 +855,7 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
private StatelessEngineConfiguration createEngineConfiguration(final ProcessContext context, final int contentRepoIndex) {
final File workingDirectory = new File(context.getProperty(WORKING_DIRECTORY).getValue());
final File narDirectory = new File(context.getProperty(LIB_DIRECTORY).getValue());
final Collection<File> additionalNarDirectories = getAdditionalNarDirectories(context.getProperty(ADDITIONAL_LIB_DIRECTORIES).getValue());
final ResourceReference krb5Reference = context.getProperty(KRB5_CONF).asResource();
final File krb5Conf = krb5Reference == null ? null : krb5Reference.asFile();
final SSLContextService sslContextService = context.getProperty(STATELESS_SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
@ -886,7 +904,7 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
@Override
public Collection<File> getReadOnlyExtensionsDirectories() {
return Collections.emptyList();
return additionalNarDirectories;
}
@Override
@ -1036,6 +1054,21 @@ public class ExecuteStateless extends AbstractProcessor implements Searchable {
}
}
private Collection<File> getAdditionalNarDirectories(String commaSeparatedPaths) {
if (commaSeparatedPaths == null || commaSeparatedPaths.isEmpty()) {
return Collections.emptyList();
} else {
final String[] paths = commaSeparatedPaths.split(",");
final List<File> directories = new ArrayList<>();
for (String path : paths) {
final String trimmedPath = path.trim();
if (!trimmedPath.isEmpty()) {
directories.add(new File(trimmedPath));
}
}
return directories;
}
}
private static class VersionedComponentSearchResults {
private final String term;