NIFI-7343: Add support for SchemaRegistryService to scripted components

This closes #7467

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matt Burgess 2023-07-04 19:54:47 -04:00 committed by exceptionfactory
parent d2c70d1d2f
commit 21e5ebc840
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
8 changed files with 117 additions and 1 deletions

View File

@ -50,6 +50,14 @@
<scope>runtime</scope>
</dependency>
<!-- nifi-avro-record-utils dependency to include in the NAR for processor to use at runtime -->
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<type>jar</type>
<scope>runtime</scope>
</dependency>
<!-- Groovy dependencies to include in the NAR for processors to use at runtime -->
<dependency>
<groupId>org.codehaus.groovy</groupId>

View File

@ -34,6 +34,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-metrics</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-properties</artifactId>

View File

@ -63,7 +63,40 @@ public class ScriptedReader extends AbstractScriptedRecordFactory<RecordReaderFa
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted LookupService the chance to set up as necessary
if (scriptRunner != null) {
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
final Invocable invocable = (Invocable) scriptEngine;
if (configurationContext != null) {
try {
// Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
// where lookupService is a proxied interface
final Object obj = scriptEngine.get("reader");
if (obj != null) {
try {
invocable.invokeMethod(obj, "onEnabled", context);
} catch (final NoSuchMethodException nsme) {
if (getLogger().isDebugEnabled()) {
getLogger().debug("Configured script for ScriptedReader does not contain an onEnabled() method.");
}
}
} else {
throw new ScriptException("No ScriptedReader was defined by the script.");
}
} catch (ScriptException se) {
throw new ProcessException("Error executing onEnabled(context) method", se);
}
}
} else {
throw new ProcessException("Error creating ScriptRunner");
}
}
@Override

View File

@ -64,7 +64,40 @@ public class ScriptedRecordSetWriter extends AbstractScriptedRecordFactory<Recor
@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
synchronized (scriptingComponentHelper.isInitialized) {
if (!scriptingComponentHelper.isInitialized.get()) {
scriptingComponentHelper.createResources();
}
}
super.onEnabled(context);
// Call an non-interface method onEnabled(context), to allow a scripted LookupService the chance to set up as necessary
if (scriptRunner != null) {
final ScriptEngine scriptEngine = scriptRunner.getScriptEngine();
final Invocable invocable = (Invocable) scriptEngine;
if (configurationContext != null) {
try {
// Get the actual object from the script engine, versus the proxy stored in lookupService. The object may have additional methods,
// where lookupService is a proxied interface
final Object obj = scriptEngine.get("writer");
if (obj != null) {
try {
invocable.invokeMethod(obj, "onEnabled", context);
} catch (final NoSuchMethodException nsme) {
if (getLogger().isDebugEnabled()) {
getLogger().debug("Configured script for ScriptedRecordSetWriter does not contain an onEnabled() method.");
}
}
} else {
throw new ScriptException("No ScriptedRecordSetWriter was defined by the script.");
}
} catch (ScriptException se) {
throw new ProcessException("Error executing onEnabled(context) method", se);
}
}
} else {
throw new ProcessException("Error creating ScriptRunner");
}
}

View File

@ -15,8 +15,8 @@
* limitations under the License.
*/
import java.util.Set
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.controller.ControllerServiceInitializationContext
import org.apache.nifi.reporting.InitializationException
@ -28,6 +28,15 @@ class GroovyLookupService implements LookupService<String> {
'World': 'there'
]
ComponentLog logger;
void setLogger(ComponentLog logger) {
this.logger = logger
}
void onEnabled(final ConfigurationContext context) {
logger.info("in onEnabled")
}
@Override
Optional<String> lookup(Map<String, String> coordinates) {

View File

@ -16,6 +16,7 @@
*/
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
@ -27,6 +28,7 @@ import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.SchemaRegistryService
class GroovyRecordReader implements RecordReader {
@ -57,6 +59,16 @@ class GroovyRecordReader implements RecordReader {
class GroovyRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
ComponentLog logger;
void setLogger(ComponentLog logger) {
this.logger = logger
}
void onEnabled(final ConfigurationContext context) {
logger.info("in onEnabled")
}
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new GroovyRecordReader()
}

View File

@ -18,6 +18,7 @@
import groovy.xml.MarkupBuilder
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.controller.ConfigurationContext
import org.apache.nifi.flowfile.FlowFile
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
@ -98,6 +99,16 @@ class GroovyRecordSetWriter implements RecordSetWriter {
class GroovyRecordSetWriterFactory extends AbstractControllerService implements RecordSetWriterFactory {
ComponentLog logger;
void setLogger(ComponentLog logger) {
this.logger = logger
}
void onEnabled(final ConfigurationContext context) {
logger.info("in onEnabled")
}
@Override
RecordSchema getSchema(Map<String, String> variables, RecordSchema readSchema) throws SchemaNotFoundException, IOException {
return null

View File

@ -71,6 +71,12 @@
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-avro-record-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>