NIFI-7260: Fix error handling and re-evaluate Module Directory property on changed for scripted controller services

This closes #4147

Signed-off-by: Mike Thomsen <mthomsen@apache.org>
This commit is contained in:
Matthew Burgess 2020-03-16 19:01:07 -04:00 committed by Mike Thomsen
parent 1e13b62e78
commit 64e3599f05
No known key found for this signature in database
GPG Key ID: 88511C3D4CAD246F
5 changed files with 143 additions and 5 deletions

View File

@ -42,9 +42,13 @@ public abstract class AbstractScriptedRecordFactory<T> extends AbstractScriptedC
if (scriptNeedsReload.get() || recordFactory.get() == null) {
if (ScriptingComponentHelper.isFile(scriptingComponentHelper.getScriptPath())) {
reloadScriptFile(scriptingComponentHelper.getScriptPath());
if (!reloadScriptFile(scriptingComponentHelper.getScriptPath())) {
throw new ProcessException("Error during loading of script");
}
} else {
reloadScriptBody(scriptingComponentHelper.getScriptBody());
if (!reloadScriptBody(scriptingComponentHelper.getScriptBody())) {
throw new ProcessException("Error during loading of script");
}
}
scriptNeedsReload.set(false);
}

View File

@ -103,13 +103,15 @@ public abstract class AbstractScriptedControllerService extends AbstractControll
@Override
public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
validationResults.set(new HashSet<>());
if (ScriptingComponentUtils.SCRIPT_FILE.equals(descriptor)
|| ScriptingComponentUtils.SCRIPT_BODY.equals(descriptor)
|| ScriptingComponentUtils.MODULES.equals(descriptor)
|| scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
scriptNeedsReload.set(true);
// Need to reset scriptEngine if the value has changed
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor)) {
if (scriptingComponentHelper.SCRIPT_ENGINE.equals(descriptor) || ScriptingComponentUtils.MODULES.equals(descriptor)) {
scriptEngine = null;
}
}
@ -117,7 +119,21 @@ public abstract class AbstractScriptedControllerService extends AbstractControll
@Override
protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
return scriptingComponentHelper.customValidate(validationContext);
Collection<ValidationResult> commonValidationResults = super.customValidate(validationContext);
commonValidationResults.addAll(scriptingComponentHelper.customValidate(validationContext));
if (!commonValidationResults.isEmpty()) {
return commonValidationResults;
}
// do not try to build processor/compile/etc until onPropertyModified clear the validation error/s
// and don't print anything into log.
if (!validationResults.get().isEmpty()) {
return validationResults.get();
}
return commonValidationResults;
}
public void onEnabled(final ConfigurationContext context) {

View File

@ -37,9 +37,10 @@ import org.junit.runners.JUnit4
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import static groovy.util.GroovyTestCase.assertEquals
import static junit.framework.TestCase.assertEquals
import static org.junit.Assert.assertNotNull
import static org.junit.Assert.assertNull
import static org.junit.Assert.fail
import static org.mockito.Mockito.mock
import static org.mockito.Mockito.when
@ -189,7 +190,56 @@ class ScriptedReaderTest {
assertEquals(record.getAsInt('code'), record.getAsInt('id') * 100)
}
assertNull(recordReader.nextRecord())
}
@Test
void testRecordReaderGroovyScriptChangeModuleDirectory() {
def properties = [:] as Map<PropertyDescriptor, String>
recordReaderFactory.getSupportedPropertyDescriptors().each {PropertyDescriptor descriptor ->
properties.put(descriptor, descriptor.getDefaultValue())
}
// Mock the ConfigurationContext for setup(...)
def configurationContext = mock(ConfigurationContext)
when(configurationContext.getProperty(scriptingComponent.getScriptingComponentHelper().SCRIPT_ENGINE))
.thenReturn(new MockPropertyValue('Groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_FILE))
.thenReturn(new MockPropertyValue('target/test/resources/groovy/test_record_reader_load_module.groovy'))
when(configurationContext.getProperty(ScriptingComponentUtils.SCRIPT_BODY))
.thenReturn(new MockPropertyValue(null))
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue(null))
def logger = mock(ComponentLog)
def initContext = mock(ControllerServiceInitializationContext)
when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString())
when(initContext.getLogger()).thenReturn(logger)
recordReaderFactory.initialize initContext
try {
recordReaderFactory.onEnabled configurationContext
fail('Expected exception in onEnabled when script is loaded with no Module Directory set')
} catch(e) {
// Do nothing, the exception is expected as the needed class is not in the Module Directory property
}
byte[] contentBytes = 'Flow file content not used'.bytes
InputStream inStream = new ByteArrayInputStream(contentBytes)
def recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, contentBytes.length, logger)
// This one is supposed to be null as the factory should fail on initialize
assertNull(recordReader)
when(configurationContext.getProperty(ScriptingComponentUtils.MODULES))
.thenReturn(new MockPropertyValue('target/test/resources/jar/test.jar'))
recordReaderFactory.onPropertyModified(ScriptingComponentUtils.MODULES, '', 'target/test/resources/jar/test.jar')
recordReaderFactory.initialize initContext
recordReaderFactory.onEnabled configurationContext
recordReader = recordReaderFactory.createRecordReader(Collections.emptyMap(), inStream, contentBytes.length, logger)
assertNotNull(recordReader)
}
class MockScriptedReader extends ScriptedReader implements AccessibleScriptingComponentHelper {

View File

@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.nifi.controller.AbstractControllerService
import org.apache.nifi.logging.ComponentLog
import org.apache.nifi.schema.access.SchemaNotFoundException
import org.apache.nifi.serialization.MalformedRecordException
import org.apache.nifi.serialization.RecordReader
import org.apache.nifi.serialization.RecordReaderFactory
import org.apache.nifi.serialization.SimpleRecordSchema
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordField
import org.apache.nifi.serialization.record.RecordFieldType
import org.apache.nifi.serialization.record.RecordSchema
// import a test class to ensure Module Directory property is working correctly
import org.apache.nifi.script.ModulePropertyExample
class GroovyModuleRecordReader implements RecordReader {
def recordSchema = new SimpleRecordSchema(
[new RecordField('id', RecordFieldType.INT.dataType),
new RecordField('name', RecordFieldType.STRING.dataType),
new RecordField('code', RecordFieldType.INT.dataType)]
)
def recordIterator = [
new MapRecord(recordSchema, ['id': 1, 'name': 'John', 'code': 100]),
new MapRecord(recordSchema, ['id': 2, 'name': 'Mary', 'code': 200]),
new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
].iterator()
Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException {
return recordIterator.hasNext() ? recordIterator.next() : null
}
RecordSchema getSchema() throws MalformedRecordException {
return recordSchema
}
void close() throws IOException {
}
}
class GroovyModuleRecordReaderFactory extends AbstractControllerService implements RecordReaderFactory {
RecordReader createRecordReader(Map<String, String> variables, InputStream inputStream, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
return new GroovyModuleRecordReader()
}
}
reader = new GroovyModuleRecordReaderFactory()