NIFI-1822 Moved unit test to correct directory.

Added test script resource which generates flowfile and updates attribute with current thread.
Added tests for single run, serial run, and pooled run (not complete).
This commit is contained in:
Andy LoPresto 2016-05-13 19:07:53 -07:00
parent 4c174c8ca7
commit 12bbe82381
No known key found for this signature in database
GPG Key ID: 3C6EF65B2F7DEF69
3 changed files with 173 additions and 71 deletions

View File

@ -0,0 +1,151 @@
package org.apache.nifi.processors.script
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.TestRunners
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static org.junit.Assert.assertNotNull
@RunWith(JUnit4.class)
class ExecuteScriptGroovyTest extends BaseScriptTest {
private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class)
private static final String TEST_CSV_DATA = "gender,title,first,last\n" +
"female,miss,marlene,shaw\n" +
"male,mr,todd,graham"
@BeforeClass
public static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
public void setUp() throws Exception {
super.setupExecuteScript()
runner.setValidateExpressionUsage(false)
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy")
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy")
}
@After
public void tearDown() throws Exception {
}
private void setupPooledExecuteScript(int poolSize = 2) {
final ExecuteScript executeScript = new ExecuteScript()
// Need to do something to initialize the properties, like retrieve the list of properties
assertNotNull(executeScript.getSupportedPropertyDescriptors())
runner = TestRunners.newTestRunner(executeScript)
runner.setValidateExpressionUsage(false)
runner.setProperty(ExecuteScript.SCRIPT_ENGINE, "Groovy")
runner.setProperty(ExecuteScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy")
runner.setProperty(ExecuteScript.MODULES, TEST_RESOURCE_LOCATION + "groovy")
// Override context value
runner.processContext.maxConcurrentTasks = poolSize
logger.info("Overrode context max concurrent tasks to ${runner.processContext.maxConcurrentTasks}")
// Must set context properties on runner before calling setup with pool size
// executeScript.setup(poolSize)
}
@Test
public void testShouldExecuteScript() throws Exception {
// Arrange
final String SINGLE_POOL_THREAD_PATTERN = /pool-\d+-thread-1/
logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}")
runner.assertValid()
// Act
runner.run()
// Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1)
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
MockFlowFile flowFile = result.get(0)
logger.info("Resulting flowfile attributes: ${flowFile.attributes}")
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ SINGLE_POOL_THREAD_PATTERN
}
@Test
public void testShouldExecuteScriptSerially() throws Exception {
// Arrange
final int ITERATIONS = 10
logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}")
runner.assertValid()
// Act
ITERATIONS.times { int i ->
logger.info("Running iteration ${i}")
runner.run()
}
// Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
result.eachWithIndex { MockFlowFile flowFile, int i ->
logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}")
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
assert flowFile.getAttribute("thread") =~ /pool-\d+-thread-1/
}
}
@Test
public void testShouldExecuteScriptWithPool() throws Exception {
// Arrange
final int ITERATIONS = 10
final int POOL_SIZE = 2
setupPooledExecuteScript(POOL_SIZE)
logger.info("Set up ExecuteScript processor with pool size: ${POOL_SIZE}")
runner.setThreadCount(POOL_SIZE)
logger.info("Mock flowfile queue contents: ${runner.queueSize} ${runner.flowFileQueue.queue}")
runner.assertValid()
// Act
ITERATIONS.times { int i ->
logger.info("Running iteration ${i}")
runner.run()
}
// Assert
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, ITERATIONS)
final List<MockFlowFile> result = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS)
result.eachWithIndex { MockFlowFile flowFile, int i ->
logger.info("Resulting flowfile [${i}] attributes: ${flowFile.attributes}")
flowFile.assertAttributeExists("time-updated")
flowFile.assertAttributeExists("thread")
flowFile.assertAttributeEquals("thread", "pool-${i + 1}-thread-1")
}
}
//testShouldExecuteScriptWithPool
//testShouldHandleFailingScript
//testShouldHandleNoAvailableEngine
//testPooledExecutionShouldBeFaster
//testPoolSizeVsThreadCount
}

View File

@ -1,71 +0,0 @@
package org.apache.nifi.processors.script
import org.apache.nifi.security.util.EncryptionMethod
import org.junit.After
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.python.bouncycastle.util.encoders.Hex
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import javax.crypto.Cipher
@RunWith(JUnit4.class)
class ExecuteScriptGroovyTest extends GroovyTestCase {
private static final Logger logger = LoggerFactory.getLogger(ExecuteScriptGroovyTest.class)
@BeforeClass
public static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = { String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
}
@Before
public void setUp() throws Exception {
}
@After
public void tearDown() throws Exception {
}
@Test
public void testShouldExecuteScript() throws Exception {
// Arrange
final String PASSWORD = "shortPassword";
final byte[] SALT = cipherProvider.generateSalt()
final String plaintext = "This is a plaintext message.";
// Act
for (EncryptionMethod em : strongKDFEncryptionMethods) {
logger.info("Using algorithm: ${em.getAlgorithm()}");
// Initialize a cipher for encryption
Cipher cipher = cipherProvider.getCipher(em, PASSWORD, SALT, DEFAULT_KEY_LENGTH, true);
byte[] iv = cipher.getIV();
logger.info("IV: ${Hex.encodeHexString(iv)}")
byte[] cipherBytes = cipher.doFinal(plaintext.getBytes("UTF-8"));
logger.info("Cipher text: ${Hex.encodeHexString(cipherBytes)} ${cipherBytes.length}");
cipher = cipherProvider.getCipher(em, PASSWORD, SALT, iv, DEFAULT_KEY_LENGTH, false);
byte[] recoveredBytes = cipher.doFinal(cipherBytes);
String recovered = new String(recoveredBytes, "UTF-8");
logger.info("Recovered: ${recovered}")
// Assert
assert plaintext.equals(recovered);
}
}
//testShouldExecuteScriptWithPool
//testShouldHandleFailingScript
//testShouldHandleNoAvailableEngine
//testPooledExecutionShouldBeFaster
}

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
def flowFile = session.create()
flowFile = session.putAttribute(flowFile, "time-updated", new Date().toString())
flowFile = session.putAttribute(flowFile, "thread", Thread.currentThread().getName())
session.transfer(flowFile, REL_SUCCESS)
// TODO: Way to add the pool "name" or script engine identifier to the flowfile?