diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java index 5b85f030a2..6926c939c0 100644 --- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java +++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java @@ -17,24 +17,25 @@ package org.apache.nifi.serialization; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.RecordField; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.SchemaIdentifier; + import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import org.apache.nifi.serialization.record.DataType; -import org.apache.nifi.serialization.record.RecordField; -import org.apache.nifi.serialization.record.RecordSchema; -import org.apache.nifi.serialization.record.SchemaIdentifier; - public class SimpleRecordSchema implements RecordSchema { private List fields = null; private Map fieldMap = null; private final boolean textAvailable; - private final String text; + private final AtomicReference text = new AtomicReference<>(); private final String schemaFormat; private final SchemaIdentifier schemaIdentifier; @@ -50,6 +51,10 @@ public class SimpleRecordSchema implements RecordSchema { this(text, schemaFormat, true, id); } + public SimpleRecordSchema(final SchemaIdentifier id) { + this(null, null, false, id); + } + public SimpleRecordSchema(final List fields, final String text, final String schemaFormat, final SchemaIdentifier id) { this(fields, text, schemaFormat, true, id); } @@ -60,7 +65,7 @@ public class SimpleRecordSchema implements RecordSchema { } 
private SimpleRecordSchema(final String text, final String schemaFormat, final boolean textAvailable, final SchemaIdentifier id) { - this.text = text; + this.text.set(text); this.schemaFormat = schemaFormat; this.schemaIdentifier = id; this.textAvailable = textAvailable; @@ -69,7 +74,7 @@ public class SimpleRecordSchema implements RecordSchema { @Override public Optional getSchemaText() { if (textAvailable) { - return Optional.ofNullable(text); + return Optional.ofNullable(text.get()); } else { return Optional.empty(); } @@ -121,13 +126,13 @@ public class SimpleRecordSchema implements RecordSchema { @Override public List getDataTypes() { - return getFields().stream().map(recordField -> recordField.getDataType()) + return getFields().stream().map(RecordField::getDataType) .collect(Collectors.toList()); } @Override public List getFieldNames() { - return getFields().stream().map(recordField -> recordField.getFieldName()) + return getFields().stream().map(RecordField::getFieldName) .collect(Collectors.toList()); } @@ -189,7 +194,19 @@ public class SimpleRecordSchema implements RecordSchema { @Override public String toString() { - return text; + String textValue = text.get(); + if (textValue != null) { + return textValue; + } + + textValue = createText(fields); + final boolean updated = text.compareAndSet(null, textValue); + + if (updated) { + return textValue; + } else { + return text.get(); + } } @Override diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java index 23f74b86d0..2e8898a495 100755 --- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java +++ 
b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java @@ -17,28 +17,6 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.math.BigDecimal; -import java.nio.ByteBuffer; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; -import java.sql.Time; -import java.sql.Timestamp; -import java.time.Duration; -import java.time.temporal.ChronoUnit; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; -import java.util.stream.Collectors; - import org.apache.avro.Conversions; import org.apache.avro.JsonProperties; import org.apache.avro.LogicalType; @@ -72,6 +50,27 @@ import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.Duration; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + public class AvroTypeUtil { private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class); public static final String AVRO_SCHEMA_FORMAT = "avro"; @@ -308,7 +307,7 @@ public class AvroTypeUtil { if (knownRecordTypes.containsKey(schemaFullName)) { return knownRecordTypes.get(schemaFullName); } else 
{ - SimpleRecordSchema recordSchema = new SimpleRecordSchema(avroSchema.toString(), AVRO_SCHEMA_FORMAT, SchemaIdentifier.EMPTY); + SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY); DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); knownRecordTypes.put(schemaFullName, recordSchemaType); @@ -353,23 +352,33 @@ public class AvroTypeUtil { return null; } - private static List getNonNullSubSchemas(Schema avroSchema) { - List unionFieldSchemas = avroSchema.getTypes(); + private static List getNonNullSubSchemas(final Schema avroSchema) { + final List unionFieldSchemas = avroSchema.getTypes(); if (unionFieldSchemas == null) { return Collections.emptyList(); } - return unionFieldSchemas.stream() - .filter(s -> s.getType() != Type.NULL) - .collect(Collectors.toList()); + + final List nonNullTypes = new ArrayList<>(unionFieldSchemas.size()); + for (final Schema fieldSchema : unionFieldSchemas) { + if (fieldSchema.getType() != Type.NULL) { + nonNullTypes.add(fieldSchema); + } + } + + return nonNullTypes; } public static RecordSchema createSchema(final Schema avroSchema) { + return createSchema(avroSchema, true); + } + + public static RecordSchema createSchema(final Schema avroSchema, final boolean includeText) { if (avroSchema == null) { throw new IllegalArgumentException("Avro Schema cannot be null"); } SchemaIdentifier identifier = new StandardSchemaIdentifier.Builder().name(avroSchema.getName()).build(); - return createSchema(avroSchema, avroSchema.toString(), identifier); + return createSchema(avroSchema, includeText ? avroSchema.toString() : null, identifier); } /** @@ -385,10 +394,10 @@ public class AvroTypeUtil { throw new IllegalArgumentException("Avro Schema cannot be null"); } - String schemaFullName = avroSchema.getNamespace() + "." 
+ avroSchema.getName(); - SimpleRecordSchema recordSchema = new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); - DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); - Map knownRecords = new HashMap<>(); + final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); + final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); + final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); + final Map knownRecords = new HashMap<>(); knownRecords.put(schemaFullName, recordSchemaType); final List recordFields = new ArrayList<>(avroSchema.getFields().size()); @@ -752,36 +761,39 @@ public class AvroTypeUtil { * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value * @return a converted value */ - private static Object convertUnionFieldValue(Object originalValue, Schema fieldSchema, Function conversion, final String fieldName) { - // Ignore null types in union - final List nonNullFieldSchemas = getNonNullSubSchemas(fieldSchema); - - // If at least one non-null type exists, find the first compatible type - if (nonNullFieldSchemas.size() >= 1) { - for (final Schema nonNullFieldSchema : nonNullFieldSchemas) { - final DataType desiredDataType = AvroTypeUtil.determineDataType(nonNullFieldSchema); - try { - final Object convertedValue = conversion.apply(nonNullFieldSchema); - - if (isCompatibleDataType(convertedValue, desiredDataType)) { - return convertedValue; - } - - // For logical types those store with different type (e.g. 
BigDecimal as ByteBuffer), check compatibility using the original rawValue - if (nonNullFieldSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) { - return convertedValue; - } - } catch (Exception e) { - // If failed with one of possible types, continue with the next available option. - if (logger.isDebugEnabled()) { - logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e); - } - } + private static Object convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function conversion, final String fieldName) { + boolean foundNonNull = false; + for (final Schema subSchema : fieldSchema.getTypes()) { + if (subSchema.getType() == Type.NULL) { + continue; } + foundNonNull = true; + final DataType desiredDataType = AvroTypeUtil.determineDataType(subSchema); + try { + final Object convertedValue = conversion.apply(subSchema); + + if (isCompatibleDataType(convertedValue, desiredDataType)) { + return convertedValue; + } + + // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue + if (subSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) { + return convertedValue; + } + } catch (Exception e) { + // If failed with one of possible types, continue with the next available option. 
+ if (logger.isDebugEnabled()) { + logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e); + } + } + } + + if (foundNonNull) { throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass() + " because no compatible types exist in the UNION for field " + fieldName); } + return null; } @@ -875,7 +887,7 @@ public class AvroTypeUtil { final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name()); values.put(field.name(), fieldValue); } - final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema); + final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema, false); return new MapRecord(childSchema, values); case BYTES: final ByteBuffer bb = (ByteBuffer) value; diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java index 26e3b82e2b..f35550addf 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/service/ServiceStateTransition.java @@ -22,6 +22,9 @@ import org.apache.nifi.controller.ComponentNode; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; public class ServiceStateTransition { private ControllerServiceState state = ControllerServiceState.DISABLED; @@ -29,32 +32,45 @@ public class ServiceStateTransition { private final List> 
disabledFutures = new ArrayList<>(); private final ControllerServiceNode serviceNode; + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private final Lock writeLock = rwLock.writeLock(); + private final Lock readLock = rwLock.readLock(); public ServiceStateTransition(final ControllerServiceNode serviceNode) { this.serviceNode = serviceNode; } - public synchronized boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture enabledFuture) { - if (expectedState != state) { - return false; - } + public boolean transitionToEnabling(final ControllerServiceState expectedState, final CompletableFuture enabledFuture) { + writeLock.lock(); + try { + if (expectedState != state) { + return false; + } - state = ControllerServiceState.ENABLING; - enabledFutures.add(enabledFuture); - return true; + state = ControllerServiceState.ENABLING; + enabledFutures.add(enabledFuture); + return true; + } finally { + writeLock.unlock(); + } } - public synchronized boolean enable() { - if (state != ControllerServiceState.ENABLING) { - return false; + public boolean enable() { + writeLock.lock(); + try { + if (state != ControllerServiceState.ENABLING) { + return false; + } + + state = ControllerServiceState.ENABLED; + + validateReferences(serviceNode); + + enabledFutures.stream().forEach(future -> future.complete(null)); + return true; + } finally { + writeLock.unlock(); } - - state = ControllerServiceState.ENABLED; - - validateReferences(serviceNode); - - enabledFutures.stream().forEach(future -> future.complete(null)); - return true; } private void validateReferences(final ControllerServiceNode service) { @@ -64,22 +80,37 @@ public class ServiceStateTransition { } } - public synchronized boolean transitionToDisabling(final ControllerServiceState expectedState, final CompletableFuture disabledFuture) { - if (expectedState != state) { - return false; + public boolean transitionToDisabling(final ControllerServiceState expectedState, 
final CompletableFuture disabledFuture) { + writeLock.lock(); + try { + if (expectedState != state) { + return false; + } + + state = ControllerServiceState.DISABLING; + disabledFutures.add(disabledFuture); + return true; + } finally { + writeLock.unlock(); } - - state = ControllerServiceState.DISABLING; - disabledFutures.add(disabledFuture); - return true; } - public synchronized void disable() { - state = ControllerServiceState.DISABLED; - disabledFutures.stream().forEach(future -> future.complete(null)); + public void disable() { + writeLock.lock(); + try { + state = ControllerServiceState.DISABLED; + disabledFutures.stream().forEach(future -> future.complete(null)); + } finally { + writeLock.unlock(); + } } - public synchronized ControllerServiceState getState() { - return state; + public ControllerServiceState getState() { + readLock.lock(); + try { + return state; + } finally { + readLock.unlock(); + } } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java index eed37f867f..97643aa28e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java @@ -17,14 +17,6 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; - import org.apache.avro.Schema; import 
org.apache.nifi.annotation.documentation.CapabilityDescription; import org.apache.nifi.annotation.documentation.Tags; @@ -35,12 +27,19 @@ import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.schema.access.SchemaAccessStrategy; import org.apache.nifi.schema.access.SchemaNotFoundException; import org.apache.nifi.schemaregistry.services.SchemaRegistry; -import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.RecordReader; import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.SchemaRegistryService; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + @Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"}) @CapabilityDescription("Parses Avro data and returns each Avro record as an separate Record object. 
The Avro data may contain the schema itself, " + "or the schema can be externalized and accessed by one of the methods offered by the 'Schema Access Strategy' property.") @@ -83,7 +82,7 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac } @Override - public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException { + public RecordReader createRecordReader(final Map variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException { final String schemaAccessStrategy = getConfigurationContext().getProperty(getSchemaAcessStrategyDescriptor()).getValue(); if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) { return new AvroReaderWithEmbeddedSchema(in); diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java index aa61e4cf27..a5e5ce712e 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithEmbeddedSchema.java @@ -17,16 +17,14 @@ package org.apache.nifi.avro; -import java.io.IOException; -import java.io.InputStream; - import org.apache.avro.Schema; import org.apache.avro.file.DataFileStream; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; -import 
org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.IOException; +import java.io.InputStream; + public class AvroReaderWithEmbeddedSchema extends AvroRecordReader { private final DataFileStream dataFileStream; private final InputStream in; @@ -35,7 +33,7 @@ public class AvroReaderWithEmbeddedSchema extends AvroRecordReader { public AvroReaderWithEmbeddedSchema(final InputStream in) throws IOException { this.in = in; - dataFileStream = new DataFileStream<>(in, new GenericDatumReader()); + dataFileStream = new DataFileStream<>(in, new NonCachingDatumReader<>()); this.avroSchema = dataFileStream.getSchema(); recordSchema = AvroTypeUtil.createSchema(avroSchema); } @@ -56,7 +54,7 @@ public class AvroReaderWithEmbeddedSchema extends AvroRecordReader { } @Override - public RecordSchema getSchema() throws MalformedRecordException { + public RecordSchema getSchema() { return recordSchema; } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java index ce49443b7b..9197e47833 100644 --- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java +++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java @@ -17,20 +17,17 @@ package org.apache.nifi.avro; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; - import org.apache.avro.Schema; -import 
org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.DatumReader; import org.apache.avro.io.DecoderFactory; -import org.apache.nifi.schema.access.SchemaNotFoundException; -import org.apache.nifi.serialization.MalformedRecordException; import org.apache.nifi.serialization.record.RecordSchema; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final InputStream in; private final RecordSchema recordSchema; @@ -38,11 +35,11 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader { private final BinaryDecoder decoder; private GenericRecord genericRecord; - public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException { + public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) { this.in = in; this.recordSchema = recordSchema; - datumReader = new GenericDatumReader(avroSchema); + datumReader = new NonCachingDatumReader<>(avroSchema); decoder = DecoderFactory.get().binaryDecoder(in, null); } @@ -67,7 +64,7 @@ public class AvroReaderWithExplicitSchema extends AvroRecordReader { } @Override - public RecordSchema getSchema() throws MalformedRecordException { + public RecordSchema getSchema() { return recordSchema; } } diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java new file mode 100644 index 0000000000..fa4ab7d306 --- /dev/null +++ 
b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/NonCachingDatumReader.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.avro; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +import java.io.IOException; + +/** + * Override the GenericDatumReader to provide a much more efficient implementation of #readString. The one that is provided by + * GenericDatumReader performs very poorly in some cases because it uses an IdentityHashMap with the key being the Schema so that + * it can stash away the "stringClass" but that performs far worse than just calling JsonNode#getProp. I.e., {@link #readString(Object, Schema, Decoder)} + * in GenericDatumReader calls #getStringClass, which uses an IdentityHashMap to cache results in order to avoid calling {@link #findStringClass(Schema)}. + * However, {@link #findStringClass(Schema)} is much more efficient than using an IdentityHashMap anyway. 
Additionally, the performance of {@link #findStringClass(Schema)} + * can be improved slightly and made more readable. + */ +public class NonCachingDatumReader extends GenericDatumReader { + public NonCachingDatumReader() { + super(); + } + + public NonCachingDatumReader(final Schema schema) { + super(schema); + } + + @Override + protected Object readString(final Object old, final Schema expected, final Decoder in) throws IOException { + final Class stringClass = findStringClass(expected); + if (stringClass == String.class) { + return in.readString(); + } + + if (stringClass == CharSequence.class) { + return readString(old, in); + } + + return newInstanceFromString(stringClass, in.readString()); + } + + protected Class findStringClass(Schema schema) { + final String name = schema.getProp(GenericData.STRING_PROP); + if ("String".equals(name)) { + return String.class; + } + + return CharSequence.class; + } +}