From c13fb3159f59778e78769a1ad3a9f7131cfb7d96 Mon Sep 17 00:00:00 2001 From: Matt Burgess Date: Thu, 28 Apr 2016 09:53:16 -0400 Subject: [PATCH] NIFI-1822 Enabled pooled ExecuteScript engines to facilitate concurrent processing. Removed unused variable in unit test. (+10 squashed commits) Squashed commits: [7c5acc1] NIFI-1822 Removed trailing whitespace to conform with checkstyle rules. [cb108cd] NIFI-1822 Added ASF License to unit test. [9264428] NIFI-1822 Removed debugging log statements for script engine queue size. Added unit tests demonstrating pooled execution timing and thread usage. [bdbc4ba] NIFI-1822 Renamed reference to MockProcessorContext#setNumThreads to setMaxConcurrentTasks after refactor. [12bbe82] NIFI-1822 Moved unit test to correct directory. Added test script resource which generates flowfile and updates attribute with current thread. Added tests for single run, serial run, and pooled run (not complete). [4c174c8] NIFI-1822 Added debugging messages to script execution. [8ab0ce5] NIFI-1822 Added for loop to instantiate multiple script engines in queue. [8c5ba51] NIFI-1822 Added variable max concurrent task field in MockProcessorContext because it was previously hardcoded to 1. Changed setNumThreads to setMaxConcurrentTasks to maintain naming convention. [fd9120c] NIFI-1822 Added unit test skeleton for pooled script processor execution. [23e4f68] NIFI-1822: Allow concurrent execution in ExecuteScript This closes #443. Signed-off-by: Andy LoPresto --- .../apache/nifi/util/MockProcessContext.java | 9 +- .../util/StandardProcessorTestRunner.java | 5 +- .../script/AbstractScriptProcessor.java | 76 ++++--- .../nifi/processors/script/ExecuteScript.java | 49 ++--- .../script/InvokeScriptedProcessor.java | 15 +- .../script/ExecuteScriptGroovyTest.groovy | 200 ++++++++++++++++++ .../testAddTimeAndThreadAttribute.groovy | 22 ++ 7 files changed, 312 insertions(+), 64 deletions(-) create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index b6272ee096..81429e7f7c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; - import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.ConfigurableComponent; @@ -55,7 +54,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean allowExpressionValidation = true; private volatile boolean incomingConnection = true; private volatile boolean nonLoopConnection = true; - private int numThreads = 1; + private int maxConcurrentTasks = 1; private volatile Set connections = new HashSet<>(); private volatile Set unavailableRelationships = new HashSet<>(); @@ -175,7 +174,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S @Override public int getMaxConcurrentTasks() { - return numThreads; + return maxConcurrentTasks; } public void setAnnotationData(final String annotationData) { @@ -362,7 +361,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S return ""; } - protected void setNumThreads(int numThreads) { - this.numThreads = numThreads; + protected void setMaxConcurrentTasks(int maxConcurrentTasks) { + this.maxConcurrentTasks = maxConcurrentTasks; } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 2f62ac214c..edc0e65c26 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -43,7 +43,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -436,7 +435,7 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void enqueue(final String data) { - enqueue(data.getBytes(StandardCharsets.UTF_8), Collections. emptyMap()); + enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap()); } @Override @@ -565,7 +564,7 @@ public class StandardProcessorTestRunner implements TestRunner { } this.numThreads = threadCount; - this.context.setNumThreads(threadCount); + this.context.setMaxConcurrentTasks(threadCount); } @Override diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java index b7e73bc950..a34c83c0d1 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java @@ -16,21 +16,6 @@ */ package org.apache.nifi.processors.script; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; import java.io.File; import java.net.MalformedURLException; import java.net.URL; @@ -48,8 +33,25 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineFactory; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; /** * This class contains variables and methods common to scripting processors @@ -102,7 +104,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc protected String scriptBody; protected String[] modules; protected List descriptors; - protected ScriptEngine scriptEngine; + + protected BlockingQueue engineQ = null; /** * Custom validation for ensuring exactly one of Script File or Script Body is populated @@ -197,7 +200,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * Performs common setup operations when the processor is scheduled to run. This method assumes the member * variables associated with properties have been filled. */ - public void setup() { + public void setup(int numberOfScriptEngines) { if (scriptEngineConfiguratorMap.isEmpty()) { ServiceLoader configuratorServiceLoader = @@ -206,7 +209,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); } } - setupEngine(); + setupEngines(numberOfScriptEngines); } /** @@ -216,11 +219,16 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * * @see org.apache.nifi.processors.script.ScriptEngineConfigurator */ - protected void setupEngine() { + protected void setupEngines(int numberOfScriptEngines) { + engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); try { ProcessorLog log = getLogger(); + if (StringUtils.isBlank(scriptEngineName)) { + throw new IllegalArgumentException("The script engine name cannot be null"); + } + ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs @@ -248,18 +256,21 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); } - scriptEngine = createScriptEngine(); - try { - if (configurator != null) { - configurator.init(scriptEngine, modules); - } - } catch (ScriptException se) { - log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); - if (log.isDebugEnabled()) { - log.error("Error initializing script engine configurator", se); + for (int i = 0; i < numberOfScriptEngines; i++) { + ScriptEngine scriptEngine = createScriptEngine(); + try { + if (configurator != null) { + configurator.init(scriptEngine, modules); + } + engineQ.offer(scriptEngine); + + } catch (ScriptException se) { + log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); + if (log.isDebugEnabled()) { + log.error("Error initializing script engine configurator", se); + } } } - } finally { // Restore original context class loader Thread.currentThread().setContextClassLoader(originalContextClassLoader); @@ -300,4 +311,11 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc return new URLClassLoader(modules, thisClassLoader); } + + @OnStopped + public void stop() { + if (engineQ != null) { + engineQ.clear(); + } + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 3f7c202d51..f9fe2e3b3e 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -16,15 +16,25 @@ */ package org.apache.nifi.processors.script; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.script.Bindings; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import javax.script.SimpleBindings; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; @@ -33,19 +43,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; -import javax.script.Bindings; -import javax.script.ScriptContext; -import javax.script.ScriptException; -import javax.script.SimpleBindings; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@TriggerSerially @Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"}) @CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for " + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by " @@ -128,7 +125,9 @@ public class ExecuteScript extends AbstractScriptProcessor { } else { modules = new String[0]; } - super.setup(); + // Create a script engine for each possible task + int maxTasks = context.getMaxConcurrentTasks(); + super.setup(maxTasks); scriptToRun = scriptBody; try { @@ -161,7 +160,12 @@ public class ExecuteScript extends AbstractScriptProcessor { createResources(); } } + ScriptEngine scriptEngine = engineQ.poll(); ProcessorLog log = getLogger(); + if (scriptEngine == null) { + // No engine available so nothing more to do here + return; + } ProcessSession session = sessionFactory.createSession(); try { @@ -211,11 +215,8 @@ public class ExecuteScript extends AbstractScriptProcessor { getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); session.rollback(true); throw t; + } finally { + engineQ.offer(scriptEngine); } } - - @OnStopped - public void stop() { - scriptEngine = null; - } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 2f287b6103..06c0a7f18b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.script.Invocable; +import javax.script.ScriptEngine; import javax.script.ScriptException; import org.apache.commons.io.IOUtils; @@ -67,6 +68,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); + private ScriptEngine scriptEngine = null; + /** * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script * processor has defined additional relationships, those will be added as well. @@ -174,9 +177,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { setup(); } - @Override public void setup() { - super.setup(); + // Create a single script engine, the Processor object is reused by each task + super.setup(1); + scriptEngine = engineQ.poll(); + if (scriptEngine == null) { + throw new ProcessException("No script engine available!"); + } + if (scriptNeedsReload.get() || processor.get() == null) { if (isFile(scriptPath)) { reloadScriptFile(scriptPath); @@ -386,7 +394,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { protected Collection customValidate(final ValidationContext context) { Collection commonValidationResults = super.customValidate(context); - if(!commonValidationResults.isEmpty()) { + if (!commonValidationResults.isEmpty()) { return commonValidationResults; } @@ -486,6 +494,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { @OnStopped public void stop() { + super.stop(); processor.set(null); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy new file mode 100644 index 0000000000..6dbc5e1008 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.script + +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.StopWatch +import org.apache.nifi.util.TestRunners +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.TimeUnit + +import static org.junit.Assert.assertNotNull + +@RunWith(JUnit4.class) +class ExecuteScriptGroovyTest extends BaseScriptTest { + private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) + + @BeforeClass + public static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + public void setUp() throws Exception { + super.setupExecuteScript() + + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + } + + @After + public void tearDown() throws Exception { + + } + + private void setupPooledExecuteScript(int poolSize = 2) { + final ExecuteScript executeScript = new ExecuteScript() + // Need to do something to initialize the properties, like retrieve the list of properties + assertNotNull(executeScript.getSupportedPropertyDescriptors()) + runner = TestRunners.newTestRunner(executeScript) + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + + // Override context value + runner.processContext.maxConcurrentTasks = poolSize + logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") + } + + @Test + public void testShouldExecuteScript() throws Exception { + // Arrange + final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/ + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + MockFlowFile flowFile = result.get(0) + logger.info("Resulting flowfile attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN + } + + @Test + public void testShouldExecuteScriptSerially() throws Exception { + // Arrange + final int ITERATIONS = 10 + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run(ITERATIONS) + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + } + + @Test + public void testShouldExecuteScriptWithPool() throws Exception { + // Arrange + final int ITERATIONS = 10 + final int POOL_SIZE = 2 + + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + + runner.setThreadCount(POOL_SIZE) + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run(ITERATIONS) + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ + } + } + + @Test + public void testPooledExecutionShouldBeFaster() throws Exception { + // Arrange + final int ITERATIONS = 1000 + final int POOL_SIZE = 4 + + // Act + // Run serially and capture the timing + final StopWatch stopWatch = new StopWatch(true) + runner.run(ITERATIONS) + stopWatch.stop() + final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms") + + // Assert (1) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + // Now run parallel + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + runner.setThreadCount(POOL_SIZE) + runner.assertValid() + + stopWatch.start() + runner.run(ITERATIONS) + stopWatch.stop() + final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms") + + // Assert (2) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + parallelResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ + } + + serialResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + + assert serialExecutionTime > parallelExecutionTime + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy new file mode 100644 index 0000000000..a9e599cacb --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +def flowFile = session.create() +flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString()) +flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName()) +session.transfer(flowFile, REL_SUCCESS) + +// TODO: Way to add the pool "name" or script engine identifier to the flowfile? \ No newline at end of file