NIFI-1822 Enabled pooled ExecuteScript engines to facilitate concurrent processing.

Removed unused variable in unit test. (+10 squashed commits)
Squashed commits:
[7c5acc1] NIFI-1822 Removed trailing whitespace to conform with checkstyle rules.
[cb108cd] NIFI-1822 Added ASF License to unit test.
[9264428] NIFI-1822 Removed debugging log statements for script engine queue size.
Added unit tests demonstrating pooled execution timing and thread usage.
[bdbc4ba] NIFI-1822 Renamed reference to MockProcessorContext#setNumThreads to setMaxConcurrentTasks after refactor.
[12bbe82] NIFI-1822 Moved unit test to correct directory.
Added test script resource which generates flowfile and updates attribute with current thread.
Added tests for single run, serial run, and pooled run (not complete).
[4c174c8] NIFI-1822 Added debugging messages to script execution.
[8ab0ce5] NIFI-1822 Added for loop to instantiate multiple script engines in queue.
[8c5ba51] NIFI-1822 Added variable max concurrent task field in MockProcessorContext because it was previously hardcoded to 1. Changed setNumThreads to setMaxConcurrentTasks to maintain naming convention.
[fd9120c] NIFI-1822 Added unit test skeleton for pooled script processor execution.
[23e4f68] NIFI-1822: Allow concurrent execution in ExecuteScript

This closes #443.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Matt Burgess 2016-04-28 09:53:16 -04:00 committed by Andy LoPresto
parent a81c204565
commit c13fb3159f
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
7 changed files with 312 additions and 64 deletions

View File

@ -27,7 +27,6 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.nifi.attribute.expression.language.Query;
import org.apache.nifi.attribute.expression.language.Query.Range;
import org.apache.nifi.components.ConfigurableComponent;
@ -55,7 +54,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
private boolean allowExpressionValidation = true;
private volatile boolean incomingConnection = true;
private volatile boolean nonLoopConnection = true;
private int numThreads = 1;
private int maxConcurrentTasks = 1;
private volatile Set<Relationship> connections = new HashSet<>();
private volatile Set<Relationship> unavailableRelationships = new HashSet<>();
@ -175,7 +174,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
@Override
public int getMaxConcurrentTasks() {
return numThreads;
return maxConcurrentTasks;
}
public void setAnnotationData(final String annotationData) {
@ -362,7 +361,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S
return "";
}
protected void setNumThreads(int numThreads) {
this.numThreads = numThreads;
protected void setMaxConcurrentTasks(int maxConcurrentTasks) {
this.maxConcurrentTasks = maxConcurrentTasks;
}
}

View File

@ -43,7 +43,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnAdded;
import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored;
@ -436,7 +435,7 @@ public class StandardProcessorTestRunner implements TestRunner {
@Override
public void enqueue(final String data) {
enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.<String, String> emptyMap());
enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
}
@Override
@ -565,7 +564,7 @@ public class StandardProcessorTestRunner implements TestRunner {
}
this.numThreads = threadCount;
this.context.setNumThreads(threadCount);
this.context.setMaxConcurrentTasks(threadCount);
}
@Override

View File

@ -16,21 +16,6 @@
*/
package org.apache.nifi.processors.script;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.File;
import java.net.MalformedURLException;
import java.net.URL;
@ -48,8 +33,25 @@ import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.script.ScriptEngine;
import javax.script.ScriptEngineFactory;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
/**
* This class contains variables and methods common to scripting processors
@ -102,7 +104,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
protected String scriptBody;
protected String[] modules;
protected List<PropertyDescriptor> descriptors;
protected ScriptEngine scriptEngine;
protected BlockingQueue<ScriptEngine> engineQ = null;
/**
* Custom validation for ensuring exactly one of Script File or Script Body is populated
@ -197,7 +200,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
* Performs common setup operations when the processor is scheduled to run. This method assumes the member
* variables associated with properties have been filled.
*/
public void setup() {
public void setup(int numberOfScriptEngines) {
if (scriptEngineConfiguratorMap.isEmpty()) {
ServiceLoader<ScriptEngineConfigurator> configuratorServiceLoader =
@ -206,7 +209,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator);
}
}
setupEngine();
setupEngines(numberOfScriptEngines);
}
/**
@ -216,11 +219,16 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
*
* @see org.apache.nifi.processors.script.ScriptEngineConfigurator
*/
protected void setupEngine() {
protected void setupEngines(int numberOfScriptEngines) {
engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines);
ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader();
try {
ProcessorLog log = getLogger();
if (StringUtils.isBlank(scriptEngineName)) {
throw new IllegalArgumentException("The script engine name cannot be null");
}
ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase());
// Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs
@ -248,18 +256,21 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader);
}
scriptEngine = createScriptEngine();
try {
if (configurator != null) {
configurator.init(scriptEngine, modules);
}
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
if (log.isDebugEnabled()) {
log.error("Error initializing script engine configurator", se);
for (int i = 0; i < numberOfScriptEngines; i++) {
ScriptEngine scriptEngine = createScriptEngine();
try {
if (configurator != null) {
configurator.init(scriptEngine, modules);
}
engineQ.offer(scriptEngine);
} catch (ScriptException se) {
log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName});
if (log.isDebugEnabled()) {
log.error("Error initializing script engine configurator", se);
}
}
}
} finally {
// Restore original context class loader
Thread.currentThread().setContextClassLoader(originalContextClassLoader);
@ -300,4 +311,11 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc
return new URLClassLoader(modules, thisClassLoader);
}
@OnStopped
public void stop() {
if (engineQ != null) {
engineQ.clear();
}
}
}

View File

@ -16,15 +16,25 @@
*/
package org.apache.nifi.processors.script;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.TriggerSerially;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ProcessorLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
@ -33,19 +43,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StringUtils;
import javax.script.Bindings;
import javax.script.ScriptContext;
import javax.script.ScriptException;
import javax.script.SimpleBindings;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@TriggerSerially
@Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
@CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for "
+ "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by "
@ -128,7 +125,9 @@ public class ExecuteScript extends AbstractScriptProcessor {
} else {
modules = new String[0];
}
super.setup();
// Create a script engine for each possible task
int maxTasks = context.getMaxConcurrentTasks();
super.setup(maxTasks);
scriptToRun = scriptBody;
try {
@ -161,7 +160,12 @@ public class ExecuteScript extends AbstractScriptProcessor {
createResources();
}
}
ScriptEngine scriptEngine = engineQ.poll();
ProcessorLog log = getLogger();
if (scriptEngine == null) {
// No engine available so nothing more to do here
return;
}
ProcessSession session = sessionFactory.createSession();
try {
@ -211,11 +215,8 @@ public class ExecuteScript extends AbstractScriptProcessor {
getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
session.rollback(true);
throw t;
} finally {
engineQ.offer(scriptEngine);
}
}
@OnStopped
public void stop() {
scriptEngine = null;
}
}

View File

@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.script.Invocable;
import javax.script.ScriptEngine;
import javax.script.ScriptException;
import org.apache.commons.io.IOUtils;
@ -67,6 +68,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
private ScriptEngine scriptEngine = null;
/**
* Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script
* processor has defined additional relationships, those will be added as well.
@ -174,9 +177,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
setup();
}
@Override
public void setup() {
super.setup();
// Create a single script engine, the Processor object is reused by each task
super.setup(1);
scriptEngine = engineQ.poll();
if (scriptEngine == null) {
throw new ProcessException("No script engine available!");
}
if (scriptNeedsReload.get() || processor.get() == null) {
if (isFile(scriptPath)) {
reloadScriptFile(scriptPath);
@ -386,7 +394,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
Collection<ValidationResult> commonValidationResults = super.customValidate(context);
if(!commonValidationResults.isEmpty()) {
if (!commonValidationResults.isEmpty()) {
return commonValidationResults;
}
@ -486,6 +494,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor {
@OnStopped
public void stop() {
super.stop();
processor.set(null);
}
}

View File

@ -0,0 +1,200 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.script
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.StopWatch
import org.apache.nifi.util.TestRunners
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.util.concurrent.TimeUnit
import static org.junit.Assert.assertNotNull
@RunWith(JUnit4.class)
class ExecuteScriptGroovyTest extends BaseScriptTest {
private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class)
@BeforeClass
public static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
public void setUp() throws Exception {
super.setupExecuteScript()
runner.setValidateExpressionUsage(false)
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy")
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy")
}
@After
public void tearDown() throws Exception {
}
private void setupPooledExecuteScript(int poolSize = 2) {
final ExecuteScript executeScript = new ExecuteScript()
// Need to do something to initialize the properties, like retrieve the list of properties
assertNotNull(executeScript.getSupportedPropertyDescriptors())
runner = TestRunners.newTestRunner(executeScript)
runner.setValidateExpressionUsage(false)
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy")
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy")
// Override context value
runner.processContext.maxConcurrentTasks = poolSize
logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}")
}
@Test
public void testShouldExecuteScript() throws Exception {
// Arrange
final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/
logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}")
runner.assertValid()
// Act
runner.run()
// Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1)
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
MockFlowFile flowFile = result.get(0)
logger.info("Resulting flowfile attributes: ${flowFile.attributes}")
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN
}
@Test
public void testShouldExecuteScriptSerially() throws Exception {
// Arrange
final int ITERATIONS = 10
logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}")
runner.assertValid()
// Act
runner.run(ITERATIONS)
// Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
result.eachWithIndex { MockFlowFile flowFile, int i ->
logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}")
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/
}
}
@Test
public void testShouldExecuteScriptWithPool() throws Exception {
// Arrange
final int ITERATIONS = 10
final int POOL_SIZE = 2
setupPooledExecuteScript(POOL_SIZE)
logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}")
runner.setThreadCount(POOL_SIZE)
logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}")
runner.assertValid()
// Act
runner.run(ITERATIONS)
// Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
result.eachWithIndex { MockFlowFile flowFile, int i ->
logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}")
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
}
}
@Test
public void testPooledExecutionShouldBeFaster() throws Exception {
// Arrange
final int ITERATIONS = 1000
final int POOL_SIZE = 4
// Act
// Run serially and capture the timing
final StopWatch stopWatch = new StopWatch(true)
runner.run(ITERATIONS)
stopWatch.stop()
final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms")
// Assert (1)
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
// Now run parallel
setupPooledExecuteScript(POOL_SIZE)
logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}")
runner.setThreadCount(POOL_SIZE)
runner.assertValid()
stopWatch.start()
runner.run(ITERATIONS)
stopWatch.stop()
final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS)
logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms")
// Assert (2)
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
parallelResults.eachWithIndex { MockFlowFile flowFile, int i ->
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/
}
serialResults.eachWithIndex { MockFlowFile flowFile, int i ->
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/
}
assert serialExecutionTime > parallelExecutionTime
}
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.create()
flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString())
flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName())
session.transfer(flowFile, REL_SUCCESS)
// TODO: Way to add the pool "name" or script engine identifier to the flowfile?