diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml index e4873704f9..fc01a1899b 100644 --- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml @@ -38,6 +38,10 @@ org.apache.nifi nifi-utils + + org.apache.nifi + nifi-record-serialization-service-api + org.codehaus.groovy groovy-all diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java new file mode 100644 index 0000000000..9b70fe7da9 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/AbstractScriptedRecordFactory.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.script; + +import org.apache.commons.io.IOUtils; +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.components.ValidationContext; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.AbstractControllerService; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.processors.script.ScriptingComponentHelper; +import org.apache.nifi.processors.script.ScriptingComponentUtils; +import org.apache.nifi.util.StringUtils; + +import javax.script.ScriptEngine; +import java.io.FileInputStream; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +/** + * An abstract base class containing code common to the Scripted record reader/writer implementations + */ +public abstract class AbstractScriptedRecordFactory extends AbstractControllerService { + + protected final AtomicReference recordFactory = new AtomicReference<>(); + + protected final AtomicReference> validationResults = new AtomicReference<>(new ArrayList<>()); + + protected final AtomicBoolean scriptNeedsReload = new AtomicBoolean(true); + + protected volatile ScriptEngine scriptEngine = null; + protected volatile ScriptingComponentHelper scriptingComponentHelper = new ScriptingComponentHelper(); + protected volatile ConfigurationContext configurationContext = null; + + /** + * Returns a list of property descriptors supported by this record reader. The + * list always includes properties such as script engine name, script file + * name, script body name, script arguments, and an external module path. + * + * @return a List of PropertyDescriptor objects supported by this processor + */ + @Override + protected List getSupportedPropertyDescriptors() { + + synchronized (scriptingComponentHelper.isInitialized) { + if (!scriptingComponentHelper.isInitialized.get()) { + scriptingComponentHelper.createResources(); + } + } + List supportedPropertyDescriptors = new ArrayList<>(); + supportedPropertyDescriptors.addAll(scriptingComponentHelper.getDescriptors()); + + return Collections.unmodifiableList(supportedPropertyDescriptors); + } + + /** + * Returns a PropertyDescriptor for the given name. This is for the user to + * be able to define their own properties which will be available as + * variables in the script + * + * @param propertyDescriptorName used to lookup if any property descriptors + * exist for that name + * @return a PropertyDescriptor object corresponding to the specified + * dynamic property name + */ + @Override + protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { + return new PropertyDescriptor.Builder() + .name(propertyDescriptorName) + .required(false) + .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) + .expressionLanguageSupported(true) + .dynamic(true) + .build(); + } + + /** + * Handles changes to this processor's properties. If changes are made to + * script- or engine-related properties, the script will be reloaded. + * + * @param descriptor of the modified property + * @param oldValue non-null property value (previous) + * @param newValue the new property value or if null indicates the property + */ + @Override + public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) { + + if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor) + || ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor) + || ScriptingComponentUtils.MODULES.equals(descriptor) + || scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptNeedsReload.set(true); + // Need to reset scriptEngine if the value has changed + if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) { + scriptEngine = null; + } + } + } + + @Override + protected Collection customValidate(ValidationContext validationContext) { + return scriptingComponentHelper.customValidate(validationContext); + } + + public void onEnabled(final ConfigurationContext context) { + this.configurationContext = context; + + scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue()); + scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue()); + scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue()); + String modulePath = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().getValue(); + if (!StringUtils.isEmpty(modulePath)) { + scriptingComponentHelper.setModules(modulePath.split(",")); + } else { + scriptingComponentHelper.setModules(new String[0]); + } + setup(); + } + + public void setup() { + // Create a single script engine, the Processor object is reused by each task + if (scriptEngine == null) { + scriptingComponentHelper.setup(1, getLogger()); + scriptEngine = scriptingComponentHelper.engineQ.poll(); + } + + if (scriptEngine == null) { + throw new ProcessException("No script engine available!"); + } + + if (scriptNeedsReload.get() || recordFactory.get() == null) { + if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) { + reloadScriptFile(scriptingComponentHelper.getScriptPath()); + } else { + reloadScriptBody(scriptingComponentHelper.getScriptBody()); + } + scriptNeedsReload.set(false); + } + } + + /** + * Reloads the script located at the given path + * + * @param scriptPath the path to the script file to be loaded + * @return true if the script was loaded successfully; false otherwise + */ + private boolean reloadScriptFile(final String scriptPath) { + final Collection results = new HashSet<>(); + + try (final FileInputStream scriptStream = new FileInputStream(scriptPath)) { + return reloadScript(IOUtils.toString(scriptStream, Charset.defaultCharset())); + + } catch (final Exception e) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + e; + + logger.error(message, e); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Unable to load script due to " + e) + .input(scriptPath) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } + + /** + * Reloads the script defined by the given string + * + * @param scriptBody the contents of the script to be loaded + * @return true if the script was loaded successfully; false otherwise + */ + private boolean reloadScriptBody(final String scriptBody) { + final Collection results = new HashSet<>(); + try { + return reloadScript(scriptBody); + + } catch (final Exception e) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + e; + + logger.error(message, e); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Unable to load script due to " + e) + .input(scriptingComponentHelper.getScriptPath()) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } + + protected abstract boolean reloadScript(final String scriptBody); +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java new file mode 100644 index 0000000000..d95c87f1ee --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedReader.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.script; + +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordReaderFactory; + +import javax.script.Invocable; +import javax.script.ScriptException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Collection; +import java.util.HashSet; + +/** + * A RecordReader implementation that allows the user to script the RecordReader instance + */ +@Tags({"record", "recordFactory", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"}) +@CapabilityDescription("Allows the user to provide a scripted RecordReaderFactory instance in order to read/parse/generate records from an incoming flow file.") +@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +public class ScriptedReader extends AbstractScriptedRecordFactory implements RecordReaderFactory { + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + super.onEnabled(context); + } + + @Override + public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + if (recordFactory.get() != null) { + try { + return recordFactory.get().createRecordReader(flowFile, in, logger); + } catch (UndeclaredThrowableException ute) { + throw new IOException(ute.getCause()); + } + } + return null; + } + + /** + * Reloads the script RecordReaderFactory. This must be called within the lock. + * + * @param scriptBody An input stream associated with the script content + * @return Whether the script was successfully reloaded + */ + protected boolean reloadScript(final String scriptBody) { + // note we are starting here with a fresh listing of validation + // results since we are (re)loading a new/updated script. any + // existing validation results are not relevant + final Collection results = new HashSet<>(); + + try { + // get the engine and ensure its invocable + if (scriptEngine instanceof Invocable) { + final Invocable invocable = (Invocable) scriptEngine; + + // Find a custom configurator and invoke their eval() method + ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); + if (configurator != null) { + configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); + } else { + // evaluate the script + scriptEngine.eval(scriptBody); + } + + // get configured processor from the script (if it exists) + final Object obj = scriptEngine.get("reader"); + if (obj != null) { + final ComponentLog logger = getLogger(); + + try { + // set the logger if the processor wants it + invocable.invokeMethod(obj, "setLogger", logger); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Configured script RecordReaderFactory does not contain a setLogger method."); + } + } + + if (configurationContext != null) { + try { + // set the logger if the processor wants it + invocable.invokeMethod(obj, "setConfigurationContext", configurationContext); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Configured script RecordReaderFactory does not contain a setConfigurationContext method."); + } + } + } + + // record the processor for use later + final RecordReaderFactory scriptedReader = invocable.getInterface(obj, RecordReaderFactory.class); + recordFactory.set(scriptedReader); + + } else { + throw new ScriptException("No RecordReader was defined by the script."); + } + } + + } catch (final Exception ex) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + ex.getLocalizedMessage(); + + logger.error(message, ex); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Unable to load script due to " + ex.getLocalizedMessage()) + .input(scriptingComponentHelper.getScriptPath()) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java new file mode 100644 index 0000000000..dd0be2b7a6 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/java/org/apache/nifi/record/script/ScriptedRecordSetWriter.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.script; + +import org.apache.nifi.annotation.behavior.Restricted; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.annotation.lifecycle.OnEnabled; +import org.apache.nifi.components.ValidationResult; +import org.apache.nifi.controller.ConfigurationContext; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.processors.script.ScriptEngineConfigurator; +import org.apache.nifi.schema.access.SchemaNotFoundException; +import org.apache.nifi.serialization.RecordSetWriter; +import org.apache.nifi.serialization.RecordSetWriterFactory; + +import javax.script.Invocable; +import javax.script.ScriptException; +import java.io.IOException; +import java.io.InputStream; +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Collection; +import java.util.HashSet; + +/** + * A RecordSetWriter implementation that allows the user to script the RecordWriter instance + */ +@Tags({"record", "writer", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj", "restricted"}) +@CapabilityDescription("Allows the user to provide a scripted RecordSetWriterFactory instance in order to write records to an outgoing flow file.") +@Restricted("Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.") +public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory implements RecordSetWriterFactory { + + @OnEnabled + public void onEnabled(final ConfigurationContext context) { + super.onEnabled(context); + } + + @Override + public RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException { + if (recordFactory.get() != null) { + try { + return recordFactory.get().createWriter(logger, flowFile, flowFileContent); + } catch (UndeclaredThrowableException ute) { + throw new IOException(ute.getCause()); + } + } + return null; + } + + + /** + * Reloads the script RecordSetWriterFactory. This must be called within the lock. + * + * @param scriptBody An input stream associated with the script content + * @return Whether the script was successfully reloaded + */ + protected boolean reloadScript(final String scriptBody) { + // note we are starting here with a fresh listing of validation + // results since we are (re)loading a new/updated script. any + // existing validation results are not relevant + final Collection results = new HashSet<>(); + + try { + // get the engine and ensure its invocable + if (scriptEngine instanceof Invocable) { + final Invocable invocable = (Invocable) scriptEngine; + + // Find a custom configurator and invoke their eval() method + ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase()); + if (configurator != null) { + configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules()); + } else { + // evaluate the script + scriptEngine.eval(scriptBody); + } + + // get configured processor from the script (if it exists) + final Object obj = scriptEngine.get("writer"); + if (obj != null) { + final ComponentLog logger = getLogger(); + + try { + // set the logger if the processor wants it + invocable.invokeMethod(obj, "setLogger", logger); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Configured script RecordSetWriterFactory does not contain a setLogger method."); + } + } + + if (configurationContext != null) { + try { + // set the logger if the processor wants it + invocable.invokeMethod(obj, "setConfigurationContext", configurationContext); + } catch (final NoSuchMethodException nsme) { + if (logger.isDebugEnabled()) { + logger.debug("Configured script RecordSetWriterFactory does not contain a setConfigurationContext method."); + } + } + } + + // record the processor for use later + final RecordSetWriterFactory scriptedWriter = invocable.getInterface(obj, RecordSetWriterFactory.class); + recordFactory.set(scriptedWriter); + + } else { + throw new ScriptException("No RecordSetWriterFactory was defined by the script."); + } + } + + } catch (final Exception ex) { + final ComponentLog logger = getLogger(); + final String message = "Unable to load script: " + ex.getLocalizedMessage(); + + logger.error(message, ex); + results.add(new ValidationResult.Builder() + .subject("ScriptValidation") + .valid(false) + .explanation("Unable to load script due to " + ex.getLocalizedMessage()) + .input(scriptingComponentHelper.getScriptPath()) + .build()); + } + + // store the updated validation results + validationResults.set(results); + + // return whether there was any issues loading the configured script + return results.isEmpty(); + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService new file mode 100644 index 0000000000..f698255478 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +org.apache.nifi.record.script.ScriptedReader +org.apache.nifi.record.script.ScriptedRecordSetWriter \ No newline at end of file diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy new file mode 100644 index 0000000000..1025146ac4 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedReaderTest.groovy @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.script + +import org.apache.commons.io.FileUtils +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.controller.ConfigurationContext +import org.apache.nifi.controller.ControllerServiceInitializationContext +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.processor.util.StandardValidators +import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper +import org.apache.nifi.processors.script.ScriptingComponentHelper +import org.apache.nifi.processors.script.ScriptingComponentUtils +import org.apache.nifi.serialization.RecordReader +import org.apache.nifi.util.MockComponentLog +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.MockPropertyValue +import org.apache.nifi.util.TestRunners +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static groovy.util.GroovyTestCase.assertEquals +import static org.junit.Assert.assertNotNull +import static org.junit.Assert.assertNull +import static org.mockito.Mockito.mock +import static org.mockito.Mockito.when + +/** + * Unit tests for the ScriptedReader class + */ +@RunWith(JUnit4.class) +class ScriptedReaderTest { + + private static final Logger logger = LoggerFactory.getLogger(ScriptedReaderTest) + def recordReaderFactory + def runner + def scriptingComponent + + + @BeforeClass + static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = {String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File) + } + + @Before + void setUp() { + recordReaderFactory = new MockScriptedReader() + runner = TestRunners + scriptingComponent = (AccessibleScriptingComponentHelper) recordReaderFactory + } + + @Test + void testRecordReaderGroovyScript() { + + def properties = [:] as Map + recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor -> + properties.put(descriptor, descriptor.getDefaultValue()) + } + + // Mock the ConfigurationContext for setup(...) + def configurationContext = mock(ConfigurationContext) + when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) + .thenReturn(new MockPropertyValue('Groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) + .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_inline.groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) + .thenReturn(new MockPropertyValue(null)) + when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) + .thenReturn(new MockPropertyValue(null)) + + def logger = mock(ComponentLog) + def initContext = mock(ControllerServiceInitializationContext) + when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) + when(initContext.getLogger()).thenReturn(logger) + + recordReaderFactory.initialize initContext + recordReaderFactory.onEnabled configurationContext + + MockFlowFile mockFlowFile = new MockFlowFile(1L) + InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) + + RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger) + assertNotNull(recordReader) + + 3.times { + def record = recordReader.nextRecord() + assertNotNull(record) + assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100) + } + assertNull(recordReader.nextRecord()) + } + + @Test + void testXmlRecordReaderGroovyScript() { + + def properties = [:] as Map + recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor -> + properties.put(descriptor, descriptor.getDefaultValue()) + } + + // Test dynamic property descriptor + PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder() + .name('schema.text') + .dynamic(true) + .addValidator(StandardValidators.NON_BLANK_VALIDATOR) + .build() + + def schemaText = ''' + [ + {"id": "int"}, + {"name": "string"}, + {"code": "int"} + ] + ''' + properties.put(SCHEMA_TEXT, schemaText) + + // Mock the ConfigurationContext for setup(...) + def configurationContext = mock(ConfigurationContext) + when(configurationContext.getProperties()).thenReturn(properties) + when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) + .thenReturn(new MockPropertyValue('Groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) + .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_xml.groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) + .thenReturn(new MockPropertyValue(null)) + when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) + .thenReturn(new MockPropertyValue(null)) + when(configurationContext.getProperty(SCHEMA_TEXT)).thenReturn(new MockPropertyValue(schemaText)) + + def logger = new MockComponentLog('ScriptedReader', '') + def initContext = mock(ControllerServiceInitializationContext) + when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) + when(initContext.getLogger()).thenReturn(logger) + + recordReaderFactory.initialize initContext + recordReaderFactory.onEnabled configurationContext + + MockFlowFile mockFlowFile = new MockFlowFile(1L) + mockFlowFile.putAttributes(['record.tag': 'myRecord']) + + InputStream inStream = new ByteArrayInputStream(''' + + + 1 + John + 100 + + + 2 + Mary + 200 + + + 3 + Ramon + 300 + + + '''.bytes) + + RecordReader recordReader = recordReaderFactory.createRecordReader(mockFlowFile, inStream, logger) + assertNotNull(recordReader) + + 3.times { + def record = recordReader.nextRecord() + assertNotNull(record) + assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100) + } + assertNull(recordReader.nextRecord()) + + } + + class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper { + + @Override + ScriptingComponentHelper getScriptingComponentHelper() { + return this.@scriptingComponentHelper + } + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy new file mode 100644 index 0000000000..d4e7d5ab99 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/groovy/org/apache/nifi/record/script/ScriptedRecordSetWriterTest.groovy @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.record.script + +import org.apache.commons.io.FileUtils +import org.apache.nifi.components.PropertyDescriptor +import org.apache.nifi.controller.ConfigurationContext +import org.apache.nifi.controller.ControllerServiceInitializationContext +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper +import org.apache.nifi.processors.script.ScriptingComponentHelper +import org.apache.nifi.processors.script.ScriptingComponentUtils +import org.apache.nifi.serialization.RecordSetWriter +import org.apache.nifi.serialization.SimpleRecordSchema +import org.apache.nifi.serialization.record.MapRecord +import org.apache.nifi.serialization.record.RecordField +import org.apache.nifi.serialization.record.RecordFieldType +import org.apache.nifi.serialization.record.RecordSet +import org.apache.nifi.util.MockFlowFile +import org.apache.nifi.util.MockPropertyValue +import org.apache.nifi.util.TestRunners +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Test +import org.junit.runner.RunWith +import org.junit.runners.JUnit4 +import org.slf4j.Logger +import org.slf4j.LoggerFactory + +import static org.junit.Assert.assertNotNull +import static org.junit.Assert.assertEquals +import static org.mockito.Mockito.mock +import static org.mockito.Mockito.when + +/** + * Unit tests for the ScriptedReader class + */ +@RunWith(JUnit4.class) +class ScriptedRecordSetWriterTest { + + private static final Logger logger = LoggerFactory.getLogger(ScriptedRecordSetWriterTest) + MockScriptedWriter recordSetWriterFactory + def runner + def scriptingComponent + + + @BeforeClass + static void setUpOnce() throws Exception { + logger.metaClass.methodMissing = {String name, args -> + logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}") + } + FileUtils.copyDirectory('src/test/resources' as File, 'target/test/resources' as File) + } + + @Before + void setUp() { + recordSetWriterFactory = new MockScriptedWriter() + runner = TestRunners + scriptingComponent = (AccessibleScriptingComponentHelper) recordSetWriterFactory + } + + @Test + void testRecordWriterGroovyScript() { + + def properties = [:] as Map + recordSetWriterFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor -> + properties.put(descriptor, descriptor.getDefaultValue()) + } + + // Mock the ConfigurationContext for setup(...) + def configurationContext = mock(ConfigurationContext) + when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE)) + .thenReturn(new MockPropertyValue('Groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE)) + .thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_writer_inline.groovy')) + when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY)) + .thenReturn(new MockPropertyValue(null)) + when(configurationContext.getProperty(ScriptingComponentUtils.MODULES)) + .thenReturn(new MockPropertyValue(null)) + + def logger = mock(ComponentLog) + def initContext = mock(ControllerServiceInitializationContext) + when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString()) + when(initContext.getLogger()).thenReturn(logger) + + recordSetWriterFactory.initialize initContext + recordSetWriterFactory.onEnabled configurationContext + + MockFlowFile mockFlowFile = new MockFlowFile(1L) + InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes) + + RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(logger, mockFlowFile, inStream) + assertNotNull(recordSetWriter) + + def recordSchema = new SimpleRecordSchema( + [new RecordField('id', RecordFieldType.INT.dataType), + new RecordField('name', RecordFieldType.STRING.dataType), + new RecordField('code', RecordFieldType.INT.dataType)] + ) + + def records = [ + new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]), + new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]), + new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300]) + ] as MapRecord[] + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream() + recordSetWriter.write(RecordSet.of(recordSchema, records), outputStream) + + def xml = new XmlSlurper().parseText(outputStream.toString()) + assertEquals('1', xml.record[0].id.toString()) + assertEquals('200', xml.record[1].code.toString()) + assertEquals('Ramon', xml.record[2].name.toString()) + } + + class MockScriptedWriter extends ScriptedRecordSetWriter implements AccessibleScriptingComponentHelper { + + @Override + ScriptingComponentHelper getScriptingComponentHelper() { + return this.@scriptingComponentHelper + } + } +} diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy new file mode 100644 index 0000000000..2be37dffc2 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.nifi.controller.AbstractControllerService +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.schema.access.SchemaNotFoundException +import org.apache.nifi.serialization.MalformedRecordException +import org.apache.nifi.serialization.RecordReader +import org.apache.nifi.serialization.RecordReaderFactory +import org.apache.nifi.serialization.SimpleRecordSchema +import org.apache.nifi.serialization.record.MapRecord +import org.apache.nifi.serialization.record.Record +import org.apache.nifi.serialization.record.RecordField +import org.apache.nifi.serialization.record.RecordFieldType +import org.apache.nifi.serialization.record.RecordSchema + + +class GroovyRecordReader implements RecordReader { + + def recordSchema = new SimpleRecordSchema( + [new RecordField('id', RecordFieldType.INT.dataType), + new RecordField('name', RecordFieldType.STRING.dataType), + new RecordField('code', RecordFieldType.INT.dataType)] + ) + + def recordIterator = [ + new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]), + new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]), + new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300]) + ].iterator() + + Record nextRecord() throws IOException, MalformedRecordException { + return recordIterator.hasNext() ? recordIterator.next() : null + } + + RecordSchema getSchema() throws MalformedRecordException { + return recordSchema + } + + void close() throws IOException { + } +} + +class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory { + + RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + return new GroovyRecordReader() + } +} + +reader = new GroovyRecordReaderFactory() diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy new file mode 100644 index 0000000000..d51089b582 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import groovy.json.JsonSlurper +import org.apache.nifi.controller.AbstractControllerService +import org.apache.nifi.controller.ConfigurationContext +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.schema.access.SchemaNotFoundException +import org.apache.nifi.serialization.MalformedRecordException +import org.apache.nifi.serialization.RecordReader +import org.apache.nifi.serialization.RecordReaderFactory +import org.apache.nifi.serialization.SimpleRecordSchema +import org.apache.nifi.serialization.record.MapRecord +import org.apache.nifi.serialization.record.Record +import org.apache.nifi.serialization.record.RecordField +import org.apache.nifi.serialization.record.RecordFieldType +import org.apache.nifi.serialization.record.RecordSchema + + +class GroovyXmlRecordReader implements RecordReader { + + def recordIterator + def recordSchema + + GroovyXmlRecordReader(final String recordTag, final RecordSchema schema, final InputStream inputStream) { + recordSchema = schema + def xml = new XmlSlurper().parse(inputStream) + // Change the XML fields to a MapRecord for each incoming record + recordIterator = xml[recordTag].collect {r -> + // Create a map of field names to values, using the field names from the schema as keys into the XML object + def fields = recordSchema.fieldNames.inject([:]) {result, fieldName -> + result[fieldName] = r[fieldName].toString() + result + } + new MapRecord(recordSchema, fields) + }.iterator() + } + + Record nextRecord() throws IOException, MalformedRecordException { + return recordIterator?.hasNext() ? recordIterator.next() : null + } + + RecordSchema getSchema() throws MalformedRecordException { + return recordSchema + } + + void close() throws IOException { + } +} + +class GroovyXmlRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory { + + // Will be set by the ScriptedRecordReaderFactory + ConfigurationContext configurationContext + + RecordReader createRecordReader(FlowFile flowFile, InputStream inputStream, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + // Expecting 'schema.text' to have be an JSON array full of objects whose keys are the field name and the value maps to a RecordFieldType + def schemaText = configurationContext.properties.find {p -> p.key.dynamic && p.key.name == 'schema.text'}?.getValue() + if (!schemaText) return null + def jsonSchema = new JsonSlurper().parseText(schemaText) + def recordSchema = new SimpleRecordSchema(jsonSchema.collect {field -> + def entry = field.entrySet()[0] + new RecordField(entry.key, RecordFieldType.of(entry.value).dataType) + } as List) + return new GroovyXmlRecordReader(flowFile.getAttribute('record.tag'), recordSchema, inputStream) + } + +} + +// Create an instance of RecordReaderFactory called "writer", this is the entry point for ScriptedReader +reader = new GroovyXmlRecordReaderFactory() diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy new file mode 100644 index 0000000000..e17b7018d7 --- /dev/null +++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_writer_inline.groovy @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import groovy.xml.MarkupBuilder +import org.apache.nifi.controller.AbstractControllerService +import org.apache.nifi.flowfile.FlowFile +import org.apache.nifi.logging.ComponentLog +import org.apache.nifi.schema.access.SchemaNotFoundException +import org.apache.nifi.serialization.RecordSetWriter +import org.apache.nifi.serialization.RecordSetWriterFactory +import org.apache.nifi.serialization.WriteResult +import org.apache.nifi.serialization.record.Record +import org.apache.nifi.serialization.record.RecordSet +import org.apache.nifi.stream.io.NonCloseableOutputStream + + +class GroovyRecordSetWriter implements RecordSetWriter { + + @Override + WriteResult write(Record r, OutputStream out) throws IOException { + new OutputStreamWriter(new NonCloseableOutputStream(out)).with {osw -> + new MarkupBuilder(osw).record { + r.schema.fieldNames.each {fieldName -> + "$fieldName" r.getValue(fieldName) + } + } + } + WriteResult.of(0, [:]) + } + + @Override + String getMimeType() { + return 'application/xml' + } + + @Override + WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException { + int count = 0 + + new OutputStreamWriter(new NonCloseableOutputStream(rawOut)).with {osw -> + new MarkupBuilder(osw).recordSet { + + Record r + while (r = rs.next()) { + count++ + + record { + rs.schema.fieldNames.each {fieldName -> + "$fieldName" r.getValue(fieldName) + } + } + } + } + } + WriteResult.of(count, [:]) + } +} + +class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory { + + @Override + RecordSetWriter createWriter(ComponentLog logger, FlowFile flowFile, InputStream flowFileContent) throws SchemaNotFoundException, IOException { + return new GroovyRecordSetWriter() + } +} + +writer = new GroovyRecordSetWriterFactory()