mirror of https://github.com/apache/nifi.git
NIFI-3832: Fixed AvroRecordSetWriter validation NPE.
This closes #1768.
This commit is contained in:
parent
3af53419af
commit
744ecc3d83
|
@ -60,7 +60,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) throws IOException {
|
public RecordSetWriter createWriter(final ComponentLog logger, final FlowFile flowFile, final InputStream in) throws IOException {
|
||||||
final String strategyValue = getConfigurationContext().getProperty(SCHEMA_WRITE_STRATEGY).getValue();
|
final String strategyValue = getConfigurationContext().getProperty(getSchemaWriteStrategyDescriptor()).getValue();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
final RecordSchema recordSchema = getSchema(flowFile, in);
|
final RecordSchema recordSchema = getSchema(flowFile, in);
|
||||||
|
@ -130,7 +130,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Set<SchemaField> getRequiredSchemaFields(final ValidationContext validationContext) {
|
protected Set<SchemaField> getRequiredSchemaFields(final ValidationContext validationContext) {
|
||||||
final String writeStrategyValue = validationContext.getProperty(SCHEMA_WRITE_STRATEGY).getValue();
|
final String writeStrategyValue = validationContext.getProperty(getSchemaWriteStrategyDescriptor()).getValue();
|
||||||
if (writeStrategyValue.equalsIgnoreCase(AVRO_EMBEDDED.getValue())) {
|
if (writeStrategyValue.equalsIgnoreCase(AVRO_EMBEDDED.getValue())) {
|
||||||
return requiredSchemaFields;
|
return requiredSchemaFields;
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,7 +58,12 @@ public abstract class SchemaRegistryRecordSetWriter extends SchemaRegistryServic
|
||||||
"The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
|
"The FlowFile will be given a set of 3 attributes to describe the schema: 'schema.identifier', 'schema.version', and 'schema.protocol.version'. Note that if "
|
||||||
+ "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
|
+ "the schema for a record does not contain the necessary identifier and version, an Exception will be thrown when attempting to write the data.");
|
||||||
|
|
||||||
protected static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder()
|
/**
|
||||||
|
* This constant is just a base spec for the actual PropertyDescriptor.
|
||||||
|
* As it can be overridden by subclasses with different AllowableValues and default value,
|
||||||
|
* {@link #getSchemaWriteStrategyDescriptor()} should be used to get the actual descriptor, instead of using this constant directly.
|
||||||
|
*/
|
||||||
|
private static final PropertyDescriptor SCHEMA_WRITE_STRATEGY = new PropertyDescriptor.Builder()
|
||||||
.name("Schema Write Strategy")
|
.name("Schema Write Strategy")
|
||||||
.description("Specifies how the schema for a Record should be added to the data.")
|
.description("Specifies how the schema for a Record should be added to the data.")
|
||||||
.allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
|
.allowableValues(SCHEMA_NAME_ATTRIBUTE, AVRO_SCHEMA_ATTRIBUTE, HWX_SCHEMA_REF_ATTRIBUTES, HWX_CONTENT_ENCODED_SCHEMA)
|
||||||
|
|
Loading…
Reference in New Issue