mirror of https://github.com/apache/nifi.git
NIFI-1822 Removed debugging log statements for script engine queue size.
Added unit tests demonstrating pooled execution timing and thread usage.
This commit is contained in:
parent
bdbc4bab04
commit
9264428bd8
|
@ -162,7 +162,6 @@ public class ExecuteScript extends AbstractScriptProcessor {
|
|||
}
|
||||
ScriptEngine scriptEngine = engineQ.poll();
|
||||
ProcessorLog log = getLogger();
|
||||
log.info("[REMOVE] After polling engine, Script Engine queue size: " + engineQ.size());
|
||||
if (scriptEngine == null) {
|
||||
// No engine available so nothing more to do here
|
||||
return;
|
||||
|
@ -218,7 +217,6 @@ public class ExecuteScript extends AbstractScriptProcessor {
|
|||
throw t;
|
||||
} finally {
|
||||
engineQ.offer(scriptEngine);
|
||||
log.info("[REMOVE] After offering engine, Script Engine queue size: " + engineQ.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package org.apache.nifi.processors.script
|
||||
|
||||
import org.apache.nifi.util.MockFlowFile
|
||||
import org.apache.nifi.util.StopWatch
|
||||
import org.apache.nifi.util.TestRunners
|
||||
import org.junit.After
|
||||
import org.junit.Before
|
||||
|
@ -11,6 +12,8 @@ import org.junit.runners.JUnit4
|
|||
import org.slf4j.Logger
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import static org.junit.Assert.assertNotNull
|
||||
|
||||
@RunWith(JUnit4.class)
|
||||
|
@ -56,9 +59,6 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
|||
// Override context value
|
||||
runner.processContext.maxConcurrentTasks = poolSize
|
||||
logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}")
|
||||
|
||||
// Must set context properties on runner before calling setup with pool size
|
||||
// executeScript.setup(poolSize)
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -92,10 +92,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
|||
runner.assertValid()
|
||||
|
||||
// Act
|
||||
ITERATIONS.times { int i ->
|
||||
logger.info("Running iteration ${i}")
|
||||
runner.run()
|
||||
}
|
||||
runner.run(ITERATIONS)
|
||||
|
||||
// Assert
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||
|
@ -125,10 +122,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
|||
runner.assertValid()
|
||||
|
||||
// Act
|
||||
ITERATIONS.times { int i ->
|
||||
logger.info("Running iteration ${i}")
|
||||
runner.run()
|
||||
}
|
||||
runner.run(ITERATIONS)
|
||||
|
||||
// Assert
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||
|
@ -139,13 +133,56 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
|||
|
||||
flowFile.assertAttributeExists("time-updated")
|
||||
flowFile.assertAttributeExists("thread")
|
||||
flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1")
|
||||
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
|
||||
}
|
||||
}
|
||||
|
||||
//testShouldExecuteScriptWithPool
|
||||
//testShouldHandleFailingScript
|
||||
//testShouldHandleNoAvailableEngine
|
||||
//testPooledExecutionShouldBeFaster
|
||||
//testPoolSizeVsThreadCount
|
||||
@Test
|
||||
public void testPooledExecutionShouldBeFaster() throws Exception {
|
||||
// Arrange
|
||||
final int ITERATIONS = 1000
|
||||
final int POOL_SIZE = 4
|
||||
|
||||
// Act
|
||||
// Run serially and capture the timing
|
||||
final StopWatch stopWatch = new StopWatch(true)
|
||||
runner.run(ITERATIONS)
|
||||
stopWatch.stop()
|
||||
final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
|
||||
logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms")
|
||||
|
||||
// Assert (1)
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||
final List<MockFlowFile> serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
|
||||
|
||||
// Now run parallel
|
||||
setupPooledExecuteScript(POOL_SIZE)
|
||||
logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}")
|
||||
runner.setThreadCount(POOL_SIZE)
|
||||
runner.assertValid()
|
||||
|
||||
stopWatch.start()
|
||||
runner.run(ITERATIONS)
|
||||
stopWatch.stop()
|
||||
final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
|
||||
logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms")
|
||||
|
||||
// Assert (2)
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||
final List<MockFlowFile> parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
|
||||
|
||||
parallelResults.eachWithIndex { MockFlowFile flowFile, int i ->
|
||||
flowFile.assertAttributeExists("time-updated")
|
||||
flowFile.assertAttributeExists("thread")
|
||||
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
|
||||
}
|
||||
|
||||
serialResults.eachWithIndex { MockFlowFile flowFile, int i ->
|
||||
flowFile.assertAttributeExists("time-updated")
|
||||
flowFile.assertAttributeExists("thread")
|
||||
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/
|
||||
}
|
||||
|
||||
assert serialExecutionTime > parallelExecutionTime
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue