mirror of https://github.com/apache/nifi.git
NIFI-1822 Removed debugging log statements for script engine queue size.
Added unit tests demonstrating pooled execution timing and thread usage.
This commit is contained in:
parent
bdbc4bab04
commit
9264428bd8
|
@ -162,7 +162,6 @@ public class ExecuteScript extends AbstractScriptProcessor {
|
||||||
}
|
}
|
||||||
ScriptEngine scriptEngine = engineQ.poll();
|
ScriptEngine scriptEngine = engineQ.poll();
|
||||||
ProcessorLog log = getLogger();
|
ProcessorLog log = getLogger();
|
||||||
log.info("[REMOVE] After polling engine, Script Engine queue size: " + engineQ.size());
|
|
||||||
if (scriptEngine == null) {
|
if (scriptEngine == null) {
|
||||||
// No engine available so nothing more to do here
|
// No engine available so nothing more to do here
|
||||||
return;
|
return;
|
||||||
|
@ -218,7 +217,6 @@ public class ExecuteScript extends AbstractScriptProcessor {
|
||||||
throw t;
|
throw t;
|
||||||
} finally {
|
} finally {
|
||||||
engineQ.offer(scriptEngine);
|
engineQ.offer(scriptEngine);
|
||||||
log.info("[REMOVE] After offering engine, Script Engine queue size: " + engineQ.size());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
package org.apache.nifi.processors.script
|
package org.apache.nifi.processors.script
|
||||||
|
|
||||||
import org.apache.nifi.util.MockFlowFile
|
import org.apache.nifi.util.MockFlowFile
|
||||||
|
import org.apache.nifi.util.StopWatch
|
||||||
import org.apache.nifi.util.TestRunners
|
import org.apache.nifi.util.TestRunners
|
||||||
import org.junit.After
|
import org.junit.After
|
||||||
import org.junit.Before
|
import org.junit.Before
|
||||||
|
@ -11,6 +12,8 @@ import org.junit.runners.JUnit4
|
||||||
import org.slf4j.Logger
|
import org.slf4j.Logger
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotNull
|
import static org.junit.Assert.assertNotNull
|
||||||
|
|
||||||
@RunWith(JUnit4.class)
|
@RunWith(JUnit4.class)
|
||||||
|
@ -56,9 +59,6 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
||||||
// Override context value
|
// Override context value
|
||||||
runner.processContext.maxConcurrentTasks = poolSize
|
runner.processContext.maxConcurrentTasks = poolSize
|
||||||
logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}")
|
logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}")
|
||||||
|
|
||||||
// Must set context properties on runner before calling setup with pool size
|
|
||||||
// executeScript.setup(poolSize)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -92,10 +92,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
||||||
runner.assertValid()
|
runner.assertValid()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
ITERATIONS.times { int i ->
|
runner.run(ITERATIONS)
|
||||||
logger.info("Running iteration ${i}")
|
|
||||||
runner.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||||
|
@ -125,10 +122,7 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
||||||
runner.assertValid()
|
runner.assertValid()
|
||||||
|
|
||||||
// Act
|
// Act
|
||||||
ITERATIONS.times { int i ->
|
runner.run(ITERATIONS)
|
||||||
logger.info("Running iteration ${i}")
|
|
||||||
runner.run()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assert
|
// Assert
|
||||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||||
|
@ -139,13 +133,56 @@ class ExecuteScriptGroovyTest extends BaseScriptTest {
|
||||||
|
|
||||||
flowFile.assertAttributeExists("time-updated")
|
flowFile.assertAttributeExists("time-updated")
|
||||||
flowFile.assertAttributeExists("thread")
|
flowFile.assertAttributeExists("thread")
|
||||||
flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1")
|
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//testShouldExecuteScriptWithPool
|
@Test
|
||||||
//testShouldHandleFailingScript
|
public void testPooledExecutionShouldBeFaster() throws Exception {
|
||||||
//testShouldHandleNoAvailableEngine
|
// Arrange
|
||||||
//testPooledExecutionShouldBeFaster
|
final int ITERATIONS = 1000
|
||||||
//testPoolSizeVsThreadCount
|
final int POOL_SIZE = 4
|
||||||
|
|
||||||
|
// Act
|
||||||
|
// Run serially and capture the timing
|
||||||
|
final StopWatch stopWatch = new StopWatch(true)
|
||||||
|
runner.run(ITERATIONS)
|
||||||
|
stopWatch.stop()
|
||||||
|
final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
|
||||||
|
logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms")
|
||||||
|
|
||||||
|
// Assert (1)
|
||||||
|
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||||
|
final List<MockFlowFile> serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
|
||||||
|
|
||||||
|
// Now run parallel
|
||||||
|
setupPooledExecuteScript(POOL_SIZE)
|
||||||
|
logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}")
|
||||||
|
runner.setThreadCount(POOL_SIZE)
|
||||||
|
runner.assertValid()
|
||||||
|
|
||||||
|
stopWatch.start()
|
||||||
|
runner.run(ITERATIONS)
|
||||||
|
stopWatch.stop()
|
||||||
|
final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
|
||||||
|
logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms")
|
||||||
|
|
||||||
|
// Assert (2)
|
||||||
|
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
|
||||||
|
final List<MockFlowFile> parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
|
||||||
|
|
||||||
|
parallelResults.eachWithIndex { MockFlowFile flowFile, int i ->
|
||||||
|
flowFile.assertAttributeExists("time-updated")
|
||||||
|
flowFile.assertAttributeExists("thread")
|
||||||
|
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
|
||||||
|
}
|
||||||
|
|
||||||
|
serialResults.eachWithIndex { MockFlowFile flowFile, int i ->
|
||||||
|
flowFile.assertAttributeExists("time-updated")
|
||||||
|
flowFile.assertAttributeExists("thread")
|
||||||
|
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/
|
||||||
|
}
|
||||||
|
|
||||||
|
assert serialExecutionTime > parallelExecutionTime
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue