From de815da9422ae87bc7cff58a154d098908e6625f Mon Sep 17 00:00:00 2001 From: Benedict Jin <1571805553@qq.com> Date: Tue, 25 Apr 2017 11:46:32 +0800 Subject: [PATCH] Some code refactor for better performance of `Avro-Extension` (#4092) * 1. Collections.singletonList instead of Arrays.asList; 2. close FSDataInputStream/ByteBufferInputStream for releasing resource; 3. convert com.google.common.base.Function into java.util.function.Function; 4. other code refactors * Put each param on its own line for code style * Revert GenericRecordAsMap back about `Function` --- .../io/druid/data/input/AvroStreamInputRowParser.java | 9 +++++++-- .../io/druid/data/input/avro/AvroExtensionsModule.java | 4 ++-- .../io/druid/data/input/avro/AvroValueInputFormat.java | 10 +++------- .../data/input/avro/InlineSchemaAvroBytesDecoder.java | 9 +++++---- .../data/input/avro/InlineSchemasAvroBytesDecoder.java | 7 ++----- .../avro/SchemaRegistryBasedAvroBytesDecoder.java | 7 ++++--- .../input/avro/SchemaRepoBasedAvroBytesDecoder.java | 7 +++---- .../schemarepo/Avro1124SubjectAndIdConverter.java | 4 +--- .../data/input/schemarepo/SubjectAndIdConverter.java | 2 -- 9 files changed, 27 insertions(+), 32 deletions(-) diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java index 9fbd96d0992..e054b35b6e4 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/AvroStreamInputRowParser.java @@ -53,8 +53,13 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false, false); } - protected static InputRow parseGenericRecord(GenericRecord record, ParseSpec parseSpec, List dimensions, - boolean fromPigAvroStorage, boolean 
binaryAsString) + protected static InputRow parseGenericRecord( + GenericRecord record, + ParseSpec parseSpec, + List dimensions, + boolean fromPigAvroStorage, + boolean binaryAsString + ) { GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage, binaryAsString); TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java index 68158b52ee8..7078b3e469e 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroExtensionsModule.java @@ -36,7 +36,7 @@ import org.schemarepo.ValidatorFactory; import org.schemarepo.json.GsonJsonUtil; import org.schemarepo.json.JsonUtil; -import java.util.Arrays; +import java.util.Collections; import java.util.List; public class AvroExtensionsModule implements DruidModule @@ -46,7 +46,7 @@ public class AvroExtensionsModule implements DruidModule @Override public List getJacksonModules() { - return Arrays.asList( + return Collections.singletonList( new SimpleModule("AvroInputRowParserModule") .registerSubtypes( new NamedType(AvroStreamInputRowParser.class, "avro_stream"), diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java index 00893fce30a..40d422287e9 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/AvroValueInputFormat.java @@ -18,6 +18,7 @@ */ package io.druid.data.input.avro; +import io.druid.java.util.common.logger.Logger; import org.apache.avro.Schema; import 
org.apache.avro.generic.GenericRecord; import org.apache.avro.mapreduce.AvroJob; @@ -31,8 +32,6 @@ import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import io.druid.java.util.common.logger.Logger; - import java.io.IOException; public class AvroValueInputFormat extends FileInputFormat @@ -55,13 +54,10 @@ public class AvroValueInputFormat extends FileInputFormat schema; + private final DatumReader reader; @JsonCreator public InlineSchemaAvroBytesDecoder( @@ -61,7 +62,8 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder String schemaStr = mapper.writeValueAsString(schema); LOGGER.info("Schema string [%s]", schemaStr); - schemaObj = new Schema.Parser().parse(schemaStr); + this.schemaObj = new Schema.Parser().parse(schemaStr); + this.reader = new GenericDatumReader<>(this.schemaObj); } //For UT only @@ -69,6 +71,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder InlineSchemaAvroBytesDecoder(Schema schemaObj) { this.schemaObj = schemaObj; + this.reader = new GenericDatumReader<>(schemaObj); this.schema = null; } @@ -81,9 +84,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder @Override public GenericRecord parse(ByteBuffer bytes) { - DatumReader reader = new GenericDatumReader(schemaObj); - ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes)); - try { + try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) { return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); } catch (EOFException eof) { diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java index 2f9cf203ab3..c8a6abe9800 100644 --- 
a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/InlineSchemasAvroBytesDecoder.java @@ -72,7 +72,6 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder Map schema = e.getValue(); String schemaStr = mapper.writeValueAsString(schema); - ; LOGGER.info("Schema string [%s] = [%s]", id, schemaStr); schemaObjs.put(id, new Schema.Parser().parse(schemaStr)); @@ -116,10 +115,8 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder throw new ParseException("Failed to find schema for id [%s]", schemaId); } - try { - DatumReader reader = new GenericDatumReader(schemaObj); - ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes)); - + DatumReader reader = new GenericDatumReader<>(schemaObj); + try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) { return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); } catch (EOFException eof) { diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java index 7839a0fb3eb..05952648422 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRegistryBasedAvroBytesDecoder.java @@ -47,7 +47,8 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder } //For UT only - @VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) + @VisibleForTesting + SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry) { this.registry = registry; } @@ -63,7 +64,8 @@ public class 
SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder Schema schema = registry.getByID(id); DatumReader reader = new GenericDatumReader<>(schema); return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null)); - } catch (Exception e) { + } + catch (Exception e) { throw new ParseException(e, "Fail to decode avro message!"); } } @@ -81,7 +83,6 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o; return registry != null ? registry.equals(that.registry) : that.registry == null; - } @Override diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java index 01f56c18c73..1cebd3f1fee 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/avro/SchemaRepoBasedAvroBytesDecoder.java @@ -52,7 +52,7 @@ public class SchemaRepoBasedAvroBytesDecoder implements AvroBytesDe { this.subjectAndIdConverter = subjectAndIdConverter; this.schemaRepository = schemaRepository; - typedRepository = new TypedSchemaRepository( + this.typedRepository = new TypedSchemaRepository<>( schemaRepository, subjectAndIdConverter.getIdConverter(), new AvroSchemaConverter(false), @@ -77,9 +77,8 @@ public class SchemaRepoBasedAvroBytesDecoder implements AvroBytesDe { Pair subjectAndId = subjectAndIdConverter.getSubjectAndId(bytes); Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs); - DatumReader reader = new GenericDatumReader(schema); - ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes)); - try { + DatumReader reader = new GenericDatumReader<>(schema); + try 
(ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) { return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null)); } catch (EOFException eof) { diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java index 1babb9e0676..8f9e151feb2 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/Avro1124SubjectAndIdConverter.java @@ -20,9 +20,7 @@ package io.druid.data.input.schemarepo; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; - import io.druid.java.util.common.Pair; - import org.schemarepo.api.converter.Converter; import org.schemarepo.api.converter.IdentityConverter; import org.schemarepo.api.converter.IntegerConverter; @@ -51,7 +49,7 @@ public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter getSubjectAndId(ByteBuffer payload) { - return new Pair(topic, payload.getInt()); + return new Pair<>(topic, payload.getInt()); } @Override diff --git a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java index b09f530bc9b..40769a462a6 100644 --- a/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java +++ b/extensions-core/avro-extensions/src/main/java/io/druid/data/input/schemarepo/SubjectAndIdConverter.java @@ -20,9 +20,7 @@ package io.druid.data.input.schemarepo; import com.fasterxml.jackson.annotation.JsonSubTypes; import com.fasterxml.jackson.annotation.JsonTypeInfo; - import 
io.druid.java.util.common.Pair; - import org.schemarepo.api.converter.Converter; import java.nio.ByteBuffer;