mirror of
https://github.com/apache/nifi.git
synced 2025-02-07 02:28:31 +00:00
NIFI-12041 Converted Groovy tests to Java in nifi-scripting-processors
This closes #7752 Signed-off-by: David Handermann <exceptionfactory@apache.org> (cherry picked from commit 9b591a2fe374fa21dc4a5778edad904af6a7bbe5)
This commit is contained in:
parent
96bbdfe97f
commit
990d0807c9
@ -532,7 +532,7 @@ public class MockProcessContext extends MockControllerServiceLookup implements P
|
||||
return componentName;
|
||||
}
|
||||
|
||||
protected void setMaxConcurrentTasks(int maxConcurrentTasks) {
|
||||
public void setMaxConcurrentTasks(int maxConcurrentTasks) {
|
||||
this.maxConcurrentTasks = maxConcurrentTasks;
|
||||
}
|
||||
|
||||
|
@ -165,6 +165,7 @@
|
||||
<artifactId>apache-rat-plugin</artifactId>
|
||||
<configuration>
|
||||
<excludes combine.children="append">
|
||||
<exclude>src/test/resources/xmlRecord.xml</exclude>
|
||||
<exclude>src/test/resources/jython/test_compress.py</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
|
@ -0,0 +1,133 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup.script;
|
||||
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
/**
|
||||
* Unit tests for the ScriptedLookupService controller service
|
||||
*/
|
||||
public class TestScriptedLookupService {
|
||||
@TempDir
|
||||
private static Path targetPath;
|
||||
@TempDir
|
||||
private static Path alternateTargetPath;
|
||||
private ScriptedLookupService scriptedLookupService;
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpOnce() throws Exception {
|
||||
Files.copy(Paths.get("src/test/resources/groovy/test_lookup_inline.groovy"), targetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
Files.copy(Paths.get("src/test/resources/groovy/test_simple_lookup_inline.groovy"), alternateTargetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
scriptedLookupService = new MockScriptedLookupService();
|
||||
runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
runner.addControllerService("lookupService", scriptedLookupService);
|
||||
runner.setProperty(scriptedLookupService, "Script Engine", "Groovy");
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_BODY, (String) null);
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.MODULES, (String) null);
|
||||
runner.enableControllerService(scriptedLookupService);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLookupServiceGroovyScript() throws Exception {
|
||||
Map<String, Object> map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Hello");
|
||||
Optional<Object> opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("Hi", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "World");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("there", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Not There");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertFalse(opt.isPresent());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testLookupServiceScriptReload() throws Exception {
|
||||
Map<String, Object> map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Hello");
|
||||
Optional<Object> opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("Hi", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "World");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("there", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Not There");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertFalse(opt.isPresent());
|
||||
|
||||
// Disable and load different script
|
||||
runner.disableControllerService(scriptedLookupService);
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, alternateTargetPath.toString());
|
||||
runner.enableControllerService(scriptedLookupService);
|
||||
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Hello");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("Goodbye", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "World");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("Stranger", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Not There");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertFalse(opt.isPresent());
|
||||
}
|
||||
|
||||
public static class MockScriptedLookupService extends ScriptedLookupService implements AccessibleScriptingComponentHelper {
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,86 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.lookup.script;
|
||||
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
/**
|
||||
* Unit tests for the SimpleScriptedLookupService controller service
|
||||
*/
|
||||
public class TestSimpleScriptedLookupService {
|
||||
@TempDir
|
||||
private static Path targetPath;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpOnce() throws Exception {
|
||||
Files.copy(Paths.get("src/test/resources/groovy/test_lookup_inline.groovy"), targetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSimpleLookupServiceGroovyScript() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
SimpleScriptedLookupService scriptedLookupService = new MockScriptedLookupService();
|
||||
runner.addControllerService("lookupService", scriptedLookupService);
|
||||
runner.setProperty(scriptedLookupService, "Script Engine", "Groovy");
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_BODY, (String) null);
|
||||
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.MODULES, (String) null);
|
||||
runner.enableControllerService(scriptedLookupService);
|
||||
|
||||
Map<String, Object> map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Hello");
|
||||
Optional<String> opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("Hi", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "World");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertTrue(opt.isPresent());
|
||||
assertEquals("there", opt.get());
|
||||
map = new LinkedHashMap<>(1);
|
||||
map.put("key", "Not There");
|
||||
opt = scriptedLookupService.lookup(map);
|
||||
assertFalse(opt.isPresent());
|
||||
}
|
||||
|
||||
public static class MockScriptedLookupService extends SimpleScriptedLookupService implements AccessibleScriptingComponentHelper {
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
|
||||
}
|
||||
}
|
@ -0,0 +1,126 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.processors.script;
|
||||
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.util.MockFlowFile;
|
||||
import org.apache.nifi.util.MockProcessContext;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class ExecuteScriptGroovyTest extends BaseScriptTest {
|
||||
private static final Pattern SINGLE_POOL_THREAD_PATTERN = Pattern.compile("pool-\\d+-thread-1");
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
super.setupExecuteScript();
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
|
||||
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy");
|
||||
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy");
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldExecuteScript() {
|
||||
runner.assertValid();
|
||||
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS).get(0);
|
||||
flowFile.assertAttributeExists("time-updated");
|
||||
flowFile.assertAttributeExists("thread");
|
||||
assertTrue(SINGLE_POOL_THREAD_PATTERN.matcher(flowFile.getAttribute("thread")).find());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldExecuteScriptSerially() {
|
||||
final int iterations = 10;
|
||||
runner.assertValid();
|
||||
|
||||
runner.run(iterations);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, iterations);
|
||||
runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS).forEach( flowFile -> {
|
||||
flowFile.assertAttributeExists("time-updated");
|
||||
flowFile.assertAttributeExists("thread");
|
||||
assertTrue(SINGLE_POOL_THREAD_PATTERN.matcher(flowFile.getAttribute("thread")).find());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testShouldExecuteScriptWithPool() {
|
||||
final int iterations = 10;
|
||||
final int poolSize = 2;
|
||||
|
||||
setupPooledExecuteScript(poolSize);
|
||||
runner.setThreadCount(poolSize);
|
||||
runner.assertValid();
|
||||
|
||||
runner.run(iterations);
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, iterations);
|
||||
runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS).forEach(flowFile -> {
|
||||
flowFile.assertAttributeExists("time-updated");
|
||||
flowFile.assertAttributeExists("thread");
|
||||
assertTrue((Pattern.compile("pool-\\d+-thread-[1-" + poolSize + "]").matcher(flowFile.getAttribute("thread"))).find());
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
void testExecuteScriptRecompileOnChange() {
|
||||
|
||||
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/setAttributeHello_executescript.groovy");
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
|
||||
MockFlowFile flowFile = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS).get(0);
|
||||
flowFile.assertAttributeExists("greeting");
|
||||
flowFile.assertAttributeEquals("greeting", "hello");
|
||||
runner.clearTransferState();
|
||||
|
||||
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/setAttributeGoodbye_executescript.groovy");
|
||||
runner.enqueue("");
|
||||
runner.run();
|
||||
|
||||
runner.assertAllFlowFilesTransferred(ExecuteScript.REL_SUCCESS, 1);
|
||||
flowFile = runner.getFlowFilesForRelationship(ExecuteScript.REL_SUCCESS).get(0);
|
||||
flowFile.assertAttributeExists("greeting");
|
||||
flowFile.assertAttributeEquals("greeting", "good-bye");
|
||||
}
|
||||
|
||||
private void setupPooledExecuteScript(int poolSize) {
|
||||
final ExecuteScript executeScript = new ExecuteScript();
|
||||
// Need to do something to initialize the properties, like retrieve the list of properties
|
||||
assertNotNull(executeScript.getSupportedPropertyDescriptors());
|
||||
runner = TestRunners.newTestRunner(executeScript);
|
||||
runner.setValidateExpressionUsage(false);
|
||||
runner.setProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE, "Groovy");
|
||||
runner.setProperty(ScriptingComponentUtils.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "groovy/testAddTimeAndThreadAttribute.groovy");
|
||||
runner.setProperty(ScriptingComponentUtils.MODULES, TEST_RESOURCE_LOCATION + "groovy");
|
||||
|
||||
// Override userContext value
|
||||
((MockProcessContext)runner.getProcessContext()).setMaxConcurrentTasks(poolSize);
|
||||
}
|
||||
}
|
@ -0,0 +1,150 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.record.script;
|
||||
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.serialization.RecordReader;
|
||||
import org.apache.nifi.serialization.record.Record;
|
||||
import org.apache.nifi.util.MockComponentLog;
|
||||
import org.apache.nifi.util.NoOpProcessor;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
|
||||
/**
|
||||
* Unit tests for the ScriptedReader class
|
||||
*/
|
||||
class ScriptedReaderTest {
|
||||
private static final String SOURCE_DIR = "src/test/resources";
|
||||
private static final String GROOVY_DIR = "groovy";
|
||||
private static Path tempJar;
|
||||
@TempDir
|
||||
private Path targetScriptFile;
|
||||
private ScriptedReader recordReaderFactory;
|
||||
private TestRunner runner;
|
||||
|
||||
@BeforeAll
|
||||
public static void before() throws IOException {
|
||||
tempJar = File.createTempFile("test-jar", null).toPath();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void after() {
|
||||
tempJar.toFile().delete();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() throws Exception {
|
||||
recordReaderFactory = new MockScriptedReader();
|
||||
runner = TestRunners.newTestRunner(NoOpProcessor.class);
|
||||
runner.addControllerService("reader", recordReaderFactory);
|
||||
runner.setProperty(recordReaderFactory, "Script Engine", "Groovy");
|
||||
runner.setProperty(recordReaderFactory, ScriptingComponentUtils.SCRIPT_BODY, (String) null);
|
||||
runner.setProperty(recordReaderFactory, ScriptingComponentUtils.MODULES, (String) null);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRecordReaderGroovyScript() throws Exception {
|
||||
Files.copy(Paths.get(SOURCE_DIR, GROOVY_DIR, "test_record_reader_inline.groovy"), targetScriptFile, StandardCopyOption.REPLACE_EXISTING);
|
||||
runner.setProperty(recordReaderFactory, ScriptingComponentUtils.SCRIPT_FILE, targetScriptFile.toString());
|
||||
runner.enableControllerService(recordReaderFactory);
|
||||
byte[] contentBytes = "Flow file content not used".getBytes();
|
||||
InputStream inStream = new ByteArrayInputStream(contentBytes);
|
||||
|
||||
final RecordReader recordReader =
|
||||
recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, contentBytes.length, new MockComponentLog("id", recordReaderFactory));
|
||||
assertNotNull(recordReader);
|
||||
|
||||
for(int index = 0; index < 3; index++) {
|
||||
Record record = recordReader.nextRecord();
|
||||
assertNotNull(record);
|
||||
assertEquals(record.getAsInt("code"), record.getAsInt("id") * 100);
|
||||
}
|
||||
assertNull(recordReader.nextRecord());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testXmlRecordReaderGroovyScript() throws Exception {
|
||||
Files.copy(Paths.get(SOURCE_DIR, GROOVY_DIR, "test_record_reader_xml.groovy"), targetScriptFile, StandardCopyOption.REPLACE_EXISTING);
|
||||
runner.setProperty(recordReaderFactory, ScriptingComponentUtils.SCRIPT_FILE, targetScriptFile.toString());
|
||||
String schemaText = "\n[\n{\"id\": \"int\"},\n{\"name\": \"string\"},\n{\"code\": \"int\"}\n]\n";
|
||||
runner.setProperty(recordReaderFactory, "schema.text", schemaText);
|
||||
runner.enableControllerService(recordReaderFactory);
|
||||
|
||||
Map<String, String> map = new LinkedHashMap<>(1);
|
||||
map.put("record.tag", "myRecord");
|
||||
byte[] contentBytes = Files.readAllBytes(Paths.get("src/test/resources/xmlRecord.xml"));
|
||||
InputStream inStream = new ByteArrayInputStream(contentBytes);
|
||||
final RecordReader recordReader = recordReaderFactory.createRecordReader(map, inStream, contentBytes.length, new MockComponentLog("ScriptedReader", ""));
|
||||
assertNotNull(recordReader);
|
||||
|
||||
for(int index = 0; index < 3; index++) {
|
||||
Record record = recordReader.nextRecord();
|
||||
assertNotNull(record);
|
||||
assertEquals(record.getAsInt("code"), record.getAsInt("id") * 100);
|
||||
}
|
||||
assertNull(recordReader.nextRecord());
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRecordReaderGroovyScriptChangeModuleDirectory() throws Exception {
|
||||
Files.copy(Paths.get(SOURCE_DIR, GROOVY_DIR, "test_record_reader_load_module.groovy"), targetScriptFile, StandardCopyOption.REPLACE_EXISTING);
|
||||
runner.setProperty(recordReaderFactory, ScriptingComponentUtils.SCRIPT_FILE, targetScriptFile.toString());
|
||||
|
||||
assertThrows(Throwable.class, () -> runner.enableControllerService(recordReaderFactory));
|
||||
|
||||
Files.copy(Paths.get(SOURCE_DIR, "jar", "test.jar"), tempJar, StandardCopyOption.REPLACE_EXISTING);
|
||||
runner.setProperty(recordReaderFactory, ScriptingComponentUtils.MODULES, tempJar.toString());
|
||||
runner.enableControllerService(recordReaderFactory);
|
||||
byte[] contentBytes = "Flow file content not used".getBytes();
|
||||
InputStream inStream = new ByteArrayInputStream(contentBytes);
|
||||
|
||||
final RecordReader recordReader =
|
||||
recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, contentBytes.length, new MockComponentLog("id", recordReaderFactory));
|
||||
assertNotNull(recordReader);
|
||||
}
|
||||
|
||||
public static class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper {
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.record.script;
|
||||
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.serialization.RecordSetWriter;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.util.MockComponentLog;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.w3c.dom.Document;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
import javax.xml.xpath.XPath;
|
||||
import javax.xml.xpath.XPathFactory;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
|
||||
/**
|
||||
* Unit tests for the ScriptedReader class
|
||||
*/
|
||||
public class ScriptedRecordSetWriterTest {
|
||||
@TempDir
|
||||
private static Path targetPath;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpOnce() throws Exception {
|
||||
Files.copy(Paths.get("src/test/resources/groovy/test_record_writer_inline.groovy"), targetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testRecordWriterGroovyScript() throws Exception {
|
||||
final TestRunner runner = TestRunners.newTestRunner(new AbstractProcessor() {
|
||||
@Override
|
||||
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
|
||||
}
|
||||
});
|
||||
|
||||
MockScriptedWriter recordSetWriterFactory = new MockScriptedWriter();
|
||||
runner.addControllerService("writer", recordSetWriterFactory);
|
||||
runner.setProperty(recordSetWriterFactory, "Script Engine", "Groovy");
|
||||
runner.setProperty(recordSetWriterFactory, ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
|
||||
runner.setProperty(recordSetWriterFactory, ScriptingComponentUtils.SCRIPT_BODY, (String) null);
|
||||
runner.setProperty(recordSetWriterFactory, ScriptingComponentUtils.MODULES, (String) null);
|
||||
runner.enableControllerService(recordSetWriterFactory);
|
||||
|
||||
RecordSchema schema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
|
||||
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
||||
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(new MockComponentLog("id", recordSetWriterFactory), schema, outputStream, Collections.emptyMap());
|
||||
assertNotNull(recordSetWriter);
|
||||
|
||||
SimpleRecordSchema recordSchema = new SimpleRecordSchema(Arrays.asList(new RecordField("id", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("name", RecordFieldType.STRING.getDataType()),
|
||||
new RecordField("code", RecordFieldType.INT.getDataType())));
|
||||
MapRecord [] records = createMapRecords(recordSchema);
|
||||
|
||||
recordSetWriter.write(RecordSet.of(recordSchema, records));
|
||||
|
||||
DocumentBuilder documentBuilder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
|
||||
Document document = documentBuilder.parse(new ByteArrayInputStream(outputStream.toByteArray()));
|
||||
XPathFactory xpathfactory = XPathFactory.newInstance();
|
||||
XPath xpath = xpathfactory.newXPath();
|
||||
assertEquals("1", xpath.evaluate("//record[1]/id/text()", document));
|
||||
assertEquals("200", xpath.evaluate("//record[2]/code/text()", document));
|
||||
assertEquals("Ramon", xpath.evaluate("//record[3]/name/text()", document));
|
||||
}
|
||||
|
||||
private static MapRecord[] createMapRecords(SimpleRecordSchema recordSchema) {
|
||||
Map<String, Object> map = new LinkedHashMap<>(3);
|
||||
map.put("id", 1);
|
||||
map.put("name", "John");
|
||||
map.put("code", 100);
|
||||
Map<String, Object> map1 = new LinkedHashMap<>(3);
|
||||
map1.put("id", 2);
|
||||
map1.put("name", "Mary");
|
||||
map1.put("code", 200);
|
||||
Map<String, Object> map2 = new LinkedHashMap<>(3);
|
||||
map2.put("id", 3);
|
||||
map2.put("name", "Ramon");
|
||||
map2.put("code", 300);
|
||||
|
||||
return new MapRecord[]{new MapRecord(recordSchema, map), new MapRecord(recordSchema, map1), new MapRecord(recordSchema, map2)};
|
||||
}
|
||||
public static class MockScriptedWriter extends ScriptedRecordSetWriter implements AccessibleScriptingComponentHelper {
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.reporting.script;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.processors.script.ScriptRunner;
|
||||
import org.apache.nifi.provenance.ProvenanceEventRecord;
|
||||
import org.apache.nifi.registry.VariableRegistry;
|
||||
import org.apache.nifi.reporting.ReportingInitializationContext;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.util.MockConfigurationContext;
|
||||
import org.apache.nifi.util.MockEventAccess;
|
||||
import org.apache.nifi.util.MockReportingContext;
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.ExtendWith;
|
||||
import org.junit.jupiter.api.io.TempDir;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
|
||||
import javax.script.ScriptEngine;
|
||||
import java.math.BigInteger;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.nio.file.StandardCopyOption;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.Mockito.doReturn;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
/**
|
||||
* Unit tests for ScriptedReportingTask.
|
||||
*/
|
||||
@ExtendWith(MockitoExtension.class)
|
||||
class ScriptedReportingTaskTest {
|
||||
private static final String SCRIPT_ENGINE = "Script Engine";
|
||||
private static final PropertyDescriptor SCRIPT_ENGINE_PROPERTY_DESCRIPTOR = new PropertyDescriptor.Builder().name(SCRIPT_ENGINE).build();
|
||||
private static final String GROOVY = "Groovy";
|
||||
@TempDir
|
||||
private Path targetPath;
|
||||
@Mock
|
||||
private ReportingInitializationContext initContext;
|
||||
private MockScriptedReportingTask task;
|
||||
private Map<PropertyDescriptor, String> properties;
|
||||
private ConfigurationContext configurationContext;
|
||||
private MockReportingContext reportingContext;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp(@Mock ComponentLog logger) {
|
||||
task = new MockScriptedReportingTask();
|
||||
properties = new HashMap<>();
|
||||
configurationContext = new MockConfigurationContext(properties, null);
|
||||
reportingContext = new MockReportingContext(new LinkedHashMap<>(), null, VariableRegistry.EMPTY_REGISTRY);
|
||||
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
|
||||
when(initContext.getLogger()).thenReturn(logger);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testProvenanceGroovyScript() throws Exception {
|
||||
properties.put(SCRIPT_ENGINE_PROPERTY_DESCRIPTOR, GROOVY);
|
||||
Files.copy(Paths.get("src/test/resources/groovy/test_log_provenance_events.groovy"), targetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
properties.put(ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
|
||||
reportingContext.setProperty(SCRIPT_ENGINE, GROOVY);
|
||||
reportingContext.setProperty(ScriptingComponentUtils.SCRIPT_FILE.getName(), targetPath.toString());
|
||||
|
||||
final MockEventAccess eventAccess = reportingContext.getEventAccess();
|
||||
for (long index = 1; index < 4; index++) {
|
||||
final ProvenanceEventRecord event = mock(ProvenanceEventRecord.class);
|
||||
doReturn(index).when(event).getEventId();
|
||||
if(index == 1) {
|
||||
doReturn("1234").when(event).getComponentId();
|
||||
Map<String, String> map = new LinkedHashMap<>(1);
|
||||
map.put("abc", "xyz");
|
||||
doReturn(map).when(event).getAttributes();
|
||||
}
|
||||
eventAccess.addProvenanceEvent(event);
|
||||
}
|
||||
|
||||
run();
|
||||
|
||||
// This script should return a variable x with the number of events and a variable e with the first event
|
||||
ScriptEngine se = task.getScriptRunner().getScriptEngine();
|
||||
assertEquals(3, se.get("x"));
|
||||
ProvenanceEventRecord per = (ProvenanceEventRecord)se.get("e");
|
||||
assertEquals("1234", per.getComponentId());
|
||||
assertEquals("xyz",per.getAttributes().get("abc"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testVMEventsGroovyScript() throws Exception {
|
||||
properties.put(SCRIPT_ENGINE_PROPERTY_DESCRIPTOR, GROOVY);
|
||||
Files.copy(Paths.get("src/test/resources/groovy/test_log_vm_stats.groovy"), targetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
properties.put(ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
|
||||
reportingContext.setProperty(SCRIPT_ENGINE, GROOVY);
|
||||
reportingContext.setProperty(ScriptingComponentUtils.SCRIPT_FILE.getName(), targetPath.toString());
|
||||
|
||||
run();
|
||||
|
||||
// This script should store a variable called x with a map of stats to values
|
||||
ScriptEngine se = task.getScriptRunner().getScriptEngine();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String, Long> x = (Map<String, Long>)se.get("x");
|
||||
assertTrue(x.get("uptime") >= 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
void testVMEventsJythonScript() throws Exception {
|
||||
properties.put(SCRIPT_ENGINE_PROPERTY_DESCRIPTOR, "python");
|
||||
Files.copy(Paths.get("src/test/resources/jython/test_log_vm_stats.py"), targetPath, StandardCopyOption.REPLACE_EXISTING);
|
||||
properties.put(ScriptingComponentUtils.SCRIPT_FILE, targetPath.toString());
|
||||
reportingContext.setProperty(SCRIPT_ENGINE, "python");
|
||||
reportingContext.setProperty(ScriptingComponentUtils.SCRIPT_FILE.getName(), targetPath.toString());
|
||||
|
||||
run();
|
||||
|
||||
// This script should store a variable called x with a map of stats to values
|
||||
ScriptEngine se = task.getScriptRunner().getScriptEngine();
|
||||
@SuppressWarnings("unchecked")
|
||||
final Map<String, BigInteger> x = (Map<String, BigInteger>)se.get("x");
|
||||
assertTrue(x.get("uptime").longValue() >= 0);
|
||||
}
|
||||
|
||||
private void run() throws Exception {
|
||||
task.initialize(initContext);
|
||||
task.getSupportedPropertyDescriptors();
|
||||
task.setup(configurationContext);
|
||||
task.onTrigger(reportingContext);
|
||||
}
|
||||
|
||||
public static class MockScriptedReportingTask extends ScriptedReportingTask implements AccessibleScriptingComponentHelper {
|
||||
public ScriptRunner getScriptRunner() {
|
||||
return getScriptingComponentHelper().scriptRunnerQ.poll();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,17 @@
|
||||
<root>
|
||||
<myRecord>
|
||||
<id>1</id>
|
||||
<name>John</name>
|
||||
<code>100</code>
|
||||
</myRecord>
|
||||
<myRecord>
|
||||
<id>2</id>
|
||||
<name>Mary</name>
|
||||
<code>200</code>
|
||||
</myRecord>
|
||||
<myRecord>
|
||||
<id>3</id>
|
||||
<name>Ramon</name>
|
||||
<code>300</code>
|
||||
</myRecord>
|
||||
</root>
|
Loading…
x
Reference in New Issue
Block a user