diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy new file mode 100644 index 0000000000..4a9c3c9849 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -0,0 +1,151 @@ +package org.apache.nifi.processors.script + +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.TestRunners +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static org.junit.Assert.assertNotNull + +@RunWith(JUnit4.class) +class ExecuteScriptGroovyTest extends BaseScriptTest { + private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) + + private static final String TEST_CSV_DATA = "gender,title,first,last\n" + + "female,miss,marlene,shaw\n" + + "male,mr,todd,graham" + + @BeforeClass + public static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + public void setUp() throws Exception { + super.setupExecuteScript() + + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + } + + @After + public void tearDown() throws Exception { + + } + + private void setupPooledExecuteScript(int poolSize = 2) { + final ExecuteScript executeScript = new ExecuteScript() + // Need to do something to initialize the properties, like retrieve the list of properties + assertNotNull(executeScript.getSupportedPropertyDescriptors()) + runner = TestRunners.newTestRunner(executeScript) + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + + // Override context value + runner.processContext.maxConcurrentTasks = poolSize + logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") + + // Must set context properties on runner before calling setup with pool size +// executeScript.setup(poolSize) + } + + @Test + public void testShouldExecuteScript() throws Exception { + // Arrange + final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/ + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + MockFlowFile flowFile = result.get(0) + logger.info("Resulting flowfile attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN + } + + @Test + public void testShouldExecuteScriptSerially() throws Exception { + // Arrange + final int ITERATIONS = 10 + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + ITERATIONS.times { int i -> + logger.info("Running iteration ${i}") + runner.run() + } + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + } + + @Test + public void testShouldExecuteScriptWithPool() throws Exception { + // Arrange + final int ITERATIONS = 10 + final int POOL_SIZE = 2 + + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + + runner.setThreadCount(POOL_SIZE) + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + ITERATIONS.times { int i -> + logger.info("Running iteration ${i}") + runner.run() + } + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1") + } + } + + //testShouldExecuteScriptWithPool + //testShouldHandleFailingScript + //testShouldHandleNoAvailableEngine + //testPooledExecutionShouldBeFaster + //testPoolSizeVsThreadCount +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy deleted file mode 100644 index 479ab6ff6f..0000000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ /dev/null @@ -1,71 +0,0 @@ -package org.apache.nifi.processors.script - -import org.apache.nifi.security.util.EncryptionMethod -import org.junit.After -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.JUnit4 -import org.python.bouncycastle.util.encoders.Hex -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import javax.crypto.Cipher - -@RunWith(JUnit4.class) -class ExecuteScriptGroovyTest extends GroovyTestCase { - private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) - - @BeforeClass - public static void setUpOnce() throws Exception { - logger.metaClass.methodMissing = { String name, args -> - logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") - } - } - - @Before - public void setUp() throws Exception { - - } - - @After - public void tearDown() throws Exception { - - } - - @Test - public void testShouldExecuteScript() throws Exception { - // Arrange - final String PASSWORD = "shortPassword"; - final byte[] SALT = cipherProvider.generateSalt() - - final String plaintext = "This is a plaintext message."; - - // Act - for (EncryptionMethod em : strongKDFEncryptionMethods) { - logger.info("Using algorithm: ${em.getAlgorithm()}"); - - // Initialize a cipher for encryption - Cipher cipher = cipherProvider.getCipher(em, PASSWORD, SALT, DEFAULT_KEY_LENGTH, true); - byte[] iv = cipher.getIV(); - logger.info("IV: ${Hex.encodeHexString(iv)}") - - byte[] cipherBytes = cipher.doFinal(plaintext.getBytes("UTF-8")); - logger.info("Cipher text: ${Hex.encodeHexString(cipherBytes)} ${cipherBytes.length}"); - - cipher = cipherProvider.getCipher(em, PASSWORD, SALT, iv, DEFAULT_KEY_LENGTH, false); - byte[] recoveredBytes = cipher.doFinal(cipherBytes); - String recovered = new String(recoveredBytes, "UTF-8"); - logger.info("Recovered: ${recovered}") - - // Assert - assert plaintext.equals(recovered); - } - } - - //testShouldExecuteScriptWithPool - //testShouldHandleFailingScript - //testShouldHandleNoAvailableEngine - //testPooledExecutionShouldBeFaster -} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy new file mode 100644 index 0000000000..a9e599cacb --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +def flowFile = session.create() +flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString()) +flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName()) +session.transfer(flowFile, REL_SUCCESS) + +// TODO: Way to add the pool "name" or script engine identifier to the flowfile? \ No newline at end of file