NIFI-1822 Removed debugging log statements for script engine queue size.

Added unit tests demonstrating pooled execution timing and thread usage.
This commit is contained in:
Andy LoPresto 2016-05-14 20:12:02 -07:00
parent bdbc4bab04
commit 9264428bd8
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
2 changed files with 54 additions and 19 deletions

View File

@ -162,7 +162,6 @@ public class ExecuteScript extends AbstractScriptProcessor {
} }
ScriptEngine scriptEngine = engineQ.poll(); ScriptEngine scriptEngine = engineQ.poll();
ProcessorLog log = getLogger(); ProcessorLog log = getLogger();
log.info("[REMOVE] After polling engine, Script Engine queue size: " + engineQ.size());
if (scriptEngine == null) { if (scriptEngine == null) {
// No engine available so nothing more to do here // No engine available so nothing more to do here
return; return;
@ -218,7 +217,6 @@ public class ExecuteScript extends AbstractScriptProcessor {
throw t; throw t;
} finally { } finally {
engineQ.offer(scriptEngine); engineQ.offer(scriptEngine);
log.info("[REMOVE] After offering engine, Script Engine queue size: " + engineQ.size());
} }
} }
} }

View File

@ -1,6 +1,7 @@
package org.apache.nifi.processors.script package org.apache.nifi.processors.script
import org.apache.nifi.util.MockFlowFile import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.StopWatch
import org.apache.nifi.util.TestRunners import org.apache.nifi.util.TestRunners
import org.junit.After import org.junit.After
import org.junit.Before import org.junit.Before
@ -11,6 +12,8 @@ import org.junit.runners.JUnit4
import org.slf4j.Logger import org.slf4j.Logger
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
import static org.junit.Assert.assertNotNull import static org.junit.Assert.assertNotNull
@RunWith(JUnit4.class) @RunWith(JUnit4.class)
@ -56,9 +59,6 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
// Override context value // Override context value
runner.processContext.maxConcurrentTasks = poolSize runner.processContext.maxConcurrentTasks = poolSize
logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}")
// Must set context properties on runner before calling setup with pool size
// executeScript.setup(poolSize)
} }
@Test @Test
@ -92,10 +92,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
runner.assertValid() runner.assertValid()
// Act // Act
ITERATIONS.times { int i -> runner.run(ITERATIONS)
logger.info("Running iteration ${i}")
runner.run()
}
// Assert // Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
@ -125,10 +122,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
runner.assertValid() runner.assertValid()
// Act // Act
ITERATIONS.times { int i -> runner.run(ITERATIONS)
logger.info("Running iteration ${i}")
runner.run()
}
// Assert // Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
@ -139,13 +133,56 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
flowFile.assertAttributeExists("time-updated") flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread") flowFile.assertAttributeExists("thread")
flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1") assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
} }
} }
//testShouldExecuteScriptWithPool @Test
//testShouldHandleFailingScript public void testPooledExecutionShouldBeFaster() throws Exception {
//testShouldHandleNoAvailableEngine // Arrange
//testPooledExecutionShouldBeFaster final int ITERATIONS = 1000
//testPoolSizeVsThreadCount final int POOL_SIZE = 4
// Act
// Run serially and capture the timing
final StopWatch stopWatch = new StopWatch(true)
runner.run(ITERATIONS)
stopWatch.stop()
final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms")
// Assert (1)
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
// Now run parallel
setupPooledExecuteScript(POOL_SIZE)
logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}")
runner.setThreadCount(POOL_SIZE)
runner.assertValid()
stopWatch.start()
runner.run(ITERATIONS)
stopWatch.stop()
final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms")
// Assert (2)
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
parallelResults.eachWithIndex { MockFlowFile flowFile, int i ->
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
}
serialResults.eachWithIndex { MockFlowFile flowFile, int i ->
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/
}
assert serialExecutionTime > parallelExecutionTime
}
} }