Refactor `Avro-Extension` code for better performance (#4092)

* 1. Use Collections.singletonList instead of Arrays.asList for single-element lists; 2. close FSDataInputStream/ByteBufferInputStream to release resources; 3. convert com.google.common.base.Function to java.util.function.Function; 4. other code refactoring

* Put each parameter on its own line, per code style

* Revert the `Function` change in GenericRecordAsMap
Benedict Jin 2017-04-25 11:46:32 +08:00 committed by Gian Merlino
parent d51097c809
commit de815da942
9 changed files with 27 additions and 32 deletions

AvroStreamInputRowParser.java

@@ -53,8 +53,13 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
     return parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, dimensions, false, false);
   }
 
-  protected static InputRow parseGenericRecord(GenericRecord record, ParseSpec parseSpec, List<String> dimensions,
-                                               boolean fromPigAvroStorage, boolean binaryAsString)
+  protected static InputRow parseGenericRecord(
+      GenericRecord record,
+      ParseSpec parseSpec,
+      List<String> dimensions,
+      boolean fromPigAvroStorage,
+      boolean binaryAsString
+  )
   {
     GenericRecordAsMap genericRecordAsMap = new GenericRecordAsMap(record, fromPigAvroStorage, binaryAsString);
     TimestampSpec timestampSpec = parseSpec.getTimestampSpec();

AvroExtensionsModule.java

@@ -36,7 +36,7 @@ import org.schemarepo.ValidatorFactory;
 import org.schemarepo.json.GsonJsonUtil;
 import org.schemarepo.json.JsonUtil;
 
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 public class AvroExtensionsModule implements DruidModule
@@ -46,7 +46,7 @@ public class AvroExtensionsModule implements DruidModule
   @Override
   public List<? extends Module> getJacksonModules()
   {
-    return Arrays.asList(
+    return Collections.singletonList(
         new SimpleModule("AvroInputRowParserModule")
            .registerSubtypes(
                new NamedType(AvroStreamInputRowParser.class, "avro_stream"),
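For context, a minimal sketch (not part of the patch) of why Collections.singletonList fits a one-element list better: Arrays.asList allocates a varargs array plus a wrapper list, while singletonList is a single small immutable holder object.

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SingletonListSketch
{
  static void demo()
  {
    List<String> viaArrays = Arrays.asList("avro_stream");                // varargs array + wrapper list
    List<String> viaSingleton = Collections.singletonList("avro_stream"); // one immutable holder object
    System.out.println(viaArrays.equals(viaSingleton));                   // true: identical contents
  }
}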

AvroValueInputFormat.java

@@ -18,6 +18,7 @@
  */
 package io.druid.data.input.avro;
 
+import io.druid.java.util.common.logger.Logger;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.mapreduce.AvroJob;
@@ -31,8 +32,6 @@ import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 
-import io.druid.java.util.common.logger.Logger;
-
 import java.io.IOException;
 
 public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericRecord>
@@ -55,13 +54,10 @@ public class AvroValueInputFormat extends FileInputFormat<NullWritable, GenericR
     String schemaFilePath = context.getConfiguration().get(CONF_INPUT_VALUE_SCHEMA_PATH);
     if (StringUtils.isNotBlank(schemaFilePath)) {
       log.info("Using file: %s as reader schema.", schemaFilePath);
-      FSDataInputStream inputStream = FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath));
-      try {
+      try (FSDataInputStream inputStream =
+               FileSystem.get(context.getConfiguration()).open(new Path(schemaFilePath))) {
         readerSchema = new Schema.Parser().parse(inputStream);
       }
-      finally {
-        inputStream.close();
-      }
     }
   }
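A standalone sketch of the try-with-resources pattern adopted above (class and file path are illustrative, not from the patch): any AutoCloseable opened in the resource clause is closed automatically, even when the body throws, with no explicit finally block.

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

class TryWithResourcesSketch
{
  static String firstLine(String path) throws IOException
  {
    // reader.close() runs automatically, even if readLine() throws, and any
    // exception thrown by close() itself is attached as a suppressed exception
    try (BufferedReader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
      return reader.readLine();
    }
  }
}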

InlineSchemaAvroBytesDecoder.java

@@ -48,6 +48,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
 
   private final Schema schemaObj;
   private final Map<String, Object> schema;
+  private final DatumReader<GenericRecord> reader;
 
   @JsonCreator
   public InlineSchemaAvroBytesDecoder(
@@ -61,7 +62,8 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
     String schemaStr = mapper.writeValueAsString(schema);
 
     LOGGER.info("Schema string [%s]", schemaStr);
-    schemaObj = new Schema.Parser().parse(schemaStr);
+    this.schemaObj = new Schema.Parser().parse(schemaStr);
+    this.reader = new GenericDatumReader<>(this.schemaObj);
   }
 
   //For UT only
@@ -69,6 +71,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
   InlineSchemaAvroBytesDecoder(Schema schemaObj)
   {
     this.schemaObj = schemaObj;
+    this.reader = new GenericDatumReader<>(schemaObj);
     this.schema = null;
   }
 
@@ -81,9 +84,7 @@ public class InlineSchemaAvroBytesDecoder implements AvroBytesDecoder
   @Override
   public GenericRecord parse(ByteBuffer bytes)
   {
-    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
-    ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
-    try {
+    try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
       return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
     }
     catch (EOFException eof) {
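This is the main performance change: the decoder's schema never changes after construction, so the DatumReader can be built once and reused by every parse() call instead of once per message. A condensed sketch of the resulting shape (class name hypothetical, Druid specifics omitted):

import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

class FixedSchemaDecoderSketch
{
  private final DatumReader<GenericRecord> reader;

  FixedSchemaDecoderSketch(Schema schemaObj)
  {
    // built once per decoder, not once per message
    this.reader = new GenericDatumReader<>(schemaObj);
  }

  GenericRecord parse(byte[] bytes) throws IOException
  {
    // the reader is reused; only the decoder state varies per call
    return reader.read(null, DecoderFactory.get().binaryDecoder(bytes, 0, bytes.length, null));
  }
}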

InlineSchemasAvroBytesDecoder.java

@@ -72,7 +72,6 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
       Map<String, Object> schema = e.getValue();
       String schemaStr = mapper.writeValueAsString(schema);
-      ;
 
       LOGGER.info("Schema string [%s] = [%s]", id, schemaStr);
       schemaObjs.put(id, new Schema.Parser().parse(schemaStr));
@@ -116,10 +115,8 @@ public class InlineSchemasAvroBytesDecoder implements AvroBytesDecoder
       throw new ParseException("Failed to find schema for id [%s]", schemaId);
     }
 
-    try {
-      DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schemaObj);
-      ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(schemaObj);
+    try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
       return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
     }
     catch (EOFException eof) {
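Unlike the single-schema decoder above, the schema here depends on the per-message id, so the reader is still built per call. A hypothetical follow-up, NOT part of this commit: since the id-to-schema map is also fixed at construction, one reader per schema could be prebuilt too, assuming (as the single-schema decoder now does) that a reader is safe to share across calls.

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;

class PerSchemaReaderSketch
{
  private final Map<String, DatumReader<GenericRecord>> readers = new HashMap<>();

  PerSchemaReaderSketch(Map<String, Schema> schemaObjs)
  {
    // one reader per schema id, built up front instead of per message
    for (Map.Entry<String, Schema> e : schemaObjs.entrySet()) {
      readers.put(e.getKey(), new GenericDatumReader<>(e.getValue()));
    }
  }

  DatumReader<GenericRecord> readerFor(String schemaId)
  {
    return readers.get(schemaId);  // null if the id is unknown, as in the lookup above
  }
}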

SchemaRegistryBasedAvroBytesDecoder.java

@@ -47,7 +47,8 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
   }
 
   //For UT only
-  @VisibleForTesting SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
+  @VisibleForTesting
+  SchemaRegistryBasedAvroBytesDecoder(SchemaRegistryClient registry)
   {
     this.registry = registry;
   }
@@ -63,7 +64,8 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
       Schema schema = registry.getByID(id);
       DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
       return reader.read(null, DecoderFactory.get().binaryDecoder(bytes.array(), offset, length, null));
-    } catch (Exception e) {
+    }
+    catch (Exception e) {
       throw new ParseException(e, "Fail to decode avro message!");
     }
   }
@@ -81,7 +83,6 @@ public class SchemaRegistryBasedAvroBytesDecoder implements AvroBytesDecoder
     SchemaRegistryBasedAvroBytesDecoder that = (SchemaRegistryBasedAvroBytesDecoder) o;
 
     return registry != null ? registry.equals(that.registry) : that.registry == null;
   }
-
   @Override
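The null-safe ternary in the equals() hunk above has a standard JDK shorthand; a tiny sketch of the alternative (not what this patch changes):

import java.util.Objects;

class NullSafeEqualsSketch
{
  static boolean sameRegistry(Object a, Object b)
  {
    // equivalent to: a != null ? a.equals(b) : b == null
    return Objects.equals(a, b);
  }
}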

SchemaRepoBasedAvroBytesDecoder.java

@@ -52,7 +52,7 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
   {
     this.subjectAndIdConverter = subjectAndIdConverter;
     this.schemaRepository = schemaRepository;
-    typedRepository = new TypedSchemaRepository<ID, Schema, SUBJECT>(
+    this.typedRepository = new TypedSchemaRepository<>(
        schemaRepository,
        subjectAndIdConverter.getIdConverter(),
        new AvroSchemaConverter(false),
@@ -77,9 +77,8 @@ public class SchemaRepoBasedAvroBytesDecoder<SUBJECT, ID> implements AvroBytesDe
   {
     Pair<SUBJECT, ID> subjectAndId = subjectAndIdConverter.getSubjectAndId(bytes);
     Schema schema = typedRepository.getSchema(subjectAndId.lhs, subjectAndId.rhs);
-    DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
-    ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes));
-    try {
+    DatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+    try (ByteBufferInputStream inputStream = new ByteBufferInputStream(Collections.singletonList(bytes))) {
       return reader.read(null, DecoderFactory.get().binaryDecoder(inputStream, null));
     }
     catch (EOFException eof) {
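The `<>` cleanup recurring in these hunks is the Java 7 diamond operator; a self-contained sketch of the idea:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DiamondSketch
{
  // The compiler infers constructor type arguments from the declared type,
  // so both fields have exactly the same type; the second is just less noisy.
  Map<String, List<Integer>> verbose = new HashMap<String, List<Integer>>();
  Map<String, List<Integer>> concise = new HashMap<>();
}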

Avro1124SubjectAndIdConverter.java

@@ -20,9 +20,7 @@ package io.druid.data.input.schemarepo;
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import io.druid.java.util.common.Pair;
-
 import org.schemarepo.api.converter.Converter;
 import org.schemarepo.api.converter.IdentityConverter;
 import org.schemarepo.api.converter.IntegerConverter;
@@ -51,7 +49,7 @@ public class Avro1124SubjectAndIdConverter implements SubjectAndIdConverter<Stri
   @Override
   public Pair<String, Integer> getSubjectAndId(ByteBuffer payload)
   {
-    return new Pair<String, Integer>(topic, payload.getInt());
+    return new Pair<>(topic, payload.getInt());
   }
 
   @Override
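An illustrative counterpart, not in this commit: a producer using this converter's framing would prepend the 4-byte schema id that getSubjectAndId(payload) reads back with payload.getInt(). A sketch (class and method names hypothetical):

import java.nio.ByteBuffer;

class FramingSketch
{
  static ByteBuffer frame(int schemaId, byte[] avroBytes)
  {
    ByteBuffer buf = ByteBuffer.allocate(Integer.BYTES + avroBytes.length);
    buf.putInt(schemaId);   // consumed by payload.getInt() on the read side
    buf.put(avroBytes);     // the Avro-encoded record follows the id
    buf.flip();             // ready for reading
    return buf;
  }
}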

SubjectAndIdConverter.java

@@ -20,9 +20,7 @@ package io.druid.data.input.schemarepo;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
-
 import io.druid.java.util.common.Pair;
-
 import org.schemarepo.api.converter.Converter;
 
 import java.nio.ByteBuffer;