NIFI-8661: Updated ReaderLookup and RecordSetWriterLookup to use a property that indicates which reader/writer to use instead of requiring a specific attribute be provided

Signed-off-by: Matthew Burgess <mattyb149@apache.org>

This closes #5129
This commit is contained in:
Mark Payne 2021-06-04 14:56:16 -04:00 committed by Matthew Burgess
parent e883aa6b0b
commit d340ad79ed
No known key found for this signature in database
GPG Key ID: 05D3DEB8126DAD24
4 changed files with 119 additions and 129 deletions

View File

@ -16,20 +16,19 @@
*/
package org.apache.nifi.lookup;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@ -44,62 +43,89 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import static org.apache.nifi.expression.ExpressionLanguageScope.NONE;
@Tags({"lookup", "parse", "record", "row", "reader"})
@SeeAlso({RecordSetWriterLookup.class})
@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. This service " +
"requires an variable named 'recordreader.name' to be passed in when asking for a record record, and will throw an exception " +
"if the variable is missing. The value of 'recordreader.name' will be used to select the RecordReaderFactory that has been " +
"registered with that name. This will allow multiple RecordReaderFactory's to be defined and registered, and then selected " +
"dynamically at runtime by tagging flow files with the appropriate 'recordreader.name' variable.")
@DynamicProperty(name = "Name of the RecordReader", value = "A RecordReaderFactory controller service", expressionLanguageScope = ExpressionLanguageScope.NONE,
description = "")
@CapabilityDescription("Provides a RecordReaderFactory that can be used to dynamically select another RecordReaderFactory. " +
"This will allow multiple RecordReaderFactories to be defined and registered, and then selected " +
"dynamically at runtime by referencing a FlowFile attribute in the Service to Use property.")
@DynamicProperty(name = "Name of the RecordReader", value = "A RecordReaderFactory controller service", description = "", expressionLanguageScope = NONE)
public class ReaderLookup extends AbstractControllerService implements RecordReaderFactory {
public static final String RECORDREADER_NAME_VARIABLE = "recordreader.name";
static final PropertyDescriptor SERVICE_TO_USE = new Builder()
.name("Service to Use")
.displayName("Service to Use")
.description("Specifies the name of the user-defined property whose associated Controller Service should be used.")
.required(true)
.defaultValue("${recordreader.name}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile Map<String, RecordReaderFactory> recordReaderFactoryMap;
private volatile PropertyValue serviceToUseValue;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(SERVICE_TO_USE);
}
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
return new Builder()
.name(propertyDescriptorName)
.description("The RecordReaderFactory to return when recordreader.name = '" + propertyDescriptorName + "'")
.description("The RecordReaderFactory to return when '" + propertyDescriptorName + "' is the chosen Record Reader")
.identifiesControllerService(RecordReaderFactory.class)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>();
int numDefinedServices = 0;
final Set<String> serviceNames = new HashSet<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
numDefinedServices++;
serviceNames.add(descriptor.getName());
}
final String referencedId = context.getProperty(descriptor).getValue();
if (this.getIdentifier().equals(referencedId)) {
results.add(new ValidationResult.Builder()
.subject(descriptor.getDisplayName())
.explanation("the current service cannot be registered as a RecordReaderFactory to lookup")
.explanation("The current service cannot be registered as a RecordReaderFactory to lookup")
.valid(false)
.build());
}
}
if (numDefinedServices == 0) {
if (serviceNames.isEmpty()) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("at least one RecordReaderFactory must be defined via dynamic properties")
.explanation("At least one RecordReaderFactory must be defined via dynamic properties")
.valid(false)
.build());
}
final PropertyValue serviceToUseValue = context.getProperty(SERVICE_TO_USE);
if (!serviceToUseValue.isExpressionLanguagePresent()) {
final String selectedValue = serviceToUseValue.getValue();
if (!serviceNames.contains(selectedValue)) {
results.add(new ValidationResult.Builder()
.subject(SERVICE_TO_USE.getDisplayName())
.explanation("No service is defined with the name <" + selectedValue + ">")
.valid(false)
.build());
}
}
return results;
}
@ -115,42 +141,22 @@ public class ReaderLookup extends AbstractControllerService implements RecordRea
}
recordReaderFactoryMap = Collections.unmodifiableMap(serviceMap);
}
@OnDisabled
public void onDisabled() {
recordReaderFactoryMap = null;
serviceToUseValue = context.getProperty(SERVICE_TO_USE);
}
@Override
public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
if(flowFile == null) {
throw new UnsupportedOperationException("Cannot lookup a RecordReaderFactory without variables.");
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws MalformedRecordException, IOException, SchemaNotFoundException {
final String serviceName = serviceToUseValue.evaluateAttributeExpressions(variables).getValue();
if (serviceName.trim().isEmpty()) {
throw new ProcessException("Unable to determine which Record Reader to use: after evaluating the property value against supplied variables, got an empty value");
}
return createRecordReader(flowFile.getAttributes(), in, flowFile.getSize(), logger);
}
@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
if(variables == null) {
throw new UnsupportedOperationException("Cannot lookup a RecordReaderFactory without variables.");
}
if (!variables.containsKey(RECORDREADER_NAME_VARIABLE)) {
throw new ProcessException("Variables must contain a variables name '" + RECORDREADER_NAME_VARIABLE + "'");
}
final String recordReaderName = variables.get(RECORDREADER_NAME_VARIABLE);
if (StringUtils.isBlank(recordReaderName)) {
throw new ProcessException(RECORDREADER_NAME_VARIABLE + " cannot be null or blank");
}
final RecordReaderFactory recordReaderFactory = recordReaderFactoryMap.get(recordReaderName);
final RecordReaderFactory recordReaderFactory = recordReaderFactoryMap.get(serviceName);
if (recordReaderFactory == null) {
throw new ProcessException("No RecordReaderFactory was found for " + RECORDREADER_NAME_VARIABLE
+ "'" + recordReaderName + "'");
throw new ProcessException("No RecordReaderFactory was configured with the name <" + serviceName + ">");
}
return recordReaderFactory.createRecordReader(variables, in, inputLength, logger);

View File

@ -17,14 +17,13 @@
package org.apache.nifi.lookup;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
@ -44,62 +43,88 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@Tags({"lookup", "result", "set", "writer", "serializer", "record", "recordset", "row"})
@SeeAlso({ReaderLookup.class})
@CapabilityDescription("Provides a RecordSetWriterFactory that can be used to dynamically select another RecordSetWriterFactory. This service " +
"requires a variable named 'recordsetwriter.name' to be passed in when asking for a schema or record set writer, and will throw an exception " +
"if the variable is missing. The value of 'recordsetwriter.name' will be used to select the RecordSetWriterFactory that has been " +
"registered with that name. This will allow multiple RecordSetWriterFactory's to be defined and registered, and then selected " +
"dynamically at runtime by tagging flow files with the appropriate 'recordsetwriter.name' variable.")
@CapabilityDescription("Provides a RecordSetWriterFactory that can be used to dynamically select another RecordSetWriterFactory. " +
"This will allow multiple RecordSetWriterFactory's to be defined and registered, and then selected " +
"dynamically at runtime by tagging FlowFiles with the attributes and referencing those attributes in the Service to Use property.")
@DynamicProperty(name = "Name of the RecordSetWriter", value = "A RecordSetWriterFactory controller service", expressionLanguageScope = ExpressionLanguageScope.NONE,
description = "")
public class RecordSetWriterLookup extends AbstractControllerService implements RecordSetWriterFactory {
public static final String RECORDWRITER_NAME_VARIABLE = "recordsetwriter.name";
static final PropertyDescriptor SERVICE_TO_USE = new PropertyDescriptor.Builder()
.name("Service to Use")
.displayName("Service to Use")
.description("Specifies the name of the user-defined property whose associated Controller Service should be used.")
.required(true)
.defaultValue("${recordsetwriter.name}")
.expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
private volatile Map<String,RecordSetWriterFactory> recordSetWriterFactoryMap;
private volatile PropertyValue serviceToUseValue;
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
.name(propertyDescriptorName)
.description("The RecordSetWriterFactory to return when recordwriter.name = '" + propertyDescriptorName + "'")
.description("The RecordSetWriterFactory to return when '" + propertyDescriptorName + "' is the chosen Record Reader")
.identifiesControllerService(RecordSetWriterFactory.class)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();
}
@Override
protected Collection<ValidationResult> customValidate(ValidationContext context) {
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return Collections.singletonList(SERVICE_TO_USE);
}
@Override
protected Collection<ValidationResult> customValidate(final ValidationContext context) {
final List<ValidationResult> results = new ArrayList<>();
int numDefinedServices = 0;
final Set<String> serviceNames = new HashSet<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (descriptor.isDynamic()) {
numDefinedServices++;
serviceNames.add(descriptor.getName());
}
final String referencedId = context.getProperty(descriptor).getValue();
if (this.getIdentifier().equals(referencedId)) {
results.add(new ValidationResult.Builder()
.subject(descriptor.getDisplayName())
.explanation("the current service cannot be registered as a RecordSetWriterFactory to lookup")
.explanation("The current service cannot be registered as a RecordSetWriter to lookup")
.valid(false)
.build());
}
}
if (numDefinedServices == 0) {
if (serviceNames.isEmpty()) {
results.add(new ValidationResult.Builder()
.subject(this.getClass().getSimpleName())
.explanation("at least one RecordSetWriterFactory must be defined via dynamic properties")
.explanation("At least one RecordSetWriter must be defined via dynamic properties")
.valid(false)
.build());
}
final PropertyValue serviceToUseValue = context.getProperty(SERVICE_TO_USE);
if (!serviceToUseValue.isExpressionLanguagePresent()) {
final String selectedValue = serviceToUseValue.getValue();
if (!serviceNames.contains(selectedValue)) {
results.add(new ValidationResult.Builder()
.subject(SERVICE_TO_USE.getDisplayName())
.explanation("No service is defined with the name <" + selectedValue + ">")
.valid(false)
.build());
}
}
return results;
}
@ -115,11 +140,7 @@ public class RecordSetWriterLookup extends AbstractControllerService implements
}
recordSetWriterFactoryMap = Collections.unmodifiableMap(serviceMap);
}
@OnDisabled
public void onDisabled() {
recordSetWriterFactoryMap = null;
serviceToUseValue = context.getProperty(SERVICE_TO_USE);
}
@ -128,34 +149,20 @@ public class RecordSetWriterLookup extends AbstractControllerService implements
return getRecordSetWriterFactory(variables).getSchema(variables, readSchema);
}
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out) {
throw new UnsupportedOperationException("Cannot lookup RecordSetWriterFactory without variables");
}
@Override
public RecordSetWriter createWriter(ComponentLog logger, RecordSchema schema, OutputStream out, Map<String, String> variables) throws SchemaNotFoundException, IOException {
return getRecordSetWriterFactory(variables).createWriter(logger, schema, out, variables);
}
private RecordSetWriterFactory getRecordSetWriterFactory(Map<String, String> variables){
if (variables == null) {
throw new UnsupportedOperationException("Cannot lookup RecordSetWriterFactory without variables");
final String serviceName = serviceToUseValue.evaluateAttributeExpressions(variables).getValue();
if (serviceName.trim().isEmpty()) {
throw new ProcessException("Unable to determine which Record Writer to use: after evaluating the property value against supplied variables, got an empty value");
}
if (!variables.containsKey(RECORDWRITER_NAME_VARIABLE)) {
throw new ProcessException("Attributes must contain an variables name '" + RECORDWRITER_NAME_VARIABLE + "'");
}
final String recordSetWriterName = variables.get(RECORDWRITER_NAME_VARIABLE);
if (StringUtils.isBlank(recordSetWriterName)) {
throw new ProcessException(RECORDWRITER_NAME_VARIABLE + " cannot be null or blank");
}
final RecordSetWriterFactory recordSetWriterFactory = recordSetWriterFactoryMap.get(recordSetWriterName);
final RecordSetWriterFactory recordSetWriterFactory = recordSetWriterFactoryMap.get(serviceName);
if (recordSetWriterFactory == null) {
throw new ProcessException("No RecordSetWriterFactory was found for " + RECORDWRITER_NAME_VARIABLE
+ "'" + recordSetWriterName + "'");
throw new ProcessException("No Record Writer was configured with the name <" + serviceName + ">");
}
return recordSetWriterFactory;

View File

@ -17,7 +17,6 @@
package org.apache.nifi.lookup;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
@ -42,6 +41,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestReaderLookup {
private final String DEFAULT_ATTRIBUTE_NAME = "recordreader.name";
private MockRecordReaderFactory recordReaderA;
private MockRecordReaderFactory recordReaderB;
@ -75,31 +75,19 @@ public class TestReaderLookup {
@Test
public void testLookupServiceByName() throws SchemaNotFoundException, MalformedRecordException, IOException {
final Map<String,String> attributes = new HashMap<>();
attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "A");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "A");
MockRecordReader recordReader = (MockRecordReader) readerLookup.createRecordReader(attributes, null, -1, null);
assertNotNull(recordReader);
assertEquals(recordReaderA.name, recordReader.name);
attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "B");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "B");
recordReader = (MockRecordReader) readerLookup.createRecordReader(attributes, null, -1, null);
assertNotNull(recordReader);
assertEquals(recordReaderB.name, recordReader.name);
}
@Test(expected = UnsupportedOperationException.class)
public void testLookupWithoutAttributes() throws SchemaNotFoundException, MalformedRecordException, IOException {
Map<String,String> attributes = null;
readerLookup.createRecordReader(attributes, null, -1, null);
}
@Test(expected = UnsupportedOperationException.class)
public void testLookupWithoutFlowFile() throws SchemaNotFoundException, MalformedRecordException, IOException {
FlowFile flowFile = null;
readerLookup.createRecordReader(flowFile, null, null);
}
@Test(expected = ProcessException.class)
public void testLookupMissingNameAttribute() throws SchemaNotFoundException, MalformedRecordException, IOException {
final Map<String,String> attributes = new HashMap<>();
@ -109,7 +97,7 @@ public class TestReaderLookup {
@Test(expected = ProcessException.class)
public void testLookupWithNameThatDoesNotExist() throws SchemaNotFoundException, MalformedRecordException, IOException {
final Map<String,String> attributes = new HashMap<>();
attributes.put(ReaderLookup.RECORDREADER_NAME_VARIABLE, "DOES-NOT-EXIST");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "DOES-NOT-EXIST");
readerLookup.createRecordReader(attributes, null, -1, null);
}

View File

@ -44,6 +44,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
public class TestRecordSetWriterLookup {
private final String DEFAULT_ATTRIBUTE_NAME = "recordsetwriter.name";
private MockRecordSetWriterFactory recordSetWriterA;
private MockRecordSetWriterFactory recordSetWriterB;
@ -77,7 +78,7 @@ public class TestRecordSetWriterLookup {
@Test
public void testLookupServiceByName() throws SchemaNotFoundException, IOException {
final Map<String,String> attributes = new HashMap<>();
attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "A");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "A");
RecordSchema recordSchema = recordSetWriterLookup.getSchema(attributes, null);
assertNotNull(recordSchema);
@ -87,7 +88,7 @@ public class TestRecordSetWriterLookup {
assertNotNull(writer);
assertEquals(recordSetWriterA.name, writer.name);
attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "B");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "B");
recordSchema = recordSetWriterLookup.getSchema(attributes, null);
assertNotNull(recordSchema);
@ -98,18 +99,6 @@ public class TestRecordSetWriterLookup {
assertEquals(recordSetWriterB.name, writer.name);
}
@Test(expected = UnsupportedOperationException.class)
public void testLookupWithoutAttributes() throws SchemaNotFoundException, IOException {
Map<String,String> attributes = null;
recordSetWriterLookup.createWriter(null, null, null, attributes);
}
@Test(expected = UnsupportedOperationException.class)
public void testLookupSchemaWithoutAttributes() throws SchemaNotFoundException, IOException {
Map<String,String> attributes = null;
recordSetWriterLookup.getSchema(attributes, null);
}
@Test(expected = ProcessException.class)
public void testLookupMissingNameAttribute() throws SchemaNotFoundException, IOException {
final Map<String,String> attributes = new HashMap<>();
@ -125,14 +114,14 @@ public class TestRecordSetWriterLookup {
@Test(expected = ProcessException.class)
public void testLookupWithNameThatDoesNotExist() throws SchemaNotFoundException, IOException {
final Map<String,String> attributes = new HashMap<>();
attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "DOES-NOT-EXIST");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "DOES-NOT-EXIST");
recordSetWriterLookup.createWriter(null, null, null, attributes);
}
@Test(expected = ProcessException.class)
public void testLookupSchemaWithNameThatDoesNotExist() throws SchemaNotFoundException, IOException {
final Map<String,String> attributes = new HashMap<>();
attributes.put(RecordSetWriterLookup.RECORDWRITER_NAME_VARIABLE, "DOES-NOT-EXIST");
attributes.put(DEFAULT_ATTRIBUTE_NAME, "DOES-NOT-EXIST");
recordSetWriterLookup.getSchema(attributes, null);
}