mirror of
https://github.com/apache/nifi.git
synced 2025-02-11 12:35:20 +00:00
NIFI-6859: Add scripted versions of RecordSinkService, RulesEngineService, and ActionHandler (#3881)
* NIFI-6859: Add scripted versions of RecordSinkService, RulesEngineService, and ActionHandler * Unit test updates
This commit is contained in:
parent
86cae184ff
commit
55c334feb8
@ -60,6 +60,16 @@
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-lookup-service-api</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-record-sink-api</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.nifi</groupId>
|
||||
<artifactId>nifi-rules-engine-service-api</artifactId>
|
||||
<version>1.11.0-SNAPSHOT</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.ivy</groupId>
|
||||
<artifactId>ivy</artifactId>
|
||||
|
@ -0,0 +1,259 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.record.sink.script;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.Restricted;
|
||||
import org.apache.nifi.annotation.behavior.Restriction;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.RequiredPermission;
|
||||
import org.apache.nifi.components.ValidationContext;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.expression.ExpressionLanguageScope;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processor.util.StandardValidators;
|
||||
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
|
||||
import org.apache.nifi.record.sink.RecordSinkService;
|
||||
import org.apache.nifi.script.AbstractScriptedControllerService;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptException;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Tags({"record", "record sink", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
|
||||
@CapabilityDescription("Allows the user to provide a scripted RecordSinkService instance in order to transmit records to the desired target. The script must set a variable 'recordSink' to an "
|
||||
+ "implementation of RecordSinkService.")
|
||||
@Restricted(
|
||||
restrictions = {
|
||||
@Restriction(
|
||||
requiredPermission = RequiredPermission.EXECUTE_CODE,
|
||||
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
|
||||
}
|
||||
)
|
||||
public class ScriptedRecordSink extends AbstractScriptedControllerService implements RecordSinkService {
|
||||
|
||||
protected final AtomicReference<RecordSinkService> recordSink = new AtomicReference<>();
|
||||
|
||||
/**
|
||||
* Returns a list of property descriptors supported by this processor. The list always includes properties such as
|
||||
* script engine name, script file name, script body name, script arguments, and an external module path. If the
|
||||
* scripted processor also defines supported properties, those are added to the list as well.
|
||||
*
|
||||
* @return a List of PropertyDescriptor objects supported by this processor
|
||||
*/
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
synchronized (scriptingComponentHelper.isInitialized) {
|
||||
if (!scriptingComponentHelper.isInitialized.get()) {
|
||||
scriptingComponentHelper.createResources();
|
||||
}
|
||||
}
|
||||
|
||||
return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors());
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a PropertyDescriptor for the given name. This is for the user to be able to define their own properties
|
||||
* which will be available as variables in the script
|
||||
*
|
||||
* @param propertyDescriptorName used to lookup if any property descriptors exist for that name
|
||||
* @return a PropertyDescriptor object corresponding to the specified dynamic property name
|
||||
*/
|
||||
@Override
|
||||
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
|
||||
return new PropertyDescriptor.Builder()
|
||||
.name(propertyDescriptorName)
|
||||
.required(false)
|
||||
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
|
||||
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
|
||||
.dynamic(true)
|
||||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
|
||||
return scriptingComponentHelper.customValidate(validationContext);
|
||||
}
|
||||
|
||||
public void setup() {
|
||||
// Create a single script engine, the Processor object is reused by each task
|
||||
if (scriptEngine == null) {
|
||||
scriptingComponentHelper.setup(1, getLogger());
|
||||
scriptEngine = scriptingComponentHelper.engineQ.poll();
|
||||
}
|
||||
|
||||
if (scriptEngine == null) {
|
||||
throw new ProcessException("No script engine available!");
|
||||
}
|
||||
|
||||
if (scriptNeedsReload.get() || recordSink.get() == null) {
|
||||
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
|
||||
reloadScriptFile(scriptingComponentHelper.getScriptPath());
|
||||
} else {
|
||||
reloadScriptBody(scriptingComponentHelper.getScriptBody());
|
||||
}
|
||||
scriptNeedsReload.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reloads the script RecordSinkService. This must be called within the lock.
|
||||
*
|
||||
* @param scriptBody An input stream associated with the script content
|
||||
* @return Whether the script was successfully reloaded
|
||||
*/
|
||||
protected boolean reloadScript(final String scriptBody) {
|
||||
// note we are starting here with a fresh listing of validation
|
||||
// results since we are (re)loading a new/updated script. any
|
||||
// existing validation results are not relevant
|
||||
final Collection<ValidationResult> results = new HashSet<>();
|
||||
|
||||
try {
|
||||
// get the engine and ensure its invocable
|
||||
if (scriptEngine instanceof Invocable) {
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
|
||||
// Find a custom configurator and invoke their eval() method
|
||||
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
|
||||
if (configurator != null) {
|
||||
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
|
||||
} else {
|
||||
// evaluate the script
|
||||
scriptEngine.eval(scriptBody);
|
||||
}
|
||||
|
||||
// get configured processor from the script (if it exists)
|
||||
final Object obj = scriptEngine.get("recordSink");
|
||||
if (obj != null) {
|
||||
final ComponentLog logger = getLogger();
|
||||
|
||||
try {
|
||||
// set the logger if the processor wants it
|
||||
invocable.invokeMethod(obj, "setLogger", logger);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script RecordSinkService does not contain a setLogger method.");
|
||||
}
|
||||
}
|
||||
|
||||
if (configurationContext != null) {
|
||||
try {
|
||||
// set the logger if the processor wants it
|
||||
invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script RecordSinkService does not contain a setConfigurationContext method.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// record the processor for use later
|
||||
final RecordSinkService scriptedReader = invocable.getInterface(obj, RecordSinkService.class);
|
||||
recordSink.set(scriptedReader);
|
||||
|
||||
} else {
|
||||
throw new ScriptException("No RecordReader was defined by the script.");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (final Exception ex) {
|
||||
final ComponentLog logger = getLogger();
|
||||
final String message = "Unable to load script: " + ex.getLocalizedMessage();
|
||||
|
||||
logger.error(message, ex);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject("ScriptValidation")
|
||||
.valid(false)
|
||||
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
|
||||
.input(scriptingComponentHelper.getScriptPath())
|
||||
.build());
|
||||
}
|
||||
|
||||
// store the updated validation results
|
||||
validationResults.set(results);
|
||||
|
||||
// return whether there was any issues loading the configured script
|
||||
return results.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
synchronized (scriptingComponentHelper.isInitialized) {
|
||||
if (!scriptingComponentHelper.isInitialized.get()) {
|
||||
scriptingComponentHelper.createResources();
|
||||
}
|
||||
}
|
||||
super.onEnabled(context);
|
||||
|
||||
// Call an non-interface method onEnabled(context), to allow a scripted RecordSinkService the chance to set up as necessary
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
if (configurationContext != null) {
|
||||
try {
|
||||
// Get the actual object from the script engine, versus the proxy stored in RecordSinkService. The object may have additional methods,
|
||||
// where RecordSinkService is a proxied interface
|
||||
final Object obj = scriptEngine.get("recordSink");
|
||||
if (obj != null) {
|
||||
try {
|
||||
invocable.invokeMethod(obj, "onEnabled", context);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Configured script RecordSinkService does not contain an onEnabled() method.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new ScriptException("No RecordSinkService was defined by the script.");
|
||||
}
|
||||
} catch (ScriptException se) {
|
||||
throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
|
||||
if (recordSink.get() != null) {
|
||||
try {
|
||||
return recordSink.get().sendData(recordSet, attributes, sendZeroResults);
|
||||
} catch (UndeclaredThrowableException ute) {
|
||||
throw new IOException(ute.getCause());
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
if (recordSink.get() != null) {
|
||||
recordSink.get().reset();
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,219 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.rules.engine.script;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.Restricted;
|
||||
import org.apache.nifi.annotation.behavior.Restriction;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.RequiredPermission;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
|
||||
import org.apache.nifi.rules.Action;
|
||||
import org.apache.nifi.rules.engine.RulesEngineService;
|
||||
import org.apache.nifi.script.AbstractScriptedControllerService;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Tags({"rules", "rules engine", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
|
||||
@CapabilityDescription("Allows the user to provide a scripted RulesEngineService for custom firing of rules depending on the supplied facts. The script must set a variable 'rulesEngine' to an "
|
||||
+ "implementation of RulesEngineService.")
|
||||
@Restricted(
|
||||
restrictions = {
|
||||
@Restriction(
|
||||
requiredPermission = RequiredPermission.EXECUTE_CODE,
|
||||
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
|
||||
}
|
||||
)
|
||||
public class ScriptedRulesEngine extends AbstractScriptedControllerService implements RulesEngineService {
|
||||
|
||||
protected final AtomicReference<RulesEngineService> rulesEngine = new AtomicReference<>();
|
||||
|
||||
/**
|
||||
* Returns a list of property descriptors supported by this processor. The list always includes properties such as
|
||||
* script engine name, script file name, script body name, script arguments, and an external module path. If the
|
||||
* scripted processor also defines supported properties, those are added to the list as well.
|
||||
*
|
||||
* @return a List of PropertyDescriptor objects supported by this processor
|
||||
*/
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
synchronized (scriptingComponentHelper.isInitialized) {
|
||||
if (!scriptingComponentHelper.isInitialized.get()) {
|
||||
scriptingComponentHelper.createResources();
|
||||
}
|
||||
}
|
||||
|
||||
return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors());
|
||||
}
|
||||
|
||||
public void setup() {
|
||||
// Create a single script engine, the component object is reused by each task
|
||||
if (scriptEngine == null) {
|
||||
scriptingComponentHelper.setup(1, getLogger());
|
||||
scriptEngine = scriptingComponentHelper.engineQ.poll();
|
||||
}
|
||||
|
||||
if (scriptEngine == null) {
|
||||
throw new ProcessException("No script engine available!");
|
||||
}
|
||||
|
||||
if (scriptNeedsReload.get() || rulesEngine.get() == null) {
|
||||
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
|
||||
reloadScriptFile(scriptingComponentHelper.getScriptPath());
|
||||
} else {
|
||||
reloadScriptBody(scriptingComponentHelper.getScriptBody());
|
||||
}
|
||||
scriptNeedsReload.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reloads the script RulesEngineService. This must be called within the lock.
|
||||
*
|
||||
* @param scriptBody An input stream associated with the script content
|
||||
* @return Whether the script was successfully reloaded
|
||||
*/
|
||||
protected boolean reloadScript(final String scriptBody) {
|
||||
// note we are starting here with a fresh listing of validation
|
||||
// results since we are (re)loading a new/updated script. any
|
||||
// existing validation results are not relevant
|
||||
final Collection<ValidationResult> results = new HashSet<>();
|
||||
|
||||
try {
|
||||
// get the engine and ensure its invocable
|
||||
if (scriptEngine instanceof Invocable) {
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
|
||||
// Find a custom configurator and invoke their eval() method
|
||||
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
|
||||
if (configurator != null) {
|
||||
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
|
||||
} else {
|
||||
// evaluate the script
|
||||
scriptEngine.eval(scriptBody);
|
||||
}
|
||||
|
||||
// get configured processor from the script (if it exists)
|
||||
final Object obj = scriptEngine.get("rulesEngine");
|
||||
if (obj != null) {
|
||||
final ComponentLog logger = getLogger();
|
||||
|
||||
try {
|
||||
// set the logger if the processor wants it
|
||||
invocable.invokeMethod(obj, "setLogger", logger);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script RulesEngineService does not contain a setLogger method.");
|
||||
}
|
||||
}
|
||||
|
||||
if (configurationContext != null) {
|
||||
try {
|
||||
// set the logger if the processor wants it
|
||||
invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script RulesEngineService does not contain a setConfigurationContext method.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// record the processor for use later
|
||||
final RulesEngineService scriptedReader = invocable.getInterface(obj, RulesEngineService.class);
|
||||
rulesEngine.set(scriptedReader);
|
||||
|
||||
} else {
|
||||
throw new ScriptException("No RecordReader was defined by the script.");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (final Exception ex) {
|
||||
final ComponentLog logger = getLogger();
|
||||
final String message = "Unable to load script: " + ex.getLocalizedMessage();
|
||||
|
||||
logger.error(message, ex);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject("ScriptValidation")
|
||||
.valid(false)
|
||||
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
|
||||
.input(scriptingComponentHelper.getScriptPath())
|
||||
.build());
|
||||
}
|
||||
|
||||
// store the updated validation results
|
||||
validationResults.set(results);
|
||||
|
||||
// return whether there was any issues loading the configured script
|
||||
return results.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
synchronized (scriptingComponentHelper.isInitialized) {
|
||||
if (!scriptingComponentHelper.isInitialized.get()) {
|
||||
scriptingComponentHelper.createResources();
|
||||
}
|
||||
}
|
||||
super.onEnabled(context);
|
||||
|
||||
// Call an non-interface method onEnabled(context), to allow a scripted RulesEngineService the chance to set up as necessary
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
if (configurationContext != null) {
|
||||
try {
|
||||
// Get the actual object from the script engine, versus the proxy stored in RulesEngineService. The object may have additional methods,
|
||||
// where RulesEngineService is a proxied interface
|
||||
final Object obj = scriptEngine.get("rulesEngine");
|
||||
if (obj != null) {
|
||||
try {
|
||||
invocable.invokeMethod(obj, "onEnabled", context);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Configured script RulesEngineService does not contain an onEnabled() method.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new ScriptException("No RulesEngineService was defined by the script.");
|
||||
}
|
||||
} catch (ScriptException se) {
|
||||
throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Action> fireRules(Map<String, Object> facts) {
|
||||
if (rulesEngine.get() != null) {
|
||||
return rulesEngine.get().fireRules(facts);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -0,0 +1,247 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.rules.handlers.script;
|
||||
|
||||
import org.apache.nifi.annotation.behavior.Restricted;
|
||||
import org.apache.nifi.annotation.behavior.Restriction;
|
||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||
import org.apache.nifi.annotation.documentation.Tags;
|
||||
import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.RequiredPermission;
|
||||
import org.apache.nifi.components.ValidationResult;
|
||||
import org.apache.nifi.context.PropertyContext;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
import org.apache.nifi.processors.script.ScriptEngineConfigurator;
|
||||
import org.apache.nifi.rules.Action;
|
||||
import org.apache.nifi.rules.ActionHandler;
|
||||
import org.apache.nifi.rules.PropertyContextActionHandler;
|
||||
import org.apache.nifi.script.AbstractScriptedControllerService;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
|
||||
import javax.script.Invocable;
|
||||
import javax.script.ScriptException;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
@Tags({"rules", "rules engine", "action", "action handler", "script", "invoke", "groovy", "python", "jython", "jruby", "ruby", "javascript", "js", "lua", "luaj"})
|
||||
@CapabilityDescription("Allows the user to provide a scripted ActionHandler for custom firing of rules depending on the supplied facts. The script must set a variable 'actionHandler' to an "
|
||||
+ "implementation of ActionHandler.")
|
||||
@Restricted(
|
||||
restrictions = {
|
||||
@Restriction(
|
||||
requiredPermission = RequiredPermission.EXECUTE_CODE,
|
||||
explanation = "Provides operator the ability to execute arbitrary code assuming all permissions that NiFi has.")
|
||||
}
|
||||
)
|
||||
public class ScriptedActionHandler extends AbstractScriptedControllerService implements PropertyContextActionHandler {
|
||||
|
||||
protected final AtomicReference<ActionHandler> actionHandler = new AtomicReference<>();
|
||||
|
||||
/**
|
||||
* Returns a list of property descriptors supported by this processor. The list always includes properties such as
|
||||
* script engine name, script file name, script body name, script arguments, and an external module path. If the
|
||||
* scripted processor also defines supported properties, those are added to the list as well.
|
||||
*
|
||||
* @return a List of PropertyDescriptor objects supported by this processor
|
||||
*/
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
synchronized (scriptingComponentHelper.isInitialized) {
|
||||
if (!scriptingComponentHelper.isInitialized.get()) {
|
||||
scriptingComponentHelper.createResources();
|
||||
}
|
||||
}
|
||||
|
||||
return Collections.unmodifiableList(scriptingComponentHelper.getDescriptors());
|
||||
}
|
||||
|
||||
public void setup() {
|
||||
// Create a single script engine, the component object is reused by each task
|
||||
if (scriptEngine == null) {
|
||||
scriptingComponentHelper.setup(1, getLogger());
|
||||
scriptEngine = scriptingComponentHelper.engineQ.poll();
|
||||
}
|
||||
|
||||
if (scriptEngine == null) {
|
||||
throw new ProcessException("No script engine available!");
|
||||
}
|
||||
|
||||
if (scriptNeedsReload.get() || actionHandler.get() == null) {
|
||||
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
|
||||
reloadScriptFile(scriptingComponentHelper.getScriptPath());
|
||||
} else {
|
||||
reloadScriptBody(scriptingComponentHelper.getScriptBody());
|
||||
}
|
||||
scriptNeedsReload.set(false);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reloads the script ActionHandler. This must be called within the lock.
|
||||
*
|
||||
* @param scriptBody An input stream associated with the script content
|
||||
* @return Whether the script was successfully reloaded
|
||||
*/
|
||||
protected boolean reloadScript(final String scriptBody) {
|
||||
// note we are starting here with a fresh listing of validation
|
||||
// results since we are (re)loading a new/updated script. any
|
||||
// existing validation results are not relevant
|
||||
final Collection<ValidationResult> results = new HashSet<>();
|
||||
|
||||
try {
|
||||
// get the engine and ensure its invocable
|
||||
if (scriptEngine instanceof Invocable) {
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
|
||||
// Find a custom configurator and invoke their eval() method
|
||||
ScriptEngineConfigurator configurator = scriptingComponentHelper.scriptEngineConfiguratorMap.get(scriptingComponentHelper.getScriptEngineName().toLowerCase());
|
||||
if (configurator != null) {
|
||||
configurator.eval(scriptEngine, scriptBody, scriptingComponentHelper.getModules());
|
||||
} else {
|
||||
// evaluate the script
|
||||
scriptEngine.eval(scriptBody);
|
||||
}
|
||||
|
||||
// get configured processor from the script (if it exists)
|
||||
final Object obj = scriptEngine.get("actionHandler");
|
||||
if (obj != null) {
|
||||
final ComponentLog logger = getLogger();
|
||||
|
||||
try {
|
||||
// set the logger if the processor wants it
|
||||
invocable.invokeMethod(obj, "setLogger", logger);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script ActionHandler does not contain a setLogger method.");
|
||||
}
|
||||
}
|
||||
|
||||
if (configurationContext != null) {
|
||||
try {
|
||||
// set the logger if the processor wants it
|
||||
invocable.invokeMethod(obj, "setConfigurationContext", configurationContext);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Configured script ActionHandler does not contain a setConfigurationContext method.");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// record the processor for use later
|
||||
final ActionHandler scriptedReader = invocable.getInterface(obj, ActionHandler.class);
|
||||
actionHandler.set(scriptedReader);
|
||||
|
||||
} else {
|
||||
throw new ScriptException("No RecordReader was defined by the script.");
|
||||
}
|
||||
}
|
||||
|
||||
} catch (final Exception ex) {
|
||||
final ComponentLog logger = getLogger();
|
||||
final String message = "Unable to load script: " + ex.getLocalizedMessage();
|
||||
|
||||
logger.error(message, ex);
|
||||
results.add(new ValidationResult.Builder()
|
||||
.subject("ScriptValidation")
|
||||
.valid(false)
|
||||
.explanation("Unable to load script due to " + ex.getLocalizedMessage())
|
||||
.input(scriptingComponentHelper.getScriptPath())
|
||||
.build());
|
||||
}
|
||||
|
||||
// store the updated validation results
|
||||
validationResults.set(results);
|
||||
|
||||
// return whether there was any issues loading the configured script
|
||||
return results.isEmpty();
|
||||
}
|
||||
|
||||
@Override
|
||||
@OnEnabled
|
||||
public void onEnabled(final ConfigurationContext context) {
|
||||
synchronized (scriptingComponentHelper.isInitialized) {
|
||||
if (!scriptingComponentHelper.isInitialized.get()) {
|
||||
scriptingComponentHelper.createResources();
|
||||
}
|
||||
}
|
||||
super.onEnabled(context);
|
||||
|
||||
// Call an non-interface method onEnabled(context), to allow a scripted ActionHandler the chance to set up as necessary
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
if (configurationContext != null) {
|
||||
try {
|
||||
// Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
|
||||
// where ActionHandler is a proxied interface
|
||||
final Object obj = scriptEngine.get("actionHandler");
|
||||
if (obj != null) {
|
||||
try {
|
||||
invocable.invokeMethod(obj, "onEnabled", context);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Configured script ActionHandler does not contain an onEnabled() method.");
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw new ScriptException("No ActionHandler was defined by the script.");
|
||||
}
|
||||
} catch (ScriptException se) {
|
||||
throw new ProcessException("Error executing onEnabled(context) method", se);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
|
||||
// Attempt to call a non-ActionHandler interface method (i.e. execute(context, action, facts) from PropertyContextActionHandler)
|
||||
final Invocable invocable = (Invocable) scriptEngine;
|
||||
|
||||
try {
|
||||
// Get the actual object from the script engine, versus the proxy stored in ActionHandler. The object may have additional methods,
|
||||
// where ActionHandler is a proxied interface
|
||||
final Object obj = scriptEngine.get("actionHandler");
|
||||
if (obj != null) {
|
||||
try {
|
||||
invocable.invokeMethod(obj, "execute", context, action, facts);
|
||||
} catch (final NoSuchMethodException nsme) {
|
||||
if (getLogger().isDebugEnabled()) {
|
||||
getLogger().debug("Configured script ActionHandler is not a PropertyContextActionHandler and has no execute(context, action, facts) method, falling back to"
|
||||
+ "execute(action, facts).");
|
||||
}
|
||||
execute(action, facts);
|
||||
}
|
||||
} else {
|
||||
throw new ScriptException("No ActionHandler was defined by the script.");
|
||||
}
|
||||
} catch (ScriptException se) {
|
||||
throw new ProcessException("Error executing onEnabled(context) method: " + se.getMessage(), se);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void execute(Action action, Map<String, Object> facts) {
|
||||
if (actionHandler.get() != null) {
|
||||
actionHandler.get().execute(action, facts);
|
||||
}
|
||||
}
|
||||
}
|
@ -41,6 +41,7 @@ public class ClojureScriptEngineConfigurator extends AbstractModuleClassloaderCo
|
||||
+ "[org.apache.nifi.script ScriptingComponentHelper ScriptingComponentUtils]\n"
|
||||
+ "[org.apache.nifi.logging ComponentLog]\n"
|
||||
+ "[org.apache.nifi.lookup LookupService RecordLookupService StringLookupService LookupFailureException]\n"
|
||||
+ "[org.apache.nifi.record.sink RecordSinkService]\n"
|
||||
+ ")\n";
|
||||
|
||||
|
||||
|
@ -31,6 +31,7 @@ public class GroovyScriptEngineConfigurator extends AbstractModuleClassloaderCon
|
||||
+ "import org.apache.nifi.processors.script.*\n"
|
||||
+ "import org.apache.nifi.logging.ComponentLog\n"
|
||||
+ "import org.apache.nifi.script.*\n"
|
||||
+ "import org.apache.nifi.record.sink.*\n"
|
||||
+ "import org.apache.nifi.lookup.*\n";
|
||||
|
||||
private ScriptEngine scriptEngine;
|
||||
|
@ -13,7 +13,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
org.apache.nifi.rules.handlers.script.ScriptedActionHandler
|
||||
org.apache.nifi.record.script.ScriptedReader
|
||||
org.apache.nifi.record.script.ScriptedRecordSetWriter
|
||||
org.apache.nifi.record.sink.script.ScriptedRecordSink
|
||||
org.apache.nifi.rules.engine.script.ScriptedRulesEngine
|
||||
org.apache.nifi.lookup.script.ScriptedLookupService
|
||||
org.apache.nifi.lookup.script.SimpleScriptedLookupService
|
@ -0,0 +1,132 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License") you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.record.sink.script;
|
||||
|
||||
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
|
||||
import org.apache.nifi.components.PropertyValue;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.record.sink.RecordSinkService;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||
import org.apache.nifi.serialization.WriteResult;
|
||||
import org.apache.nifi.serialization.record.ListRecordSet;
|
||||
import org.apache.nifi.serialization.record.MapRecord;
|
||||
import org.apache.nifi.serialization.record.MockRecordWriter;
|
||||
import org.apache.nifi.serialization.record.RecordField;
|
||||
import org.apache.nifi.serialization.record.RecordFieldType;
|
||||
import org.apache.nifi.serialization.record.RecordSchema;
|
||||
import org.apache.nifi.serialization.record.RecordSet;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.MockControllerServiceInitializationContext;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
||||
public class ScriptedRecordSinkTest {
|
||||
|
||||
@Test
|
||||
public void testRecordFormat() throws IOException, InitializationException {
|
||||
MockScriptedRecordSink task = initTask();
|
||||
|
||||
List<RecordField> recordFields = Arrays.asList(
|
||||
new RecordField("field1", RecordFieldType.INT.getDataType()),
|
||||
new RecordField("field2", RecordFieldType.STRING.getDataType())
|
||||
);
|
||||
RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
|
||||
|
||||
Map<String, Object> row1 = new HashMap<>();
|
||||
row1.put("field1", 15);
|
||||
row1.put("field2", "Hello");
|
||||
|
||||
Map<String, Object> row2 = new HashMap<>();
|
||||
row2.put("field1", 6);
|
||||
row2.put("field2", "World!");
|
||||
|
||||
RecordSet recordSet = new ListRecordSet(recordSchema, Arrays.asList(
|
||||
new MapRecord(recordSchema, row1),
|
||||
new MapRecord(recordSchema, row2)
|
||||
));
|
||||
|
||||
WriteResult writeResult = task.sendData(recordSet, new HashMap<>(), false);
|
||||
// Verify the attribute was added by the scripted RecordSinkService
|
||||
assertEquals("I am now set.", writeResult.getAttributes().get("newAttr"));
|
||||
assertEquals(2, writeResult.getRecordCount());
|
||||
}
|
||||
|
||||
private MockScriptedRecordSink initTask() throws InitializationException {
|
||||
|
||||
final MockScriptedRecordSink recordSink = new MockScriptedRecordSink();
|
||||
ConfigurationContext context = mock(ConfigurationContext.class);
|
||||
StateManager stateManager = new MockStateManager(recordSink);
|
||||
|
||||
final PropertyValue pValue = mock(StandardPropertyValue.class);
|
||||
MockRecordWriter writer = new MockRecordWriter(null, false); // No header, don"t quote values
|
||||
when(context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY)).thenReturn(pValue);
|
||||
when(pValue.asControllerService(RecordSetWriterFactory.class)).thenReturn(writer);
|
||||
|
||||
|
||||
final ComponentLog logger = mock(ComponentLog.class);
|
||||
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(writer, UUID.randomUUID().toString(), logger, stateManager);
|
||||
recordSink.initialize(initContext);
|
||||
|
||||
// Call something that sets up the ScriptingComponentHelper, so we can mock it
|
||||
recordSink.getSupportedPropertyDescriptors();
|
||||
|
||||
when(context.getProperty(recordSink.getScriptingComponentHelper().SCRIPT_ENGINE))
|
||||
.thenReturn(new MockPropertyValue("Groovy"));
|
||||
when(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
|
||||
.thenReturn(new MockPropertyValue("src/test/resources/groovy/test_record_sink.groovy"));
|
||||
when(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
|
||||
.thenReturn(new MockPropertyValue(null));
|
||||
when(context.getProperty(ScriptingComponentUtils.MODULES))
|
||||
.thenReturn(new MockPropertyValue(null));
|
||||
try {
|
||||
recordSink.onEnabled(context);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail("onEnabled error: " + e.getMessage());
|
||||
}
|
||||
return recordSink;
|
||||
}
|
||||
|
||||
public static class MockScriptedRecordSink extends ScriptedRecordSink implements AccessibleScriptingComponentHelper {
|
||||
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,104 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.rules.engine.script;
|
||||
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.rules.Action;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.MockControllerServiceInitializationContext;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
||||
public class ScriptedRulesEngineTest {
|
||||
|
||||
private Map<String, Object> facts = new HashMap<>();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
facts.put("predictedQueuedCount", 60);
|
||||
facts.put("predictedTimeToBytesBackpressureMillis", 299999);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRules() throws IOException, InitializationException {
|
||||
ScriptedRulesEngine task = initTask();
|
||||
List<Action> actions = task.fireRules(facts);
|
||||
assertEquals(2, actions.size());
|
||||
assertEquals("LOG", actions.get(0).getType());
|
||||
assertEquals("DEBUG", actions.get(0).getAttributes().get("level"));
|
||||
assertEquals("ALERT", actions.get(1).getType());
|
||||
assertEquals("Time to backpressure < 5 mins", actions.get(1).getAttributes().get("message"));
|
||||
}
|
||||
|
||||
private MockScriptedRulesEngine initTask() throws InitializationException {
|
||||
|
||||
final MockScriptedRulesEngine rulesEngine = new MockScriptedRulesEngine();
|
||||
ConfigurationContext context = mock(ConfigurationContext.class);
|
||||
StateManager stateManager = new MockStateManager(rulesEngine);
|
||||
|
||||
final ComponentLog logger = mock(ComponentLog.class);
|
||||
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(rulesEngine, UUID.randomUUID().toString(), logger, stateManager);
|
||||
rulesEngine.initialize(initContext);
|
||||
|
||||
// Call something that sets up the ScriptingComponentHelper, so we can mock it
|
||||
rulesEngine.getSupportedPropertyDescriptors();
|
||||
|
||||
when(context.getProperty(rulesEngine.getScriptingComponentHelper().SCRIPT_ENGINE))
|
||||
.thenReturn(new MockPropertyValue("Groovy"));
|
||||
when(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
|
||||
.thenReturn(new MockPropertyValue("src/test/resources/groovy/test_rules_engine.groovy"));
|
||||
when(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
|
||||
.thenReturn(new MockPropertyValue(null));
|
||||
when(context.getProperty(ScriptingComponentUtils.MODULES))
|
||||
.thenReturn(new MockPropertyValue(null));
|
||||
try {
|
||||
rulesEngine.onEnabled(context);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail("onEnabled error: " + e.getMessage());
|
||||
}
|
||||
return rulesEngine;
|
||||
}
|
||||
|
||||
public static class MockScriptedRulesEngine extends ScriptedRulesEngine implements AccessibleScriptingComponentHelper {
|
||||
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,247 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License") you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.rules.handlers.script;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.components.state.StateManager;
|
||||
import org.apache.nifi.controller.ConfigurationContext;
|
||||
import org.apache.nifi.controller.ControllerServiceInitializationContext;
|
||||
import org.apache.nifi.logging.ComponentLog;
|
||||
import org.apache.nifi.processors.script.AccessibleScriptingComponentHelper;
|
||||
import org.apache.nifi.reporting.Bulletin;
|
||||
import org.apache.nifi.reporting.BulletinFactory;
|
||||
import org.apache.nifi.reporting.BulletinRepository;
|
||||
import org.apache.nifi.reporting.InitializationException;
|
||||
import org.apache.nifi.reporting.ReportingContext;
|
||||
import org.apache.nifi.reporting.Severity;
|
||||
import org.apache.nifi.rules.Action;
|
||||
import org.apache.nifi.script.ScriptingComponentHelper;
|
||||
import org.apache.nifi.script.ScriptingComponentUtils;
|
||||
import org.apache.nifi.state.MockStateManager;
|
||||
import org.apache.nifi.util.MockBulletinRepository;
|
||||
import org.apache.nifi.util.MockControllerServiceInitializationContext;
|
||||
import org.apache.nifi.util.MockPropertyValue;
|
||||
import org.apache.nifi.util.TestRunner;
|
||||
import org.apache.nifi.util.TestRunners;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
import static org.hamcrest.core.IsInstanceOf.instanceOf;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.ArgumentMatchers.anyString;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
||||
public class ScriptedActionHandlerTest {
|
||||
|
||||
private ConfigurationContext context;
|
||||
private TestRunner runner;
|
||||
private ReportingContext reportingContext;
|
||||
private MockScriptedActionHandler actionHandler;
|
||||
private MockScriptedBulletinRepository mockScriptedBulletinRepository;
|
||||
|
||||
private Map<String, Object> facts = new HashMap<>();
|
||||
private Map<String, String> attrs = new HashMap<>();
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
facts.put("predictedQueuedCount", 60);
|
||||
facts.put("predictedTimeToBytesBackpressureMillis", 299999);
|
||||
attrs.put("level", "DEBUG");
|
||||
attrs.put("message", "Time to backpressure < 5 mins");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testActions() throws InitializationException {
|
||||
actionHandler = initTask("src/test/resources/groovy/test_action_handler.groovy");
|
||||
actionHandler.onEnabled(context);
|
||||
List<Action> actions = Arrays.asList(new Action("LOG", attrs), new Action("ALERT", attrs));
|
||||
actions.forEach((action) -> actionHandler.execute(action, facts));
|
||||
// Verify a fact was added (not the intended operation of ActionHandler, but testable)
|
||||
assertEquals(42, facts.get("testFact"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testActionHandlerNotPropertyContextActionHandler() throws InitializationException {
|
||||
actionHandler = initTask("src/test/resources/groovy/test_action_handler.groovy");
|
||||
mockScriptedBulletinRepository = new MockScriptedBulletinRepository();
|
||||
reportingContext = mock(ReportingContext.class);
|
||||
when(reportingContext.getBulletinRepository()).thenReturn(mockScriptedBulletinRepository);
|
||||
when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString()))
|
||||
.thenAnswer(invocation -> BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2)));
|
||||
actionHandler.onEnabled(context);
|
||||
List<Action> actions = Arrays.asList(new Action("LOG", attrs), new Action("ALERT", attrs));
|
||||
actions.forEach(action -> actionHandler.execute(reportingContext, action, facts));
|
||||
|
||||
// Verify instead of a bulletin being added, a fact was added (not the intended operation of ActionHandler, but testable)
|
||||
assertTrue(mockScriptedBulletinRepository.bulletinList.isEmpty());
|
||||
assertEquals(42, facts.get("testFact"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPropertyContextActionHandler() throws InitializationException {
|
||||
actionHandler = initTask("src/test/resources/groovy/test_propertycontext_action_handler.groovy");
|
||||
mockScriptedBulletinRepository = new MockScriptedBulletinRepository();
|
||||
reportingContext = mock(ReportingContext.class);
|
||||
when(reportingContext.getBulletinRepository()).thenReturn(mockScriptedBulletinRepository);
|
||||
when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString()))
|
||||
.thenAnswer(invocation -> BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2)));
|
||||
actionHandler.onEnabled(context);
|
||||
List<Action> actions = Arrays.asList(new Action("LOG", attrs), new Action("ALERT", attrs));
|
||||
actions.forEach(action -> actionHandler.execute(reportingContext, action, facts));
|
||||
|
||||
// Verify instead of a bulletin being added, a fact was added (not the intended operation of ActionHandler, but testable)
|
||||
List<Bulletin> bulletinList = mockScriptedBulletinRepository.bulletinList;
|
||||
assertEquals(2, bulletinList.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidService() throws Exception {
|
||||
setupTestRunner();
|
||||
runner.assertValid(actionHandler);
|
||||
assertThat(actionHandler, instanceOf(ScriptedActionHandler.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAlertWithBulletinLevel() throws Exception {
|
||||
setupTestRunner();
|
||||
final Map<String, String> attributes = new HashMap<>();
|
||||
final Map<String, Object> metrics = new HashMap<>();
|
||||
|
||||
final String category = "Rules Alert";
|
||||
final String message = "This should be sent as an alert!";
|
||||
final String severity = "INFO";
|
||||
attributes.put("category", category);
|
||||
attributes.put("message", message);
|
||||
attributes.put("severity", severity);
|
||||
metrics.put("jvmHeap", "1000000");
|
||||
metrics.put("cpu", "90");
|
||||
|
||||
final String expectedOutput = "This should be sent as an alert!\n" +
|
||||
"Alert Facts:\n" +
|
||||
"Field: cpu, Value: 90\n" +
|
||||
"Field: jvmHeap, Value: 1000000\n";
|
||||
|
||||
final Action action = new Action();
|
||||
action.setType("ALERT");
|
||||
action.setAttributes(attributes);
|
||||
actionHandler.execute(reportingContext, action, metrics);
|
||||
BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
|
||||
List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
|
||||
assertFalse(bulletins.isEmpty());
|
||||
Bulletin bulletin = bulletins.get(0);
|
||||
assertEquals(bulletin.getCategory(), category);
|
||||
assertEquals(bulletin.getMessage(), expectedOutput);
|
||||
assertEquals(bulletin.getLevel(), severity);
|
||||
}
|
||||
|
||||
private static class MockScriptedBulletinRepository extends MockBulletinRepository {
|
||||
|
||||
List<Bulletin> bulletinList;
|
||||
|
||||
MockScriptedBulletinRepository() {
|
||||
bulletinList = new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addBulletin(Bulletin bulletin) {
|
||||
bulletinList.add(bulletin);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Bulletin> findBulletinsForController() {
|
||||
return bulletinList;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private void setupTestRunner() throws Exception {
|
||||
runner = TestRunners.newTestRunner(TestProcessor.class);
|
||||
MockScriptedActionHandler handler = initTask("src/test/resources/groovy/test_propertycontext_action_handler.groovy");
|
||||
mockScriptedBulletinRepository = new MockScriptedBulletinRepository();
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
properties.put(handler.getScriptingComponentHelper().SCRIPT_ENGINE.getName(), "Groovy");
|
||||
properties.put(ScriptingComponentUtils.SCRIPT_FILE.getName(), "src/test/resources/groovy/test_propertycontext_action_handler.groovy");
|
||||
runner.addControllerService("MockAlertHandler", handler, properties);
|
||||
runner.enableControllerService(handler);
|
||||
actionHandler = (MockScriptedActionHandler) runner.getProcessContext()
|
||||
.getControllerServiceLookup()
|
||||
.getControllerService("MockAlertHandler");
|
||||
reportingContext = mock(ReportingContext.class);
|
||||
when(reportingContext.getBulletinRepository()).thenReturn(mockScriptedBulletinRepository);
|
||||
when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString()))
|
||||
.thenAnswer(invocation -> BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2)));
|
||||
}
|
||||
|
||||
private MockScriptedActionHandler initTask(String scriptFile) throws InitializationException {
|
||||
|
||||
final MockScriptedActionHandler actionHandler = new MockScriptedActionHandler();
|
||||
context = mock(ConfigurationContext.class);
|
||||
StateManager stateManager = new MockStateManager(actionHandler);
|
||||
|
||||
final ComponentLog logger = mock(ComponentLog.class);
|
||||
final ControllerServiceInitializationContext initContext = new MockControllerServiceInitializationContext(actionHandler, UUID.randomUUID().toString(), logger, stateManager);
|
||||
actionHandler.initialize(initContext);
|
||||
|
||||
// Call something that sets up the ScriptingComponentHelper, so we can mock it
|
||||
actionHandler.getSupportedPropertyDescriptors();
|
||||
|
||||
Map<PropertyDescriptor, String> properties = new HashMap<>();
|
||||
properties.put(actionHandler.getScriptingComponentHelper().SCRIPT_ENGINE, actionHandler.getScriptingComponentHelper().SCRIPT_ENGINE.getName());
|
||||
properties.put(ScriptingComponentUtils.SCRIPT_FILE, ScriptingComponentUtils.SCRIPT_FILE.getName());
|
||||
properties.put(ScriptingComponentUtils.SCRIPT_BODY, ScriptingComponentUtils.SCRIPT_BODY.getName());
|
||||
properties.put(ScriptingComponentUtils.MODULES, ScriptingComponentUtils.MODULES.getName());
|
||||
when(context.getProperties()).thenReturn(properties);
|
||||
|
||||
when(context.getProperty(actionHandler.getScriptingComponentHelper().SCRIPT_ENGINE))
|
||||
.thenReturn(new MockPropertyValue("Groovy"));
|
||||
when(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
|
||||
.thenReturn(new MockPropertyValue(scriptFile));
|
||||
when(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
|
||||
.thenReturn(new MockPropertyValue(null));
|
||||
when(context.getProperty(ScriptingComponentUtils.MODULES))
|
||||
.thenReturn(new MockPropertyValue(null));
|
||||
try {
|
||||
actionHandler.onEnabled(context);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
fail("onEnabled error: " + e.getMessage());
|
||||
}
|
||||
return actionHandler;
|
||||
}
|
||||
|
||||
public static class MockScriptedActionHandler extends ScriptedActionHandler implements AccessibleScriptingComponentHelper {
|
||||
|
||||
@Override
|
||||
public ScriptingComponentHelper getScriptingComponentHelper() {
|
||||
return this.scriptingComponentHelper;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,47 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.nifi.rules.handlers.script;
|
||||
|
||||
import org.apache.nifi.components.PropertyDescriptor;
|
||||
import org.apache.nifi.processor.AbstractProcessor;
|
||||
import org.apache.nifi.processor.ProcessContext;
|
||||
import org.apache.nifi.processor.ProcessSession;
|
||||
import org.apache.nifi.processor.exception.ProcessException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class TestProcessor extends AbstractProcessor {
|
||||
|
||||
private static PropertyDescriptor prop = new PropertyDescriptor.Builder()
|
||||
.name("scripted-action-handler-test")
|
||||
.description("Scripted Action Handler")
|
||||
.identifiesControllerService(ScriptedActionHandler.class)
|
||||
.required(true)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
|
||||
}
|
||||
|
||||
@Override
|
||||
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
|
||||
List<PropertyDescriptor> properties = new ArrayList<>();
|
||||
properties.add(prop);
|
||||
return properties;
|
||||
}
|
||||
}
|
@ -0,0 +1,32 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the 'License') you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.nifi.controller.AbstractControllerService
|
||||
import org.apache.nifi.rules.Action
|
||||
import org.apache.nifi.rules.ActionHandler
|
||||
|
||||
|
||||
class GroovyActionHandler extends AbstractControllerService implements ActionHandler {
|
||||
|
||||
@Override
|
||||
void execute(Action action, Map<String, Object> facts) {
|
||||
// Add a fact for verification that execute was successfully performed
|
||||
facts['testFact'] = 42
|
||||
}
|
||||
}
|
||||
|
||||
actionHandler = new GroovyActionHandler()
|
@ -0,0 +1,49 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the 'License') you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.nifi.context.PropertyContext
|
||||
import org.apache.nifi.controller.AbstractControllerService
|
||||
import org.apache.nifi.reporting.BulletinRepository
|
||||
import org.apache.nifi.reporting.ReportingContext
|
||||
import org.apache.nifi.reporting.Severity
|
||||
import org.apache.nifi.rules.Action
|
||||
import org.apache.nifi.rules.PropertyContextActionHandler
|
||||
|
||||
|
||||
class GroovyPropertyContextActionHandler extends AbstractControllerService implements PropertyContextActionHandler {
|
||||
|
||||
@Override
|
||||
void execute(Action action, Map<String, Object> facts) {
|
||||
// Add a fact for verification that execute was successfully performed
|
||||
facts['testFact'] = 42
|
||||
}
|
||||
|
||||
@Override
|
||||
void execute(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
|
||||
// Add a fact for verification that execute was successfully performed
|
||||
if (propertyContext instanceof ReportingContext) {
|
||||
ReportingContext context = (ReportingContext) propertyContext
|
||||
BulletinRepository bulletinRepository = context.bulletinRepository
|
||||
bulletinRepository.addBulletin(context.createBulletin('Rules Alert', Severity.INFO, 'This should be sent as an alert!\n'
|
||||
+ 'Alert Facts:\n'
|
||||
+ 'Field: cpu, Value: 90\n'
|
||||
+ 'Field: jvmHeap, Value: 1000000\n'))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
actionHandler = new GroovyPropertyContextActionHandler()
|
@ -0,0 +1,37 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the 'License') you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.nifi.controller.AbstractControllerService
|
||||
import org.apache.nifi.record.sink.RecordSinkService
|
||||
import org.apache.nifi.serialization.WriteResult
|
||||
import org.apache.nifi.serialization.record.RecordSet
|
||||
|
||||
|
||||
class GroovyRecordSink extends AbstractControllerService implements RecordSinkService {
|
||||
|
||||
@Override
|
||||
WriteResult sendData(RecordSet recordSet, Map<String, String> attributes, boolean sendZeroResults) throws IOException {
|
||||
attributes['newAttr'] = 'I am now set.'
|
||||
int recordCount = 0
|
||||
def record
|
||||
while(recordSet.next()) {
|
||||
recordCount++
|
||||
}
|
||||
return WriteResult.of(recordCount, attributes)
|
||||
}
|
||||
}
|
||||
recordSink = new GroovyRecordSink()
|
@ -0,0 +1,39 @@
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the 'License') you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an 'AS IS' BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import org.apache.nifi.controller.AbstractControllerService
|
||||
import org.apache.nifi.rules.Action
|
||||
import org.apache.nifi.rules.engine.RulesEngineService
|
||||
|
||||
|
||||
class GroovyRulesEngine extends AbstractControllerService implements RulesEngineService {
|
||||
|
||||
|
||||
@Override
|
||||
List<Action> fireRules(Map<String, Object> facts) {
|
||||
def actions = [] as List<Action>
|
||||
if (facts['predictedQueuedCount'] > 50) {
|
||||
actions << new Action('LOG', ['level': 'DEBUG'])
|
||||
}
|
||||
if (facts['predictedTimeToBytesBackpressureMillis'] < 300000) {
|
||||
actions << new Action('ALERT', ['message': 'Time to backpressure < 5 mins'])
|
||||
}
|
||||
actions
|
||||
}
|
||||
}
|
||||
|
||||
rulesEngine = new GroovyRulesEngine()
|
Loading…
x
Reference in New Issue
Block a user