optimize input row parsers (#6590)

* optimize input row parsers

* address comments
This commit is contained in:
Mingming Qiu 2018-11-16 11:48:32 +08:00 committed by Benedict Jin
parent 7b41e23cbb
commit 93b0d58571
8 changed files with 67 additions and 29 deletions

View File

@ -36,6 +36,7 @@ import java.util.Map;
public class MapInputRowParser implements InputRowParser<Map<String, Object>> public class MapInputRowParser implements InputRowParser<Map<String, Object>>
{ {
private final ParseSpec parseSpec; private final ParseSpec parseSpec;
private final List<String> dimensions;
@JsonCreator @JsonCreator
public MapInputRowParser( public MapInputRowParser(
@ -43,20 +44,20 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
) )
{ {
this.parseSpec = parseSpec; this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
} }
@Override @Override
public List<InputRow> parseBatch(Map<String, Object> theMap) public List<InputRow> parseBatch(Map<String, Object> theMap)
{ {
final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions() final List<String> dimensions;
? parseSpec.getDimensionsSpec().getDimensionNames() if (!this.dimensions.isEmpty()) {
: Lists.newArrayList( dimensions = this.dimensions;
Sets.difference( } else {
theMap.keySet(), dimensions = Lists.newArrayList(
parseSpec.getDimensionsSpec() Sets.difference(theMap.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
.getDimensionExclusions()
)
); );
}
final DateTime timestamp; final DateTime timestamp;
try { try {
@ -75,7 +76,7 @@ public class MapInputRowParser implements InputRowParser<Map<String, Object>>
throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap); throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap);
} }
return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap)); return ImmutableList.of(new MapBasedInputRow(timestamp, dimensions, theMap));
} }
@JsonProperty @JsonProperty

View File

@ -25,6 +25,7 @@ import com.google.common.base.Function;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.InputRowParser;
@ -129,6 +130,14 @@ public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); TimestampSpec timestampSpec = parseSpec.getTimestampSpec();
DateTime dateTime = timestampSpec.extractTimestamp(map); DateTime dateTime = timestampSpec.extractTimestamp(map);
final List<String> dimensions;
if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions;
} else {
dimensions = Lists.newArrayList(
Sets.difference(map.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
);
}
return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map)); return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map));
} }

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.twitter.elephantbird.mapreduce.io.ThriftWritable; import com.twitter.elephantbird.mapreduce.io.ThriftWritable;
import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.MapBasedInputRow;
@ -55,6 +57,7 @@ public class ThriftInputRowParser implements InputRowParser<Object>
private Parser<String, Object> parser; private Parser<String, Object> parser;
private volatile Class<TBase> thriftClass = null; private volatile Class<TBase> thriftClass = null;
private final List<String> dimensions;
@JsonCreator @JsonCreator
public ThriftInputRowParser( public ThriftInputRowParser(
@ -68,6 +71,7 @@ public class ThriftInputRowParser implements InputRowParser<Object>
Preconditions.checkNotNull(thriftClassName, "thrift class name"); Preconditions.checkNotNull(thriftClassName, "thrift class name");
this.parseSpec = parseSpec; this.parseSpec = parseSpec;
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
} }
public Class<TBase> getThriftClass() public Class<TBase> getThriftClass()
@ -139,10 +143,17 @@ public class ThriftInputRowParser implements InputRowParser<Object>
} }
Map<String, Object> record = parser.parseToMap(json); Map<String, Object> record = parser.parseToMap(json);
final List<String> dimensions;
if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions;
} else {
dimensions = Lists.newArrayList(
Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
);
}
return ImmutableList.of(new MapBasedInputRow( return ImmutableList.of(new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record), parseSpec.getTimestampSpec().extractTimestamp(record),
parseSpec.getDimensionsSpec().getDimensionNames(), dimensions,
record record
)); ));
} }

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.avro.AvroParsers; import org.apache.druid.data.input.avro.AvroParsers;
import org.apache.druid.data.input.impl.InputRowParser; import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlattener;
@ -33,6 +34,7 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
private final ParseSpec parseSpec; private final ParseSpec parseSpec;
private final boolean fromPigAvroStorage; private final boolean fromPigAvroStorage;
private final ObjectFlattener<GenericRecord> avroFlattener; private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;
@JsonCreator @JsonCreator
public AvroHadoopInputRowParser( public AvroHadoopInputRowParser(
@ -43,12 +45,13 @@ public class AvroHadoopInputRowParser implements InputRowParser<GenericRecord>
this.parseSpec = parseSpec; this.parseSpec = parseSpec;
this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage;
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false); this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false);
this.mapParser = new MapInputRowParser(parseSpec);
} }
@Override @Override
public List<InputRow> parseBatch(GenericRecord record) public List<InputRow> parseBatch(GenericRecord record)
{ {
return AvroParsers.parseGenericRecord(record, parseSpec, avroFlattener); return AvroParsers.parseGenericRecord(record, mapParser, avroFlattener);
} }
@JsonProperty @JsonProperty

View File

@ -24,6 +24,7 @@ import com.google.common.base.Preconditions;
import org.apache.avro.generic.GenericRecord; import org.apache.avro.generic.GenericRecord;
import org.apache.druid.data.input.avro.AvroBytesDecoder; import org.apache.druid.data.input.avro.AvroBytesDecoder;
import org.apache.druid.data.input.avro.AvroParsers; import org.apache.druid.data.input.avro.AvroParsers;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.data.input.impl.ParseSpec;
import org.apache.druid.java.util.common.parsers.ObjectFlattener; import org.apache.druid.java.util.common.parsers.ObjectFlattener;
@ -36,6 +37,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
private final ParseSpec parseSpec; private final ParseSpec parseSpec;
private final AvroBytesDecoder avroBytesDecoder; private final AvroBytesDecoder avroBytesDecoder;
private final ObjectFlattener<GenericRecord> avroFlattener; private final ObjectFlattener<GenericRecord> avroFlattener;
private final MapInputRowParser mapParser;
@JsonCreator @JsonCreator
public AvroStreamInputRowParser( public AvroStreamInputRowParser(
@ -46,12 +48,13 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser
this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec");
this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder");
this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false);
this.mapParser = new MapInputRowParser(parseSpec);
} }
@Override @Override
public List<InputRow> parseBatch(ByteBuffer input) public List<InputRow> parseBatch(ByteBuffer input)
{ {
return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, avroFlattener); return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), mapParser, avroFlattener);
} }
@JsonProperty @JsonProperty

View File

@ -54,10 +54,10 @@ public class AvroParsers
public static List<InputRow> parseGenericRecord( public static List<InputRow> parseGenericRecord(
GenericRecord record, GenericRecord record,
ParseSpec parseSpec, MapInputRowParser mapParser,
ObjectFlattener<GenericRecord> avroFlattener ObjectFlattener<GenericRecord> avroFlattener
) )
{ {
return new MapInputRowParser(parseSpec).parseBatch(avroFlattener.flatten(record)); return mapParser.parseBatch(avroFlattener.flatten(record));
} }
} }

View File

@ -21,6 +21,7 @@ package org.apache.druid.data.input.parquet.avro;
import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.avro.LogicalType; import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes; import org.apache.avro.LogicalTypes;
@ -40,7 +41,6 @@ import org.apache.druid.java.util.common.parsers.ObjectFlatteners;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -51,7 +51,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
private final boolean binaryAsString; private final boolean binaryAsString;
private final TimestampSpec timestampSpec; private final TimestampSpec timestampSpec;
private final ObjectFlattener<GenericRecord> recordFlattener; private final ObjectFlattener<GenericRecord> recordFlattener;
private final List<String> dimensions;
@JsonCreator @JsonCreator
public ParquetAvroHadoopInputRowParser( public ParquetAvroHadoopInputRowParser(
@ -61,6 +61,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
{ {
this.parseSpec = parseSpec; this.parseSpec = parseSpec;
this.timestampSpec = parseSpec.getTimestampSpec(); this.timestampSpec = parseSpec.getTimestampSpec();
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
this.binaryAsString = binaryAsString == null ? false : binaryAsString; this.binaryAsString = binaryAsString == null ? false : binaryAsString;
final JSONPathSpec flattenSpec; final JSONPathSpec flattenSpec;
@ -95,15 +96,14 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser<GenericRe
{ {
Map<String, Object> row = recordFlattener.flatten(record); Map<String, Object> row = recordFlattener.flatten(record);
final List<String> dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions() final List<String> dimensions;
? parseSpec.getDimensionsSpec().getDimensionNames() if (!this.dimensions.isEmpty()) {
: new ArrayList( dimensions = this.dimensions;
Sets.difference( } else {
row.keySet(), dimensions = Lists.newArrayList(
parseSpec.getDimensionsSpec() Sets.difference(row.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
.getDimensionExclusions()
)
); );
}
// check for parquet Date // check for parquet Date
// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn()); LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn());

View File

@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import com.github.os72.protobuf.dynamic.DynamicSchema; import com.github.os72.protobuf.dynamic.DynamicSchema;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.Descriptors.Descriptor;
@ -53,7 +55,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
private final String protoMessageType; private final String protoMessageType;
private final Descriptor descriptor; private final Descriptor descriptor;
private Parser<String, Object> parser; private Parser<String, Object> parser;
private final List<String> dimensions;
@JsonCreator @JsonCreator
public ProtobufInputRowParser( public ProtobufInputRowParser(
@ -66,6 +68,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
this.descriptorFilePath = descriptorFilePath; this.descriptorFilePath = descriptorFilePath;
this.protoMessageType = protoMessageType; this.protoMessageType = protoMessageType;
this.descriptor = getDescriptor(descriptorFilePath); this.descriptor = getDescriptor(descriptorFilePath);
this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames();
} }
@Override @Override
@ -98,9 +101,17 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser
} }
Map<String, Object> record = parser.parseToMap(json); Map<String, Object> record = parser.parseToMap(json);
final List<String> dimensions;
if (!this.dimensions.isEmpty()) {
dimensions = this.dimensions;
} else {
dimensions = Lists.newArrayList(
Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions())
);
}
return ImmutableList.of(new MapBasedInputRow( return ImmutableList.of(new MapBasedInputRow(
parseSpec.getTimestampSpec().extractTimestamp(record), parseSpec.getTimestampSpec().extractTimestamp(record),
parseSpec.getDimensionsSpec().getDimensionNames(), dimensions,
record record
)); ));
} }