NIFI-7014: This closes #3985. Add RecordReader/Writer access in ExecuteGroovyScript

Signed-off-by: Joe Witt <joewitt@apache.org>
This commit is contained in:
Matthew Burgess 2020-01-14 09:56:43 -05:00 committed by Joe Witt
parent 6dbcf9d521
commit 824cc0ed77
No known key found for this signature in database
GPG Key ID: 9093BF854F811A1A
5 changed files with 166 additions and 17 deletions

View File

@ -36,6 +36,11 @@
<artifactId>nifi-processor-utils</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-json</artifactId>
@ -65,6 +70,18 @@
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock-record-utils</artifactId>
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.derby</groupId>
<artifactId>derby</artifactId>

View File

@ -46,6 +46,8 @@ import org.apache.nifi.processors.groovyx.flow.GroovyProcessSessionWrap;
import org.apache.nifi.processors.groovyx.sql.OSql;
import org.apache.nifi.processors.groovyx.util.Files;
import org.apache.nifi.processors.groovyx.util.Validators;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.codehaus.groovy.control.CompilerConfiguration;
import org.codehaus.groovy.runtime.ResourceGroovyMethods;
import org.codehaus.groovy.runtime.StackTraceUtils;
@ -80,8 +82,9 @@ import java.util.Set;
@DynamicProperty(name = "A script engine property to update",
value = "The value to set it to",
expressionLanguageScope = ExpressionLanguageScope.FLOWFILE_ATTRIBUTES,
description = "Updates a script engine property specified by the Dynamic Property's key with the value "
+ "specified by the Dynamic Property's value. Use `CTL.` to access any controller services.")
description = "Updates a script engine property specified by the Dynamic Property's key with the value specified by the Dynamic Property's value. "
+ "Use `CTL.` to access any controller services, `SQL.` to access any DBCPServices, `RecordReader.` to access RecordReaderFactory instances, or "
+ "`RecordWriter.` to access any RecordSetWriterFactory instances.")
public class ExecuteGroovyScript extends AbstractProcessor {
public static final String GROOVY_CLASSPATH = "${groovy.classes.path}";
@ -335,9 +338,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* init SQL variables from DBCP services
*/
@SuppressWarnings("unchecked")
private void onInitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
private void onInitSQL(Map<String, Object> SQL) throws SQLException {
for (Map.Entry<String, Object> e : SQL.entrySet()) {
DBCPService s = (DBCPService) e.getValue();
OSql sql = new OSql(s.getConnection(Collections.emptyMap()));
//try to set autocommit to false
@ -355,9 +357,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* before commit SQL services
*/
@SuppressWarnings("unchecked")
private void onCommitSQL(HashMap SQL) throws SQLException {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
private void onCommitSQL(Map<String, Object> SQL) throws SQLException {
for (Map.Entry<String, Object> e : SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
if (!sql.getConnection().getAutoCommit()) {
sql.commit();
@ -368,9 +369,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* finalize SQL services. no exceptions should be thrown.
*/
@SuppressWarnings("unchecked")
private void onFinitSQL(HashMap SQL) {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
private void onFinitSQL(Map<String, Object> SQL) {
for (Map.Entry<String, Object> e : SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
@ -391,9 +391,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
/**
* exception SQL services
*/
@SuppressWarnings("unchecked")
private void onFailSQL(HashMap SQL) {
for (Map.Entry e : (Set<Map.Entry>) SQL.entrySet()) {
private void onFailSQL(Map<String, Object> SQL) {
for (Map.Entry<String, Object> e : SQL.entrySet()) {
OSql sql = (OSql) e.getValue();
try {
if (!sql.getConnection().getAutoCommit()) {
@ -412,8 +411,10 @@ public class ExecuteGroovyScript extends AbstractProcessor {
//so transfer original input to failure will be possible
GroovyProcessSessionWrap session = new GroovyProcessSessionWrap(_session, toFailureOnError);
HashMap CTL = new AccessMap("CTL");
HashMap SQL = new AccessMap("SQL");
Map<String, Object> CTL = new AccessMap("CTL");
Map<String, Object> SQL = new AccessMap("SQL");
Map<String, Object> RECORD_READER = new AccessMap("RecordReader");
Map<String, Object> RECORD_SET_WRITER = new AccessMap("RecordSetWriter");
try {
Script script = getGroovyScript(); //compilation must be moved to validation
@ -431,6 +432,14 @@ public class ExecuteGroovyScript extends AbstractProcessor {
} else if (property.getKey().getName().startsWith("SQL.")) {
DBCPService dbcp = context.getProperty(property.getKey()).asControllerService(DBCPService.class);
SQL.put(property.getKey().getName().substring(4), dbcp);
} else if (property.getKey().getName().startsWith("RecordReader.")) {
// Get RecordReaderFactory controller service
RecordReaderFactory recordReader = context.getProperty(property.getKey()).asControllerService(RecordReaderFactory.class);
RECORD_READER.put(property.getKey().getName().substring(13), recordReader);
} else if (property.getKey().getName().startsWith("RecordWriter.")) {
// Get RecordSetWriterFactory controller service
RecordSetWriterFactory recordWriter = context.getProperty(property.getKey()).asControllerService(RecordSetWriterFactory.class);
RECORD_SET_WRITER.put(property.getKey().getName().substring(13), recordWriter);
} else {
// Add the dynamic property bound to its full PropertyValue to the script engine
if (property.getValue() != null) {
@ -448,6 +457,8 @@ public class ExecuteGroovyScript extends AbstractProcessor {
bindings.put("REL_FAILURE", REL_FAILURE);
bindings.put("CTL", CTL);
bindings.put("SQL", SQL);
bindings.put("RecordReader", RECORD_READER);
bindings.put("RecordWriter", RECORD_SET_WRITER);
script.run();
bindings.clear();
@ -496,6 +507,26 @@ public class ExecuteGroovyScript extends AbstractProcessor {
.identifiesControllerService(DBCPService.class)
.build();
}
if (propertyDescriptorName.startsWith("RecordReader.")) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.required(false)
.description("RecordReaderFactory controller service accessible from code as `" + propertyDescriptorName + "`")
.dynamic(true)
.identifiesControllerService(RecordReaderFactory.class)
.build();
}
if (propertyDescriptorName.startsWith("RecordWriter.")) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.displayName(propertyDescriptorName)
.required(false)
.description("RecordSetWriterFactory controller service accessible from code as `" + propertyDescriptorName + "`")
.dynamic(true)
.identifiesControllerService(RecordSetWriterFactory.class)
.build();
}
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
@ -506,7 +537,7 @@ public class ExecuteGroovyScript extends AbstractProcessor {
}
/** simple HashMap with exception on access of non-existent key */
private class AccessMap extends HashMap {
private static class AccessMap extends HashMap<String,Object> {
private String parentKey;
AccessMap(String parentKey){
this.parentKey=parentKey;

View File

@ -64,6 +64,18 @@
<td>Map populated with `groovy.sql.Sql` objects connected to corresponding database defined with `SQL.*` processor properties.
<br/>The `SQL.` prefixed properties can only be linked to a DBCPService.</td>
</tr>
<tr>
<td>RecordReader</td>
<td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordReaderFactory.java">RecordReaderFactory</a>&gt;</td>
<td>Map populated with controller services defined with `RecordReader.*` processor properties.
<br/>The `RecordReader.` prefixed properties are to be linked to RecordReaderFactory controller service instances.</td>
</tr>
<tr>
<td>RecordWriter</td>
<td>java.util.HashMap&lt;String,<a href="https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-service-api/src/main/java/org/apache/nifi/serialization/RecordSetWriterFactory.java">RecordSetWriterFactory</a>&gt;</td>
<td>Map populated with controller services defined with `RecordWriter.*` processor properties.
<br/>The `RecordWriter.` prefixed properties are to be linked to RecordSetWriterFactory controller service instances.</td>
</tr>
<tr>
<td>Dynamic processor properties</td>
<td>org.apache.nifi.components.PropertyDescriptor</td>

View File

@ -16,6 +16,13 @@
*/
package org.apache.nifi.processors.groovyx;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.MockProcessorInitializationContext;
@ -38,6 +45,7 @@ import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.HashMap;
@ -61,6 +69,9 @@ public class ExecuteGroovyScriptTest {
protected TestRunner runner;
protected static DBCPService dbcp = null; //to make single initialization
protected MockRecordParser recordParser = null;
protected RecordSetWriterFactory recordWriter = null;
protected RecordSchema recordSchema = null;
protected ExecuteGroovyScript proc;
public final String TEST_RESOURCE_LOCATION = "target/test/resources/groovy/";
private final String TEST_CSV_DATA = "gender,title,first,last\n"
@ -121,6 +132,21 @@ public class ExecuteGroovyScriptTest {
runner = TestRunners.newTestRunner(proc);
runner.addControllerService("dbcp", dbcp, new HashMap<>());
runner.enableControllerService(dbcp);
List<RecordField> recordFields = Arrays.asList(
new RecordField("id", RecordFieldType.INT.getDataType()),
new RecordField("name", RecordFieldType.STRING.getDataType()),
new RecordField("code", RecordFieldType.INT.getDataType()));
recordSchema = new SimpleRecordSchema(recordFields);
recordParser = new MockRecordParser();
recordFields.forEach((r) -> recordParser.addSchemaField(r));
runner.addControllerService("myreader", recordParser, new HashMap<>());
runner.enableControllerService(recordParser);
recordWriter = new MockRecordWriter();
runner.addControllerService("mywriter", recordWriter, new HashMap<>());
runner.enableControllerService(recordWriter);
}
/**
@ -225,6 +251,7 @@ public class ExecuteGroovyScriptTest {
runner.setProperty(proc.SCRIPT_BODY, " { { ");
runner.assertNotValid();
}
//---------------------------------------------------------
@Test
public void test_ctl_01_access() throws Exception {
@ -309,6 +336,23 @@ public class ExecuteGroovyScriptTest {
resultFile.assertContentEquals(JsonOutput.toJson(new JsonSlurper().parseText(lines.get(1))), "UTF-8");
}
/**
 * Runs the record reader/writer script against a single mock record and checks that one
 * flow file reaches the success relationship with the quoted, comma-separated record as content.
 */
@Test
public void test_record_reader_writer_access() throws Exception {
    final String successName = ExecuteGroovyScript.REL_SUCCESS.getName();

    runner.setProperty(ExecuteGroovyScript.SCRIPT_FILE, TEST_RESOURCE_LOCATION + "test_record_reader_writer.groovy");
    // expose the mock services to the script through the prefixed dynamic properties
    runner.setProperty("RecordReader.myreader", "myreader");
    runner.setProperty("RecordWriter.mywriter", "mywriter");
    runner.assertValid();

    // the mock parser hands this record to the script regardless of the (empty) flow file content
    recordParser.addRecord(1, "A", "XYZ");
    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(successName, 1);
    final MockFlowFile resultFile = runner.getFlowFilesForRelationship(successName).get(0);
    resultFile.assertContentEquals("\"1\",\"A\",\"XYZ\"\n", "UTF-8");
}
@Test
public void test_filter_01() throws Exception {
runner.setProperty(proc.SCRIPT_BODY, "def ff = session.get{it.FILTER=='3'}; if(!ff)return; REL_SUCCESS << ff;");

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.processor.io.StreamCallback
import org.apache.nifi.serialization.RecordSetWriterFactory
// Checks that the `RecordReader` and `RecordWriter` bindings expose the configured
// factory services, then copies every record of the incoming flow file from a reader
// created by the first factory to a writer created by the second.
def ff = session.get()
if (!ff) return

def readerFactory = RecordReader.myreader
assert readerFactory instanceof RecordReaderFactory
def writerFactory = RecordWriter.mywriter
assert writerFactory instanceof RecordSetWriterFactory

session.write(ff, { inStream, outStream ->
    def variables = new HashMap<String, String>(ff.attributes)
    def recordReader = readerFactory.createRecordReader(variables, inStream, -1L, log)
    try {
        def recordWriter = writerFactory.createWriter(log, recordReader.schema, outStream, variables)
        try {
            def record = null
            recordWriter.beginRecordSet()
            while (record = recordReader.nextRecord()) {
                recordWriter.write(record)
            }
            recordWriter.finishRecordSet()
        } finally {
            // release the writer even when reading or writing a record fails
            recordWriter.close()
        }
    } finally {
        // the reader is closed last, after the writer has finished with its schema
        recordReader.close()
    }
} as StreamCallback)

REL_SUCCESS << ff