From 12bbe82381fb334ff236db6815ada8a277a782e7 Mon Sep 17 00:00:00 2001 From: Andy LoPresto Date: Fri, 13 May 2016 19:07:53 -0700 Subject: [PATCH] NIFI-1822 Moved unit test to correct directory. Added test script resource which generates flowfile and updates attribute with current thread. Added tests for single run, serial run, and pooled run (not complete). --- .../script/ExecuteScriptGroovyTest.groovy | 151 ++++++++++++++++++ .../script/ExecuteScriptGroovyTest.groovy | 71 -------- .../testAddTimeAndThreadAttribute.groovy | 22 +++ 3 files changed, 173 insertions(+), 71 deletions(-) create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy delete mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy create mode 100644 nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy new file mode 100644 index 0000000000..4a9c3c9849 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy @@ -0,0 +1,151 @@ +package org.apache.nifi.processors.script + +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.TestRunners +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static org.junit.Assert.assertNotNull + +@RunWith(JUnit4.class) +class ExecuteScriptGroovyTest extends BaseScriptTest { + private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) + + private static final String TEST_CSV_DATA = "gender,title,first,last\n" + + "female,miss,marlene,shaw\n" + + "male,mr,todd,graham" + + @BeforeClass + public static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = { String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + } + + @Before + public void setUp() throws Exception { + super.setupExecuteScript() + + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + } + + @After + public void tearDown() throws Exception { + + } + + private void setupPooledExecuteScript(int poolSize = 2) { + final ExecuteScript executeScript = new ExecuteScript() + // Need to do something to initialize the properties, like retrieve the list of properties + assertNotNull(executeScript.getSupportedPropertyDescriptors()) + runner = TestRunners.newTestRunner(executeScript) + runner.setValidateExpressionUsage(false) + runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy") + runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy") + runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy") + + // Override context value + runner.processContext.maxConcurrentTasks = poolSize + logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}") + + // Must set context properties on runner before calling setup with pool size +// executeScript.setup(poolSize) + } + + @Test + public void testShouldExecuteScript() throws Exception { + // Arrange + final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/ + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + runner.run() + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + MockFlowFile flowFile = result.get(0) + logger.info("Resulting flowfile attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN + } + + @Test + public void testShouldExecuteScriptSerially() throws Exception { + // Arrange + final int ITERATIONS = 10 + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + ITERATIONS.times { int i -> + logger.info("Running iteration ${i}") + runner.run() + } + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/ + } + } + + @Test + public void testShouldExecuteScriptWithPool() throws Exception { + // Arrange + final int ITERATIONS = 10 + final int POOL_SIZE = 2 + + setupPooledExecuteScript(POOL_SIZE) + logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}") + + runner.setThreadCount(POOL_SIZE) + + logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}") + runner.assertValid() + + // Act + ITERATIONS.times { int i -> + logger.info("Running iteration ${i}") + runner.run() + } + + // Assert + runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS) + final List result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS) + + result.eachWithIndex { MockFlowFile flowFile, int i -> + logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}") + + flowFile.assertAttributeExists("time-updated") + flowFile.assertAttributeExists("thread") + flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1") + } + } + + //testShouldExecuteScriptWithPool + //testShouldHandleFailingScript + //testShouldHandleNoAvailableEngine + //testPooledExecutionShouldBeFaster + //testPoolSizeVsThreadCount +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy deleted file mode 100644 index 479ab6ff6f..0000000000 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/java/org/apache/nifi/processors/script/ExecuteScriptGroovyTest.groovy +++ /dev/null @@ -1,71 +0,0 @@ -package org.apache.nifi.processors.script - -import org.apache.nifi.security.util.EncryptionMethod -import org.junit.After -import org.junit.Before -import org.junit.BeforeClass -import org.junit.Test -import org.junit.runner.RunWith -import org.junit.runners.JUnit4 -import org.python.bouncycastle.util.encoders.Hex -import org.slf4j.Logger -import org.slf4j.LoggerFactory - -import javax.crypto.Cipher - -@RunWith(JUnit4.class) -class ExecuteScriptGroovyTest extends GroovyTestCase { - private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class) - - @BeforeClass - public static void setUpOnce() throws Exception { - logger.metaClass.methodMissing = { String name, args -> - logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") - } - } - - @Before - public void setUp() throws Exception { - - } - - @After - public void tearDown() throws Exception { - - } - - @Test - public void testShouldExecuteScript() throws Exception { - // Arrange - final String PASSWORD = "shortPassword"; - final byte[] SALT = cipherProvider.generateSalt() - - final String plaintext = "This is a plaintext message."; - - // Act - for (EncryptionMethod em : strongKDFEncryptionMethods) { - logger.info("Using algorithm: ${em.getAlgorithm()}"); - - // Initialize a cipher for encryption - Cipher cipher = cipherProvider.getCipher(em, PASSWORD, SALT, DEFAULT_KEY_LENGTH, true); - byte[] iv = cipher.getIV(); - logger.info("IV: ${Hex.encodeHexString(iv)}") - - byte[] cipherBytes = cipher.doFinal(plaintext.getBytes("UTF-8")); - logger.info("Cipher text: ${Hex.encodeHexString(cipherBytes)} ${cipherBytes.length}"); - - cipher = cipherProvider.getCipher(em, PASSWORD, SALT, iv, DEFAULT_KEY_LENGTH, false); - byte[] recoveredBytes = cipher.doFinal(cipherBytes); - String recovered = new String(recoveredBytes, "UTF-8"); - logger.info("Recovered: ${recovered}") - - // Assert - assert plaintext.equals(recovered); - } - } - - //testShouldExecuteScriptWithPool - //testShouldHandleFailingScript - //testShouldHandleNoAvailableEngine - //testPooledExecutionShouldBeFaster -} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy new file mode 100644 index 0000000000..a9e599cacb --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/testAddTimeAndThreadAttribute.groovy @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +def flowFile = session.create() +flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString()) +flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName()) +session.transfer(flowFile, REL_SUCCESS) + +// TODO: Way to add the pool "name" or script engine identifier to the flowfile? \ No newline at end of file