diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java index b6272ee096..81429e7f7c 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockProcessContext.java @@ -27,7 +27,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; - import org.apache.nifi.attribute.expression.language.Query; import org.apache.nifi.attribute.expression.language.Query.Range; import org.apache.nifi.components.ConfigurableComponent; @@ -55,7 +54,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S private boolean allowExpressionValidation = true; private volatile boolean incomingConnection = true; private volatile boolean nonLoopConnection = true; - private int numThreads = 1; + private int maxConcurrentTasks = 1; private volatile Set connections = new HashSet<>(); private volatile Set unavailableRelationships = new HashSet<>(); @@ -175,7 +174,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S @Override public int getMaxConcurrentTasks() { - return numThreads; + return maxConcurrentTasks; } public void setAnnotationData(final String annotationData) { @@ -362,7 +361,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements S return ""; } - protected void setNumThreads(int numThreads) { - this.numThreads = numThreads; + protected void setMaxConcurrentTasks(int maxConcurrentTasks) { + this.maxConcurrentTasks = maxConcurrentTasks; } } diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java index 2f62ac214c..edc0e65c26 100644 --- a/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java +++ b/nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java @@ -43,7 +43,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; - import org.apache.nifi.annotation.behavior.TriggerSerially; import org.apache.nifi.annotation.lifecycle.OnAdded; import org.apache.nifi.annotation.lifecycle.OnConfigurationRestored; @@ -436,7 +435,7 @@ public class StandardProcessorTestRunner implements TestRunner { @Override public void enqueue(final String data) { - enqueue(data.getBytes(StandardCharsets.UTF_8), Collections. emptyMap()); + enqueue(data.getBytes(StandardCharsets.UTF_8), Collections.emptyMap()); } @Override @@ -565,7 +564,7 @@ public class StandardProcessorTestRunner implements TestRunner { } this.numThreads = threadCount; - this.context.setNumThreads(threadCount); + this.context.setMaxConcurrentTasks(threadCount); } @Override diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java index b7e73bc950..a34c83c0d1 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/AbstractScriptProcessor.java @@ -16,21 +16,6 @@ */ package org.apache.nifi.processors.script; -import org.apache.nifi.components.AllowableValue; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.components.ValidationContext; -import org.apache.nifi.components.ValidationResult; -import org.apache.nifi.components.Validator; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.processor.AbstractSessionFactoryProcessor; -import org.apache.nifi.processor.Relationship; -import org.apache.nifi.processor.util.StandardValidators; -import org.apache.nifi.util.StringUtils; - -import javax.script.ScriptEngine; -import javax.script.ScriptEngineFactory; -import javax.script.ScriptEngineManager; -import javax.script.ScriptException; import java.io.File; import java.net.MalformedURLException; import java.net.URL; @@ -48,8 +33,25 @@ import java.util.List; import java.util.Map; import java.util.ServiceLoader; import java.util.Set; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import javax.script.ScriptEngine; +import javax.script.ScriptEngineFactory; +import javax.script.ScriptEngineManager; +import javax.script.ScriptException; +import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.components.Validator; +import org.apache.nifi.logging.ProcessorLog; +import org.apache.nifi.processor.AbstractSessionFactoryProcessor; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.util.StringUtils; /** * This class contains variables and methods common to scripting processors @@ -102,7 +104,8 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc protected String scriptBody; protected String[] modules; protected List descriptors; - protected ScriptEngine scriptEngine; + + protected BlockingQueue engineQ = null; /** * Custom validation for ensuring exactly one of Script File or Script Body is populated @@ -197,7 +200,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * Performs common setup operations when the processor is scheduled to run. This method assumes the member * variables associated with properties have been filled. */ - public void setup() { + public void setup(int numberOfScriptEngines) { if (scriptEngineConfiguratorMap.isEmpty()) { ServiceLoader configuratorServiceLoader = @@ -206,7 +209,7 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc scriptEngineConfiguratorMap.put(configurator.getScriptEngineName().toLowerCase(), configurator); } } - setupEngine(); + setupEngines(numberOfScriptEngines); } /** @@ -216,11 +219,16 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc * * @see org.apache.nifi.processors.script.ScriptEngineConfigurator */ - protected void setupEngine() { + protected void setupEngines(int numberOfScriptEngines) { + engineQ = new LinkedBlockingQueue<>(numberOfScriptEngines); ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); try { ProcessorLog log = getLogger(); + if (StringUtils.isBlank(scriptEngineName)) { + throw new IllegalArgumentException("The script engine name cannot be null"); + } + ScriptEngineConfigurator configurator = scriptEngineConfiguratorMap.get(scriptEngineName.toLowerCase()); // Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs @@ -248,18 +256,21 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc Thread.currentThread().setContextClassLoader(scriptEngineModuleClassLoader); } - scriptEngine = createScriptEngine(); - try { - if (configurator != null) { - configurator.init(scriptEngine, modules); - } - } catch (ScriptException se) { - log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); - if (log.isDebugEnabled()) { - log.error("Error initializing script engine configurator", se); + for (int i = 0; i < numberOfScriptEngines; i++) { + ScriptEngine scriptEngine = createScriptEngine(); + try { + if (configurator != null) { + configurator.init(scriptEngine, modules); + } + engineQ.offer(scriptEngine); + + } catch (ScriptException se) { + log.error("Error initializing script engine configurator {}", new Object[]{scriptEngineName}); + if (log.isDebugEnabled()) { + log.error("Error initializing script engine configurator", se); + } } } - } finally { // Restore original context class loader Thread.currentThread().setContextClassLoader(originalContextClassLoader); @@ -300,4 +311,11 @@ public abstract class AbstractScriptProcessor extends AbstractSessionFactoryProc return new URLClassLoader(modules, thisClassLoader); } + + @OnStopped + public void stop() { + if (engineQ != null) { + engineQ.clear(); + } + } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java index 3f7c202d51..f9fe2e3b3e 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/ExecuteScript.java @@ -16,15 +16,25 @@ */ package org.apache.nifi.processors.script; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import javax.script.Bindings; +import javax.script.ScriptContext; +import javax.script.ScriptEngine; +import javax.script.ScriptException; +import javax.script.SimpleBindings; import org.apache.commons.io.IOUtils; import org.apache.nifi.annotation.behavior.DynamicProperty; -import org.apache.nifi.annotation.behavior.TriggerSerially; -import org.apache.nifi.annotation.lifecycle.OnStopped; -import org.apache.nifi.components.PropertyDescriptor; -import org.apache.nifi.logging.ProcessorLog; -import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnScheduled; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.logging.ProcessorLog; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; import org.apache.nifi.processor.ProcessSessionFactory; @@ -33,19 +43,6 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StringUtils; -import javax.script.Bindings; -import javax.script.ScriptContext; -import javax.script.ScriptException; -import javax.script.SimpleBindings; -import java.io.FileInputStream; -import java.io.IOException; -import java.util.Collections; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -@TriggerSerially @Tags({"script", "execute", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"}) @CapabilityDescription("Experimental - Executes a script given the flow file and a process session. The script is responsible for " + "handling the incoming flow file (transfer to SUCCESS or remove, e.g.) as well as any flow files created by " @@ -128,7 +125,9 @@ public class ExecuteScript extends AbstractScriptProcessor { } else { modules = new String[0]; } - super.setup(); + // Create a script engine for each possible task + int maxTasks = context.getMaxConcurrentTasks(); + super.setup(maxTasks); scriptToRun = scriptBody; try { @@ -161,7 +160,12 @@ public class ExecuteScript extends AbstractScriptProcessor { createResources(); } } + ScriptEngine scriptEngine = engineQ.poll(); ProcessorLog log = getLogger(); + if (scriptEngine == null) { + // No engine available so nothing more to do here + return; + } ProcessSession session = sessionFactory.createSession(); try { @@ -211,11 +215,8 @@ public class ExecuteScript extends AbstractScriptProcessor { getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t}); session.rollback(true); throw t; + } finally { + engineQ.offer(scriptEngine); } } - - @OnStopped - public void stop() { - scriptEngine = null; - } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java index 2f287b6103..06c0a7f18b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/processors/script/InvokeScriptedProcessor.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.script.Invocable; +import javax.script.ScriptEngine; import javax.script.ScriptException; import org.apache.commons.io.IOUtils; @@ -67,6 +68,8 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { private AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); + private ScriptEngine scriptEngine = null; + /** * Returns the valid relationships for this processor. SUCCESS and FAILURE are always returned, and if the script * processor has defined additional relationships, those will be added as well. @@ -174,9 +177,14 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { setup(); } - @Override public void setup() { - super.setup(); + // Create a single script engine, the Processor object is reused by each task + super.setup(1); + scriptEngine = engineQ.poll(); + if (scriptEngine == null) { + throw new ProcessException("No script engine available!"); + } + if (scriptNeedsReload.get() || processor.get() == null) { if (isFile(scriptPath)) { reloadScriptFile(scriptPath); @@ -386,7 +394,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { protected Collection customValidate(final ValidationContext context) { Collection commonValidationResults = super.customValidate(context); - if(!commonValidationResults.isEmpty()) { + if (!commonValidationResults.isEmpty()) { return commonValidationResults; } @@ -486,6 +494,7 @@ public class InvokeScriptedProcessor extends AbstractScriptProcessor { @OnStopped public void stop() { + super.stop(); processor.set(null); } } diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy new file mode 100644 index 0000000000..6dbc5e1008 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.script + +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.StopWatch +import org.apache.nifi.util.TestRunners +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import java.util.concurrent.TimeUnit + +import static org.junit.Assert.assertNotNull + +@RunWith(JUnit4.class) +class ExecuteScriptGroovyTest extends BaseScriptTest { + private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) + + @BeforeClass + public static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + public void setUp() throws Exception { + super.setupExecuteScript() + + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + } + + @After + public void tearDown() throws Exception { + + } + + private void setupPooledExecuteScript(int poolSize = 2) { + final ExecuteScript executeScript = new ExecuteScript() + // Need to do something to initialize the properties, like retrieve the list of properties + assertNotNull(executeScript.getSupportedPropertyDescriptors()) + runner = TestRunners.newTestRunner(executeScript) + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + + // Override context value + runner.processContext.maxConcurrentTasks = poolSize + logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") + } + + @Test + public void testShouldExecuteScript() throws Exception { + // Arrange + final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/ + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + MockFlowFile flowFile = result.get(0) + logger.info("Resulting flowfile attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN + } + + @Test + public void testShouldExecuteScriptSerially() throws Exception { + // Arrange + final int ITERATIONS = 10 + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run(ITERATIONS) + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + } + + @Test + public void testShouldExecuteScriptWithPool() throws Exception { + // Arrange + final int ITERATIONS = 10 + final int POOL_SIZE = 2 + + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + + runner.setThreadCount(POOL_SIZE) + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run(ITERATIONS) + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ + } + } + + @Test + public void testPooledExecutionShouldBeFaster() throws Exception { + // Arrange + final int ITERATIONS = 1000 + final int POOL_SIZE = 4 + + // Act + // Run serially and capture the timing + final StopWatch stopWatch = new StopWatch(true) + runner.run(ITERATIONS) + stopWatch.stop() + final long serialExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Serial execution time for ${ITERATIONS} executions: ${serialExecutionTime} ms") + + // Assert (1) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List serialResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + // Now run parallel + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + runner.setThreadCount(POOL_SIZE) + runner.assertValid() + + stopWatch.start() + runner.run(ITERATIONS) + stopWatch.stop() + final long parallelExecutionTime = stopWatch.getDuration(TimeUnit.MILLISECONDS) + logger.info("Parallel execution time for ${ITERATIONS} executions using ${POOL_SIZE} threads: ${parallelExecutionTime} ms") + + // Assert (2) + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List parallelResults = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + parallelResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-[1-${POOL_SIZE}]/ + } + + serialResults.eachWithIndex { MockFlowFile flowFile, int i -> + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + + assert serialExecutionTime > parallelExecutionTime + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy new file mode 100644 index 0000000000..a9e599cacb --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +def flowFile = session.create() +flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString()) +flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName()) +session.transfer(flowFile, REL_SUCCESS) + +// TODO: Way to add the pool "name" or script engine identifier to the flowfile? \ No newline at end of file