NIFI-5805: Pool the BinaryEncoders used by the WriteAvroResultWithExternalSchema writer. Unfortunately, the writer that embeds schemas does not allow for this optimization due to the Avro API

This closes #3160.
This commit is contained in:
Mark Payne 2018-11-08 14:54:05 -05:00
parent 08189596d2
commit d3b1674813
4 changed files with 159 additions and 59 deletions

View File

@ -17,27 +17,22 @@
package org.apache.nifi.avro;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.LoadingCache;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.io.BinaryEncoder;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyDescriptor.Builder;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
@ -48,6 +43,17 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.SchemaRegistryRecordSetWriter;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@Tags({"avro", "result", "set", "writer", "serializer", "record", "recordset", "row"})
@CapabilityDescription("Writes the contents of a RecordSet in Binary Avro format.")
public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implements RecordSetWriterFactory {
@ -61,7 +67,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
LZO
}
private static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
private static final PropertyDescriptor COMPRESSION_FORMAT = new Builder()
.name("compression-format")
.displayName("Compression Format")
.description("Compression type to use when writing Avro files. Default is None.")
@ -70,19 +76,33 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
.required(true)
.build();
private LoadingCache<String, Schema> compiledAvroSchemaCache;
static final PropertyDescriptor ENCODER_POOL_SIZE = new Builder()
.name("encoder-pool-size")
.displayName("Encoder Pool Size")
.description("Avro Writers require the use of an Encoder. Creation of Encoders is expensive, but once created, they can be reused. This property controls the maximum number of Encoders that" +
" can be pooled and reused. Setting this value too small can result in degraded performance, but setting it higher can result in more heap being used. This property is ignored if the" +
" Avro Writer is configured with a Schema Write Strategy of 'Embed Avro Schema'.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("32")
.build();
static final AllowableValue AVRO_EMBEDDED = new AllowableValue("avro-embedded", "Embed Avro Schema",
"The FlowFile will have the Avro schema embedded into the content, as is typical with Avro");
static final PropertyDescriptor CACHE_SIZE = new PropertyDescriptor.Builder()
.name("cache-size")
.displayName("Cache Size")
.description("Specifies how many Schemas should be cached")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("1000")
.required(true)
.build();
.name("cache-size")
.displayName("Cache Size")
.description("Specifies how many Schemas should be cached")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.defaultValue("1000")
.required(true)
.build();
private LoadingCache<String, Schema> compiledAvroSchemaCache;
private volatile BlockingQueue<BinaryEncoder> encoderPool;
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
@ -90,6 +110,16 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
compiledAvroSchemaCache = Caffeine.newBuilder()
.maximumSize(cacheSize)
.build(schemaText -> new Schema.Parser().parse(schemaText));
final int capacity = context.getProperty(ENCODER_POOL_SIZE).evaluateAttributeExpressions().asInteger();
encoderPool = new LinkedBlockingQueue<>(capacity);
}
@OnDisabled
public void cleanup() {
if (encoderPool != null) {
encoderPool.clear();
}
}
@Override
@ -117,7 +147,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
if (AVRO_EMBEDDED.getValue().equals(strategyValue)) {
return new WriteAvroResultWithSchema(avroSchema, out, getCodecFactory(compressionFormat));
} else {
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out);
return new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, getSchemaAccessWriter(recordSchema), out, encoderPool, getLogger());
}
} catch (final SchemaNotFoundException e) {
throw new ProcessException("Could not determine the Avro Schema to use for writing the content", e);
@ -146,6 +176,7 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(COMPRESSION_FORMAT);
properties.add(CACHE_SIZE);
properties.add(ENCODER_POOL_SIZE);
return properties;
}

View File

@ -17,22 +17,24 @@
package org.apache.nifi.avro;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaAccessWriter;
import org.apache.nifi.serialization.AbstractRecordSetWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
private final SchemaAccessWriter schemaAccessWriter;
private final RecordSchema recordSchema;
@ -40,17 +42,26 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
private final BinaryEncoder encoder;
private final OutputStream buffered;
private final DatumWriter<GenericRecord> datumWriter;
private final BlockingQueue<BinaryEncoder> recycleQueue;
public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema,
final SchemaAccessWriter schemaAccessWriter, final OutputStream out) throws IOException {
public WriteAvroResultWithExternalSchema(final Schema avroSchema, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccessWriter,
final OutputStream out, final BlockingQueue<BinaryEncoder> recycleQueue, final ComponentLog logger) {
super(out);
this.recordSchema = recordSchema;
this.schemaAccessWriter = schemaAccessWriter;
this.avroSchema = avroSchema;
this.buffered = new BufferedOutputStream(out);
this.recycleQueue = recycleQueue;
BinaryEncoder reusableEncoder = recycleQueue.poll();
if (reusableEncoder == null) {
logger.debug("Was not able to obtain a BinaryEncoder from reuse pool. This is normal for the first X number of iterations (where X is equal to the max size of the pool), " +
"but if this continues, it indicates that increasing the size of the pool will likely yield better performance for this Avro Writer.");
}
encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, reusableEncoder);
datumWriter = new GenericDatumWriter<>(avroSchema);
encoder = EncoderFactory.get().blockingBinaryEncoder(buffered, null);
}
@Override
@ -88,4 +99,13 @@ public class WriteAvroResultWithExternalSchema extends AbstractRecordSetWriter {
public String getMimeType() {
return "application/avro-binary";
}
@Override
public void close() throws IOException {
if (encoder != null) {
recycleQueue.offer(encoder);
}
super.close();
}
}

View File

@ -17,32 +17,6 @@
package org.apache.nifi.avro;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalType;
import org.apache.avro.Schema;
@ -60,6 +34,32 @@ import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TimeZone;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
public abstract class TestWriteAvroResult {
protected abstract RecordSetWriter createWriter(Schema schema, OutputStream out) throws IOException;

View File

@ -17,26 +17,49 @@
package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.NopSchemaAccessWriter;
import org.apache.nifi.schema.access.WriteAvroSchemaAttributeStrategy;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.NullOutputStream;
import org.apache.nifi.util.MockComponentLog;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
private final BlockingQueue<BinaryEncoder> encoderPool = new LinkedBlockingQueue<>(32);
@Override
protected RecordSetWriter createWriter(final Schema schema, final OutputStream out) throws IOException {
return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out);
return new WriteAvroResultWithExternalSchema(schema, AvroTypeUtil.createSchema(schema), new WriteAvroSchemaAttributeStrategy(), out, encoderPool,
new MockComponentLog("id", new Object()));
}
@Override
@ -55,4 +78,30 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
new Schema.Parser().parse(schemaText);
}
@Test
@Ignore("This test takes many seconds to run and is only really useful for comparing performance of the writer before and after changes, so it is @Ignored, but left in place to be run manually " +
"for performance comparisons before & after changes are made.")
public void testPerf() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema recordSchema = new SimpleRecordSchema(fields);
final OutputStream out = new NullOutputStream();
final Record record = new MapRecord(recordSchema, Collections.singletonMap("name", "John Doe"));
final Schema avroSchema = AvroTypeUtil.extractAvroSchema(recordSchema);
final ComponentLog logger = new MockComponentLog("id", new Object());
final long start = System.nanoTime();
for (int i=0; i < 10_000_000; i++) {
try (final RecordSetWriter writer = new WriteAvroResultWithExternalSchema(avroSchema, recordSchema, new NopSchemaAccessWriter(), out, encoderPool, logger)) {
writer.write(RecordSet.of(record.getSchema(), record));
}
}
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
System.out.println(millis);
}
}