diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index ac450bd787..f9fe2e3b3e 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -162,7 +162,6 @@ public class ExecuteScript extends AbstractScriptProcessor { } ScriptEngine scriptEngine = engineQ.poll(); ProcessorLog log = getLogger(); - log.info("[REMOVE] After polling engine, Script Engine queue size: " + engineQ.size()); if (scriptEngine == null) { // No engine available so nothing more to do here return; @@ -218,7 +217,6 @@ public class ExecuteScript extends AbstractScriptProcessor { throw t; } finally { engineQ.offer(scriptEngine); - log.info("[REMOVE] After offering engine, Script Engine queue size: " + engineQ.size()); } } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy index 4a9c3c9849..fc1c445080 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -1,6 +1,7 @@ package org.apache.nifi.processors.script import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.StopWatch import org.apache.nifi.util.TestRunners import org.junit.After import org.junit.Before @@ -11,6 +12,8 @@ import org.junit.runners.JUnit4 import org.slf4j.Logger import org.slf4j.LoggerFactory +import java.util.concurrent.TimeUnit + import static org.junit.Assert.assertNotNull @RunWith(JUnit4.class) @@ -56,9 +59,6 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { // Override context value runner.processContext.maxConcurrentTasks = poolSize logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") - - // Must set context properties on runner before calling setup with pool size -// executeScript.setup(poolSize) } @Test @@ -92,10 +92,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { runner.assertValid() // Act - ITERATIONS.times { int i -> - logger.info("Running iteration ${i}") - runner.run() - } + runner.run(ITERATIONS) // Assert runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) @@ -125,10 +122,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { runner.assertValid() // Act - ITERATIONS.times { int i -> - logger.info("Running iteration ${i}") - runner.run() - } + runner.run(ITERATIONS) // Assert runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) @@ -139,13 +133,56 @@ class ExecuteScriptGroovyTest extends BaseScriptTest { flowFile.assertAttributeExists("time-updated") flowFile.assertAttributeExists("thread") - flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ } } - //testShouldExecuteScriptWithPool - //testShouldHandleFailingScript - //testShouldHandleNoAvailableEngine - //testPooledExecutionShouldBeFaster - //testPoolSizeVsThreadCount + @Test + public void testPooledExecutionShouldBeFaster() throws Exception { + // Arrange + final int ITERATIONS = 1000 + final int POOL_SIZE = 4 + + // Act + // Run serially and capture the timing + final StopWatch stopWatch = new StopWatch(true) + runner.run(ITERATIONS) + stopWatch.stop() + final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms") + + // Assert (1) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + // Now run parallel + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + runner.setThreadCount(POOL_SIZE) + runner.assertValid() + + stopWatch.start() + runner.run(ITERATIONS) + stopWatch.stop() + final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms") + + // Assert (2) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + parallelResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ + } + + serialResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + + assert serialExecutionTime > parallelExecutionTime + } }