NIFI-10449 Fixed ScriptedLookupServices reloading

- Reload script after disabled and first validation after changes made

This closes #6371

Signed-off-by: David Handermann <exceptionfactory@apache.org>
This commit is contained in:
Matthew Burgess 2022-09-06 17:33:12 -04:00 committed by exceptionfactory
parent 1d7ee542b1
commit e0ec9780a5
No known key found for this signature in database
GPG Key ID: 29B6A52D2AAE8DBA
5 changed files with 114 additions and 19 deletions

View File

@ -721,7 +721,10 @@ public class StandardProcessorTestRunner implements TestRunner {
}
try {
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service);
// Create a config context to pass into the controller service's OnDisabled method (it will be ignored if the controller service has no arguments)
final MockConfigurationContext configContext = new MockConfigurationContext(service, configuration.getProperties(), context, variableRegistry);
configContext.setValidateExpressions(validateExpressionUsage);
ReflectionUtils.invokeMethodsWithAnnotation(OnDisabled.class, service, configContext);
} catch (final Exception e) {
e.printStackTrace();
Assertions.fail("Failed to disable Controller Service " + service + " due to " + e);

View File

@ -21,7 +21,9 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.ConfigurableComponent;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.resource.ResourceReferences;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerServiceInitializationContext;
@ -35,6 +37,7 @@ import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.script.AbstractScriptedControllerService;
import org.apache.nifi.script.ScriptingComponentHelper;
import org.apache.nifi.script.ScriptingComponentUtils;
import org.apache.nifi.script.impl.FilteredPropertiesValidationContextAdapter;
import javax.script.Invocable;
import javax.script.ScriptContext;
@ -46,6 +49,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@ -105,7 +109,7 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
}
} catch (final Throwable t) {
final ComponentLog logger = getLogger();
final String message = "Unable to get property descriptors from Processor: " + t;
final String message = "Unable to get property descriptors from LookupService: " + t;
logger.error(message);
if (logger.isDebugEnabled()) {
@ -148,16 +152,29 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
*/
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
validationResults.set(new HashSet<>());
final ComponentLog logger = getLogger();
final ConfigurableComponent instance = lookupService.get();
final LookupService<?> instance = lookupService.get();
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
// Update the ScriptingComponentHelper's value(s)
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)) {
scriptingComponentHelper.setScriptPath(newValue);
} else if (ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)) {
scriptingComponentHelper.setScriptBody(newValue);
} else if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptingComponentHelper.setScriptEngineName(newValue);
}
scriptNeedsReload.set(true);
scriptRunner = null; //reset engine. This happens only when the controller service is disabled, so there won't be any performance impact in run-time.
} else if (instance != null) {
// If the script provides a ConfigurableComponent, call its onPropertyModified() method
// If the script provides a LookupService, call its onPropertyModified() method
try {
instance.onPropertyModified(descriptor, oldValue, newValue);
} catch (final Exception e) {
@ -167,6 +184,71 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
}
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
Collection<ValidationResult> commonValidationResults = super.customValidate(context);
if (!commonValidationResults.isEmpty()) {
return commonValidationResults;
}
// do not try to build processor/compile/etc until onPropertyModified clear the validation error/s
// and don't print anything into log.
if (!validationResults.get().isEmpty()) {
return validationResults.get();
}
Collection<ValidationResult> scriptingComponentHelperResults = scriptingComponentHelper.customValidate(context);
if (scriptingComponentHelperResults != null && !scriptingComponentHelperResults.isEmpty()) {
validationResults.set(scriptingComponentHelperResults);
return scriptingComponentHelperResults;
}
scriptingComponentHelper.setScriptEngineName(context.getProperty(scriptingComponentHelper.SCRIPT_ENGINE).getValue());
scriptingComponentHelper.setScriptPath(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).evaluateAttributeExpressions().getValue());
scriptingComponentHelper.setScriptBody(context.getProperty(ScriptingComponentUtils.SCRIPT_BODY).getValue());
final ResourceReferences resourceReferences = context.getProperty(ScriptingComponentUtils.MODULES).evaluateAttributeExpressions().asResources();
scriptingComponentHelper.setModules(resourceReferences);
setup();
// Now that the component is validated, we can call validate on the scripted lookup service
final LookupService<?> instance = lookupService.get();
final Collection<ValidationResult> currentValidationResults = validationResults.get();
// if there was existing validation errors and the processor loaded successfully
if (currentValidationResults.isEmpty() && instance != null) {
try {
// defer to the underlying controller service for validation, without the
// lookup service's properties
final Set<PropertyDescriptor> innerPropertyDescriptor = new HashSet<>(scriptingComponentHelper.getDescriptors());
ValidationContext innerValidationContext = new FilteredPropertiesValidationContextAdapter(context, innerPropertyDescriptor);
final Collection<ValidationResult> instanceResults = instance.validate(innerValidationContext);
if (instanceResults != null && instanceResults.size() > 0) {
// return the validation results from the underlying instance
return instanceResults;
}
} catch (final Exception e) {
final ComponentLog logger = getLogger();
final String message = "Unable to validate the scripted LookupService: " + e;
logger.error(message, e);
// return a new validation message
final Collection<ValidationResult> results = new HashSet<>();
results.add(new ValidationResult.Builder()
.subject("Validation")
.valid(false)
.explanation("An error occurred calling validate in the configured scripted LookupService.")
.input(context.getProperty(ScriptingComponentUtils.SCRIPT_FILE).getValue())
.build());
return results;
}
}
return currentValidationResults;
}
@Override
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
@ -226,7 +308,8 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
}
}
} else {
throw new ScriptException("No LookupService was defined by the script.");
// This might be due to an error during compilation, log it rather than throwing an exception
getLogger().warn("No LookupService was defined by the script.");
}
} catch (ScriptException se) {
throw new ProcessException("Error executing onDisabled(context) method", se);
@ -235,6 +318,10 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
} else {
throw new ProcessException("Error creating ScriptRunner");
}
scriptingComponentHelper.stop();
lookupService.set(null);
scriptRunner = null;
}
@Override
@ -262,7 +349,7 @@ public class BaseScriptedLookupService extends AbstractScriptedControllerService
final Collection<ValidationResult> results = new HashSet<>();
try {
// Create a single script engine, the Processor object is reused by each task
// Create a single script engine, the LookupService object is reused by each task
if (scriptRunner == null) {
scriptingComponentHelper.setupScriptRunners(1, scriptBody, getLogger());
scriptRunner = scriptingComponentHelper.scriptRunnerQ.poll();

View File

@ -236,7 +236,7 @@ public class ScriptingComponentHelper {
}
// Get a list of URLs from the configurator (if present), or just convert modules from Strings to URLs
final String[] locations = modules.asLocations().toArray(new String[0]);
final String[] locations = (modules == null) ? new String[0] : modules.asLocations().toArray(new String[0]);
final URL[] additionalClasspathURLs = ScriptRunnerFactory.getInstance().getModuleURLsForClasspath(scriptEngineName, locations, log);

View File

@ -112,19 +112,10 @@ class TestScriptedLookupService {
runner.addControllerService("lookupService", scriptedLookupService)
runner.setProperty(scriptedLookupService, "Script Engine", "Groovy")
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_BODY, (String) null)
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, ALTERNATE_TARGET_PATH.toString())
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, TARGET_PATH.toString())
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.MODULES, (String) null)
// This call to setup should fail loading the script but should mark that the script should be reloaded
scriptedLookupService.setup()
// This prevents the (lookupService == null) check from passing, in order to force the reload from the
// scriptNeedsReload variable specifically
scriptedLookupService.lookupService.set(new MockScriptedLookupService())
runner.enableControllerService(scriptedLookupService)
MockFlowFile mockFlowFile = new MockFlowFile(1L)
InputStream inStream = new ByteArrayInputStream('Flow file content not used'.bytes)
Optional opt = scriptedLookupService.lookup(['key':'Hello'])
assertTrue(opt.present)
assertEquals('Hi', opt.get())
@ -133,6 +124,20 @@ class TestScriptedLookupService {
assertEquals('there', opt.get())
opt = scriptedLookupService.lookup(['key':'Not There'])
assertFalse(opt.present)
// Disable and load different script
runner.disableControllerService(scriptedLookupService)
runner.setProperty(scriptedLookupService, ScriptingComponentUtils.SCRIPT_FILE, ALTERNATE_TARGET_PATH.toString())
runner.enableControllerService(scriptedLookupService)
opt = scriptedLookupService.lookup(['key':'Hello'])
assertTrue(opt.present)
assertEquals('Goodbye', opt.get())
opt = scriptedLookupService.lookup(['key':'World'])
assertTrue(opt.present)
assertEquals('Stranger', opt.get())
opt = scriptedLookupService.lookup(['key':'Not There'])
assertFalse(opt.present)
}
class MockScriptedLookupService extends ScriptedLookupService implements AccessibleScriptingComponentHelper {

View File

@ -24,8 +24,8 @@ import org.apache.nifi.reporting.InitializationException
class SimpleGroovyLookupService implements StringLookupService {
def lookupTable = [
'Hello': 'Hi',
'World': 'there'
'Hello': 'Goodbye',
'World': 'Stranger'
]