NIFI-3734: Add ScriptedReader and ScriptedRecordSetWriter

This closes #1691.

Signed-off-by: Andy LoPresto <alopresto@apache.org>
This commit is contained in:
Matt Burgess 2017-04-25 00:38:14 -04:00 committed by Andy LoPresto
parent ee4b88620a
commit 49a62448ce
No known key found for this signature in database
GPG Key ID: 6EC293152D90B61D
10 changed files with 1111 additions and 0 deletions

View File

@ -38,6 +38,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>

View File

@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.script;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.script.ScriptingComponentHelper;
import org.apache.nifi.processors.script.ScriptingComponentUtils;
import org.apache.nifi.util.StringUtils;
import javax.script.ScriptEngine;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* An abstract base class containing code common to the Scripted record reader/writer implementations
*/
public abstract class AbstractScriptedRecordFactory<T> extends AbstractControllerService {
protected final AtomicReference<T> recordFactory = new AtomicReference<>();
protected final AtomicReference<Collection<ValidationResult>> validationResults = new AtomicReference<>(new ArrayList<>());
protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true);
protected volatile ScriptEngine scriptEngine = null;
protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper();
protected volatile ConfigurationContext configurationContext = null;
/**
* Returns a list of property descriptors supported by this record reader. The
* list always includes properties such as script engine name, script file
* name, script body name, script arguments, and an external module path.
*
* @return a List of PropertyDescriptor objects supported by this processor
*/
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
List<PropertyDescriptor> supportedPropertyDescriptors = new ArrayList<>();
supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors());
return Collections.unmodifiableList(supportedPropertyDescriptors);
}
/**
* Returns a PropertyDescriptor for the given name. This is for the user to
* be able to define their own properties which will be available as
* variables in the script
*
* @param propertyDescriptorName used to lookup if any property descriptors
* exist for that name
* @return a PropertyDescriptor object corresponding to the specified
* dynamic property name
*/
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.dynamic(true)
.build();
}
/**
* Handles changes to this processor's properties. If changes are made to
* script- or engine-related properties, the script will be reloaded.
*
* @param descriptor of the modified property
* @param oldValue non-null property value (previous)
* @param newValue the new property value or if null indicates the property
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptEngine = null;
}
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
}
public void onEnabled(final ConfigurationContext context) {
this.configurationContext = context;
scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue();
if (!StringUtils.isEmpty(modulePath)) {
scriptingComponentHelper.setModules(modulePath.split(","));
} else {
scriptingComponentHelper.setModules(new String[0]);
}
setup();
}
public void setup() {
// Create a single script engine, the Processor object is reused by each task
if (scriptEngine == null) {
scriptingComponentHelper.setup(1, getLogger());
scriptEngine = scriptingComponentHelper.engineQ.poll();
}
if (scriptEngine == null) {
throw new ProcessException("No script engine available!");
}
if (scriptNeedsReload.get() || recordFactory.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
} else {
reloadScriptBody(scriptingComponentHelper.getScriptBody());
}
scriptNeedsReload.set(false);
}
}
/**
* Reloads the script located at the given path
*
* @param scriptPath the path to the script file to be loaded
* @return true if the script was loaded successfully; false otherwise
*/
private boolean reloadScriptFile(final String scriptPath) {
final Collection<ValidationResult> results = new HashSet<>();
try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) {
return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset()));
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + e;
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptPath)
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
/**
* Reloads the script defined by the given string
*
* @param scriptBody the contents of the script to be loaded
* @return true if the script was loaded successfully; false otherwise
*/
private boolean reloadScriptBody(final String scriptBody) {
final Collection<ValidationResult> results = new HashSet<>();
try {
return reloadScript(scriptBody);
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + e;
logger.error(message, e);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + e)
.input(scriptingComponentHelper.getScriptPath())
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
protected abstract boolean reloadScript(final String scriptBody);
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.script;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import javax.script.Invocable;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collection;
import java.util.HashSet;
/**
* A RecordReader implementation that allows the user to script the RecordReader instance
*/
@Tags({"record", "recordFactory", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"})
@CapabilityDescription("Allows the user to provide a scripted RecordReaderFactory instance in order to read/parse/generate records from an incoming flow file.")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFactory> implements RecordReaderFactory {
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
super.onEnabled(context);
}
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createRecordReader(flowFile, in, logger);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}
}
return null;
}
/**
* Reloads the script RecordReaderFactory. This must be called within the lock.
*
* @param scriptBody An input stream associated with the script content
* @return Whether the script was successfully reloaded
*/
protected boolean reloadScript(final String scriptBody) {
// note we are starting here with a fresh listing of validation
// results since we are (re)loading a new/updated script. any
// existing validation results are not relevant
final Collection<ValidationResult> results = new HashSet<>();
try {
// get the engine and ensure its invocable
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script
scriptEngine.eval(scriptBody);
}
// get configured processor from the script (if it exists)
final Object obj = scriptEngine.get("reader");
if (obj != null) {
final ComponentLog logger = getLogger();
try {
// set the logger if the processor wants it
invocable.invokeMethod(obj, "setLogger", logger);
} catch (final NoSuchMethodException nsme) {
if (logger.isDebugEnabled()) {
logger.debug("Configured script RecordReaderFactory does not contain a setLogger method.");
}
}
if (configurationContext != null) {
try {
// set the logger if the processor wants it
invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
} catch (final NoSuchMethodException nsme) {
if (logger.isDebugEnabled()) {
logger.debug("Configured script RecordReaderFactory does not contain a setConfigurationContext method.");
}
}
}
// record the processor for use later
final RecordReaderFactory scriptedReader = invocable.getInterface(obj, RecordReaderFactory.class);
recordFactory.set(scriptedReader);
} else {
throw new ScriptException("No RecordReader was defined by the script.");
}
}
} catch (final Exception ex) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + ex.getLocalizedMessage();
logger.error(message, ex);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
.input(scriptingComponentHelper.getScriptPath())
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
}

View File

@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.script;
import org.apache.nifi.annotation.behavior.Restricted;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import javax.script.Invocable;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.Collection;
import java.util.HashSet;
/**
* A RecordSetWriter implementation that allows the user to script the RecordWriter instance
*/
@Tags({"record", "writer", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"})
@CapabilityDescription("Allows the user to provide a scripted RecordSetWriterFactory instance in order to write records to an outgoing flow file.")
@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<RecordSetWriterFactory> implements RecordSetWriterFactory {
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
super.onEnabled(context);
}
@Override
public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException {
if (recordFactory.get() != null) {
try {
return recordFactory.get().createWriter(logger, flowFile, flowFileContent);
} catch (UndeclaredThrowableException ute) {
throw new IOException(ute.getCause());
}
}
return null;
}
/**
* Reloads the script RecordSetWriterFactory. This must be called within the lock.
*
* @param scriptBody An input stream associated with the script content
* @return Whether the script was successfully reloaded
*/
protected boolean reloadScript(final String scriptBody) {
// note we are starting here with a fresh listing of validation
// results since we are (re)loading a new/updated script. any
// existing validation results are not relevant
final Collection<ValidationResult> results = new HashSet<>();
try {
// get the engine and ensure its invocable
if (scriptEngine instanceof Invocable) {
final Invocable invocable = (Invocable) scriptEngine;
// Find a custom configurator and invoke their eval() method
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
if (configurator != null) {
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
} else {
// evaluate the script
scriptEngine.eval(scriptBody);
}
// get configured processor from the script (if it exists)
final Object obj = scriptEngine.get("writer");
if (obj != null) {
final ComponentLog logger = getLogger();
try {
// set the logger if the processor wants it
invocable.invokeMethod(obj, "setLogger", logger);
} catch (final NoSuchMethodException nsme) {
if (logger.isDebugEnabled()) {
logger.debug("Configured script RecordSetWriterFactory does not contain a setLogger method.");
}
}
if (configurationContext != null) {
try {
// set the logger if the processor wants it
invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
} catch (final NoSuchMethodException nsme) {
if (logger.isDebugEnabled()) {
logger.debug("Configured script RecordSetWriterFactory does not contain a setConfigurationContext method.");
}
}
}
// record the processor for use later
final RecordSetWriterFactory scriptedWriter = invocable.getInterface(obj, RecordSetWriterFactory.class);
recordFactory.set(scriptedWriter);
} else {
throw new ScriptException("No RecordSetWriterFactory was defined by the script.");
}
}
} catch (final Exception ex) {
final ComponentLog logger = getLogger();
final String message = "Unable to load script: " + ex.getLocalizedMessage();
logger.error(message, ex);
results.add(new ValidationResult.Builder()
.subject("ScriptValidation")
.valid(false)
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
.input(scriptingComponentHelper.getScriptPath())
.build());
}
// store the updated validation results
validationResults.set(results);
// return whether there was any issues loading the configured script
return results.isEmpty();
}
}

View File

@ -0,0 +1,17 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.record.script.ScriptedReader
org.apache.nifi.record.script.ScriptedRecordSetWriter

View File

@ -0,0 +1,202 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.script
import org.apache.commons.io.FileUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processor.util.StandardValidators
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentUtils
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.util.MockComponentLog
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.MockPropertyValue
import org.apache.nifi.util.TestRunners
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static groovy.util.GroovyTestCase.assertEquals
import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertNull
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
/**
* Unit tests for the ScriptedReader class
*/
@RunWith(JUnit4.class)
class ScriptedReaderTest {
private static final Logger logger = LoggerFactory.getLogger(ScriptedReaderTest)
def recordReaderFactory
def runner
def scriptingComponent
@BeforeClass
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = {String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File)
}
@Before
void setUp() {
recordReaderFactory = new MockScriptedReader()
runner = TestRunners
scriptingComponent = (AccessibleScriptingComponentHelper) recordReaderFactory
}
@Test
void testRecordReaderGroovyScript() {
def properties = [:] as Map<PropertyDescriptor, String>
recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_inline.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
def logger = mock(ComponentLog)
def initContext = mock(ControllerServiceInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
recordReaderFactory.initialize initContext
recordReaderFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger)
assertNotNull(recordReader)
3.times {
def record = recordReader.nextRecord()
assertNotNull(record)
assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100)
}
assertNull(recordReader.nextRecord())
}
@Test
void testXmlRecordReaderGroovyScript() {
def properties = [:] as Map<PropertyDescriptor, String>
recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Test dynamic property descriptor
PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
.name('schema.text')
.dynamic(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build()
def schemaText = '''
[
{"id": "int"},
{"name": "string"},
{"code": "int"}
]
'''
properties.put(SCHEMA_TEXT, schemaText)
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperties()).thenReturn(properties)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_xml.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(SCHEMA_TEXT)).thenReturn(new MockPropertyValue(schemaText))
def logger = new MockComponentLog('ScriptedReader', '')
def initContext = mock(ControllerServiceInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
recordReaderFactory.initialize initContext
recordReaderFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
mockFlowFile.putAttributes(['record.tag': 'myRecord'])
InputStream inStream = new ByteArrayInputStream('''
<root>
<myRecord>
<id>1</id>
<name>John</name>
<code>100</code>
</myRecord>
<myRecord>
<id>2</id>
<name>Mary</name>
<code>200</code>
</myRecord>
<myRecord>
<id>3</id>
<name>Ramon</name>
<code>300</code>
</myRecord>
</root>
'''.bytes)
RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger)
assertNotNull(recordReader)
3.times {
def record = recordReader.nextRecord()
assertNotNull(record)
assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100)
}
assertNull(recordReader.nextRecord())
}
class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper {
@Override
ScriptingComponentHelper getScriptingComponentHelper() {
return this.@scriptingComponentHelper
}
}
}

View File

@ -0,0 +1,137 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.script
import org.apache.commons.io.FileUtils
import org.apache.nifi.components.PropertyDescriptor
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentHelper
import org.apache.nifi.processors.script.ScriptingComponentUtils
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.util.MockFlowFile
import org.apache.nifi.util.MockPropertyValue
import org.apache.nifi.util.TestRunners
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertEquals
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
/**
* Unit tests for the ScriptedReader class
*/
@RunWith(JUnit4.class)
class ScriptedRecordSetWriterTest {
private static final Logger logger = LoggerFactory.getLogger(ScriptedRecordSetWriterTest)
MockScriptedWriter recordSetWriterFactory
def runner
def scriptingComponent
@BeforeClass
static void setUpOnce() throws Exception {
logger.metaClass.methodMissing = {String name, args ->
logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
}
FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File)
}
@Before
void setUp() {
recordSetWriterFactory = new MockScriptedWriter()
runner = TestRunners
scriptingComponent = (AccessibleScriptingComponentHelper) recordSetWriterFactory
}
@Test
void testRecordWriterGroovyScript() {
def properties = [:] as Map<PropertyDescriptor, String>
recordSetWriterFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_writer_inline.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
def logger = mock(ComponentLog)
def initContext = mock(ControllerServiceInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
recordSetWriterFactory.initialize initContext
recordSetWriterFactory.onEnabled configurationContext
MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, mockFlowFile, inStream)
assertNotNull(recordSetWriter)
def recordSchema = new SimpleRecordSchema(
[new RecordField('id', RecordFieldType.INT.dataType),
new RecordField('name', RecordFieldType.STRING.dataType),
new RecordField('code', RecordFieldType.INT.dataType)]
)
def records = [
new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]),
new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]),
new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
] as MapRecord[]
ByteArrayOutputStream outputStream = new ByteArrayOutputStream()
recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream)
def xml = new XmlSlurper().parseText(outputStream.toString())
assertEquals('1', xml.record[0].id.toString())
assertEquals('200', xml.record[1].code.toString())
assertEquals('Ramon', xml.record[2].name.toString())
}
class MockScriptedWriter extends ScriptedRecordSetWriter implements AccessibleScriptingComponentHelper {
@Override
ScriptingComponentHelper getScriptingComponentHelper() {
return this.@scriptingComponentHelper
}
}
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
class GroovyRecordReader implements RecordReader {
def recordSchema = new SimpleRecordSchema(
[new RecordField('id', RecordFieldType.INT.dataType),
new RecordField('name', RecordFieldType.STRING.dataType),
new RecordField('code', RecordFieldType.INT.dataType)]
)
def recordIterator = [
new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]),
new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]),
new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
].iterator()
Record nextRecord() throws IOException, MalformedRecordException {
return recordIterator.hasNext() ? recordIterator.next() : null
}
RecordSchema getSchema() throws MalformedRecordException {
return recordSchema
}
void close() throws IOException {
}
}
class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new GroovyRecordReader()
}
}
reader = new GroovyRecordReaderFactory()

View File

@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.json.JsonSlurper
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
class GroovyXmlRecordReader implements RecordReader {
def recordIterator
def recordSchema
GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) {
recordSchema = schema
def xml = new XmlSlurper().parse(inputStream)
// Change the XML fields to a MapRecord for each incoming record
recordIterator = xml[recordTag].collect {r ->
// Create a map of field names to values, using the field names from the schema as keys into the XML object
def fields = recordSchema.fieldNames.inject([:]) {result, fieldName ->
result[fieldName] = r[fieldName].toString()
result
}
new MapRecord(recordSchema, fields)
}.iterator()
}
Record nextRecord() throws IOException, MalformedRecordException {
return recordIterator?.hasNext() ? recordIterator.next() : null
}
RecordSchema getSchema() throws MalformedRecordException {
return recordSchema
}
void close() throws IOException {
}
}
class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
// Will be set by the ScriptedRecordReaderFactory
ConfigurationContext configurationContext
RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
// Expecting 'schema.text' to have be an JSON array full of objects whose keys are the field name and the value maps to a RecordFieldType
def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue()
if (!schemaText) return null
def jsonSchema = new JsonSlurper().parseText(schemaText)
def recordSchema = new SimpleRecordSchema(jsonSchema.collect {field ->
def entry = field.entrySet()[0]
new RecordField(entry.key, RecordFieldType.of(entry.value).dataType)
} as List<RecordField>)
return new GroovyXmlRecordReader(flowFile.getAttribute('record.tag'), recordSchema, inputStream)
}
}
// Create an instance of RecordReaderFactory called "writer", this is the entry point for ScriptedReader
reader = new GroovyXmlRecordReaderFactory()

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.RecordSetWriter
import org.apache.nifi.serialization.RecordSetWriterFactory
import org.apache.nifi.serialization.WriteResult
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSet
import org.apache.nifi.stream.io.NonCloseableOutputStream
class GroovyRecordSetWriter implements RecordSetWriter {
@Override
WriteResult write(Record r, OutputStream out) throws IOException {
new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw ->
new MarkupBuilder(osw).record {
r.schema.fieldNames.each {fieldName ->
"$fieldName" r.getValue(fieldName)
}
}
}
WriteResult.of(0, [:])
}
@Override
String getMimeType() {
return 'application/xml'
}
@Override
WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
int count = 0
new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw ->
new MarkupBuilder(osw).recordSet {
Record r
while (r = rs.next()) {
count++
record {
rs.schema.fieldNames.each {fieldName ->
"$fieldName" r.getValue(fieldName)
}
}
}
}
}
WriteResult.of(count, [:])
}
}
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
@Override
RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException {
return new GroovyRecordSetWriter()
}
}
writer = new GroovyRecordSetWriterFactory()