mirror of https://github.com/apache/nifi.git
NIFI-3912: This closes #1809. Fixed NPE that would result in validation failure for FreeFormTextRecordSetWriter
Signed-off-by: joewitt <joewitt@apache.org>
This commit is contained in:
parent
2f3ba57dd6
commit
ce1bc42ac5
|
@ -21,6 +21,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -124,6 +125,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
|
protected SchemaAccessWriter getSchemaWriteStrategy(final String allowableValue) {
|
||||||
|
if (allowableValue == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
|
if (allowableValue.equalsIgnoreCase(SCHEMA_NAME_ATTRIBUTE.getValue())) {
|
||||||
return new SchemaNameAsAttribute();
|
return new SchemaNameAsAttribute();
|
||||||
} else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
|
} else if (allowableValue.equalsIgnoreCase(AVRO_SCHEMA_ATTRIBUTE.getValue())) {
|
||||||
|
@ -140,6 +145,10 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
|
||||||
protected Set<SchemaField> getRequiredSchemaFields(final ValidationContext validationContext) {
|
protected Set<SchemaField> getRequiredSchemaFields(final ValidationContext validationContext) {
|
||||||
final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
|
final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
|
||||||
final SchemaAccessWriter writer = getSchemaWriteStrategy(writeStrategyValue);
|
final SchemaAccessWriter writer = getSchemaWriteStrategy(writeStrategyValue);
|
||||||
|
if (writer == null) {
|
||||||
|
return EnumSet.noneOf(SchemaField.class);
|
||||||
|
}
|
||||||
|
|
||||||
final Set<SchemaField> requiredFields = writer.getRequiredSchemaFields();
|
final Set<SchemaField> requiredFields = writer.getRequiredSchemaFields();
|
||||||
return requiredFields;
|
return requiredFields;
|
||||||
}
|
}
|
||||||
|
|
|
@ -38,6 +38,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.EnumSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
@ -103,6 +104,11 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
|
||||||
}
|
}
|
||||||
|
|
||||||
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
|
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
|
||||||
|
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy();
|
||||||
|
if (accessStrategy == null) {
|
||||||
|
throw new SchemaNotFoundException("Could not determine the Schema Access Strategy for this service");
|
||||||
|
}
|
||||||
|
|
||||||
return getSchemaAccessStrategy().getSchema(flowFile, contentStream);
|
return getSchemaAccessStrategy().getSchema(flowFile, contentStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,15 +127,25 @@ public abstract class SchemaRegistryService extends AbstractControllerService {
|
||||||
final SchemaRegistry schemaRegistry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
|
final SchemaRegistry schemaRegistry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
|
||||||
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext);
|
final SchemaAccessStrategy accessStrategy = getSchemaAccessStrategy(accessStrategyValue, schemaRegistry, validationContext);
|
||||||
|
|
||||||
|
if (accessStrategy == null) {
|
||||||
|
return EnumSet.noneOf(SchemaField.class);
|
||||||
|
}
|
||||||
final Set<SchemaField> suppliedFields = accessStrategy.getSuppliedSchemaFields();
|
final Set<SchemaField> suppliedFields = accessStrategy.getSuppliedSchemaFields();
|
||||||
return suppliedFields;
|
return suppliedFields;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
|
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
|
||||||
|
if (allowableValue == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
|
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
|
protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
|
||||||
|
if (allowableValue == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
|
return SchemaAccessUtils.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,8 +17,11 @@
|
||||||
|
|
||||||
package org.apache.nifi.text;
|
package org.apache.nifi.text;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
import org.apache.nifi.annotation.documentation.CapabilityDescription;
|
||||||
|
@ -27,11 +30,14 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.PropertyValue;
|
import org.apache.nifi.components.PropertyValue;
|
||||||
import org.apache.nifi.controller.ConfigurationContext;
|
import org.apache.nifi.controller.ConfigurationContext;
|
||||||
|
import org.apache.nifi.flowfile.FlowFile;
|
||||||
import org.apache.nifi.logging.ComponentLog;
|
import org.apache.nifi.logging.ComponentLog;
|
||||||
import org.apache.nifi.processor.util.StandardValidators;
|
import org.apache.nifi.processor.util.StandardValidators;
|
||||||
|
import org.apache.nifi.schema.access.SchemaNotFoundException;
|
||||||
import org.apache.nifi.serialization.RecordSetWriter;
|
import org.apache.nifi.serialization.RecordSetWriter;
|
||||||
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
import org.apache.nifi.serialization.RecordSetWriterFactory;
|
||||||
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
|
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
|
||||||
|
import org.apache.nifi.serialization.SimpleRecordSchema;
|
||||||
import org.apache.nifi.serialization.record.RecordSchema;
|
import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
|
|
||||||
@Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
|
@Tags({"text", "freeform", "expression", "language", "el", "record", "recordset", "resultset", "writer", "serialize"})
|
||||||
|
@ -39,6 +45,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
|
||||||
+ "text is able to make use of the Expression Language to reference each of the fields that are available "
|
+ "text is able to make use of the Expression Language to reference each of the fields that are available "
|
||||||
+ "in a Record. Each record in the RecordSet will be separated by a single newline character.")
|
+ "in a Record. Each record in the RecordSet will be separated by a single newline character.")
|
||||||
public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
|
public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
|
||||||
|
private static final RecordSchema EMPTY_SCHEMA = new SimpleRecordSchema(Collections.emptyList());
|
||||||
|
|
||||||
static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
|
static final PropertyDescriptor TEXT = new PropertyDescriptor.Builder()
|
||||||
.name("Text")
|
.name("Text")
|
||||||
.description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.")
|
.description("The text to use when writing the results. This property will evaluate the Expression Language using any of the fields available in a Record.")
|
||||||
|
@ -76,4 +84,9 @@ public class FreeFormTextRecordSetWriter extends SchemaRegistryRecordSetWriter i
|
||||||
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
|
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema) {
|
||||||
return new FreeFormTextWriter(textValue, characterSet);
|
return new FreeFormTextWriter(textValue, characterSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException {
|
||||||
|
return EMPTY_SCHEMA;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue