diff --git a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java index 133e574f567..80db2c43965 100644 --- a/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java +++ b/core/src/main/java/org/apache/druid/data/input/impl/MapInputRowParser.java @@ -36,6 +36,7 @@ import java.util.Map; public class MapInputRowParser implements InputRowParser> { private final ParseSpec parseSpec; + private final List dimensions; @JsonCreator public MapInputRowParser( @@ -43,20 +44,20 @@ public class MapInputRowParser implements InputRowParser> ) { this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); } @Override public List parseBatch(Map theMap) { - final List dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions() - ? parseSpec.getDimensionsSpec().getDimensionNames() - : Lists.newArrayList( - Sets.difference( - theMap.keySet(), - parseSpec.getDimensionsSpec() - .getDimensionExclusions() - ) - ); + final List dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(theMap.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } final DateTime timestamp; try { @@ -75,7 +76,7 @@ public class MapInputRowParser implements InputRowParser> throw new ParseException(e, "Unparseable timestamp found! Event: %s", theMap); } - return ImmutableList.of(new MapBasedInputRow(timestamp.getMillis(), dimensions, theMap)); + return ImmutableList.of(new MapBasedInputRow(timestamp, dimensions, theMap)); } @JsonProperty diff --git a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java index 21b249c54c3..e31bcd88dba 100644 --- a/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java +++ b/extensions-contrib/orc-extensions/src/main/java/org/apache/druid/data/input/orc/OrcHadoopInputRowParser.java @@ -25,6 +25,7 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.InputRowParser; @@ -129,6 +130,14 @@ public class OrcHadoopInputRowParser implements InputRowParser TimestampSpec timestampSpec = parseSpec.getTimestampSpec(); DateTime dateTime = timestampSpec.extractTimestamp(map); + final List dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(map.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } return ImmutableList.of(new MapBasedInputRow(dateTime, dimensions, map)); } diff --git a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java index 740b888ee78..8e148d207d9 100644 --- a/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java +++ b/extensions-contrib/thrift-extensions/src/main/java/org/apache/druid/data/input/thrift/ThriftInputRowParser.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.twitter.elephantbird.mapreduce.io.ThriftWritable; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; @@ -55,6 +57,7 @@ public class ThriftInputRowParser implements InputRowParser private Parser parser; private volatile Class thriftClass = null; + private final List dimensions; @JsonCreator public ThriftInputRowParser( @@ -68,6 +71,7 @@ public class ThriftInputRowParser implements InputRowParser Preconditions.checkNotNull(thriftClassName, "thrift class name"); this.parseSpec = parseSpec; + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); } public Class getThriftClass() @@ -139,10 +143,17 @@ public class ThriftInputRowParser implements InputRowParser } Map record = parser.parseToMap(json); - + final List dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } return ImmutableList.of(new MapBasedInputRow( parseSpec.getTimestampSpec().extractTimestamp(record), - parseSpec.getDimensionsSpec().getDimensionNames(), + dimensions, record )); } diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java index 19de982b974..83b27349b18 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroHadoopInputRowParser.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.avro.generic.GenericRecord; import org.apache.druid.data.input.avro.AvroParsers; import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; @@ -33,6 +34,7 @@ public class AvroHadoopInputRowParser implements InputRowParser private final ParseSpec parseSpec; private final boolean fromPigAvroStorage; private final ObjectFlattener avroFlattener; + private final MapInputRowParser mapParser; @JsonCreator public AvroHadoopInputRowParser( @@ -43,12 +45,13 @@ public class AvroHadoopInputRowParser implements InputRowParser this.parseSpec = parseSpec; this.fromPigAvroStorage = fromPigAvroStorage == null ? false : fromPigAvroStorage; this.avroFlattener = AvroParsers.makeFlattener(parseSpec, this.fromPigAvroStorage, false); + this.mapParser = new MapInputRowParser(parseSpec); } @Override public List parseBatch(GenericRecord record) { - return AvroParsers.parseGenericRecord(record, parseSpec, avroFlattener); + return AvroParsers.parseGenericRecord(record, mapParser, avroFlattener); } @JsonProperty diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java index ab5c2d36547..749970f6505 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/AvroStreamInputRowParser.java @@ -24,6 +24,7 @@ import com.google.common.base.Preconditions; import org.apache.avro.generic.GenericRecord; import org.apache.druid.data.input.avro.AvroBytesDecoder; import org.apache.druid.data.input.avro.AvroParsers; +import org.apache.druid.data.input.impl.MapInputRowParser; import org.apache.druid.data.input.impl.ParseSpec; import org.apache.druid.java.util.common.parsers.ObjectFlattener; @@ -36,6 +37,7 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser private final ParseSpec parseSpec; private final AvroBytesDecoder avroBytesDecoder; private final ObjectFlattener avroFlattener; + private final MapInputRowParser mapParser; @JsonCreator public AvroStreamInputRowParser( @@ -46,12 +48,13 @@ public class AvroStreamInputRowParser implements ByteBufferInputRowParser this.parseSpec = Preconditions.checkNotNull(parseSpec, "parseSpec"); this.avroBytesDecoder = Preconditions.checkNotNull(avroBytesDecoder, "avroBytesDecoder"); this.avroFlattener = AvroParsers.makeFlattener(parseSpec, false, false); + this.mapParser = new MapInputRowParser(parseSpec); } @Override public List parseBatch(ByteBuffer input) { - return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), parseSpec, avroFlattener); + return AvroParsers.parseGenericRecord(avroBytesDecoder.parse(input), mapParser, avroFlattener); } @JsonProperty diff --git a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java index 1adb4bf827e..92ea3ae1bda 100644 --- a/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java +++ b/extensions-core/avro-extensions/src/main/java/org/apache/druid/data/input/avro/AvroParsers.java @@ -54,10 +54,10 @@ public class AvroParsers public static List parseGenericRecord( GenericRecord record, - ParseSpec parseSpec, + MapInputRowParser mapParser, ObjectFlattener avroFlattener ) { - return new MapInputRowParser(parseSpec).parseBatch(avroFlattener.flatten(record)); + return mapParser.parseBatch(avroFlattener.flatten(record)); } } diff --git a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java index cdf1f85c9a5..330d9e11f37 100755 --- a/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java +++ b/extensions-core/parquet-extensions/src/main/java/org/apache/druid/data/input/parquet/avro/ParquetAvroHadoopInputRowParser.java @@ -21,6 +21,7 @@ package org.apache.druid.data.input.parquet.avro; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.avro.LogicalType; import org.apache.avro.LogicalTypes; @@ -40,7 +41,6 @@ import org.apache.druid.java.util.common.parsers.ObjectFlatteners; import org.joda.time.DateTime; import javax.annotation.Nullable; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -51,7 +51,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser recordFlattener; - + private final List dimensions; @JsonCreator public ParquetAvroHadoopInputRowParser( @@ -61,6 +61,7 @@ public class ParquetAvroHadoopInputRowParser implements InputRowParser row = recordFlattener.flatten(record); - final List dimensions = parseSpec.getDimensionsSpec().hasCustomDimensions() - ? parseSpec.getDimensionsSpec().getDimensionNames() - : new ArrayList( - Sets.difference( - row.keySet(), - parseSpec.getDimensionsSpec() - .getDimensionExclusions() - ) - ); + final List dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(row.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } // check for parquet Date // https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date LogicalType logicalType = determineTimestampSpecLogicalType(record.getSchema(), timestampSpec.getTimestampColumn()); diff --git a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java index b9075a116c5..b75e4002850 100644 --- a/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java +++ b/extensions-core/protobuf-extensions/src/main/java/org/apache/druid/data/input/protobuf/ProtobufInputRowParser.java @@ -23,6 +23,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.github.os72.protobuf.dynamic.DynamicSchema; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors; import com.google.protobuf.Descriptors.Descriptor; @@ -53,7 +55,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser private final String protoMessageType; private final Descriptor descriptor; private Parser parser; - + private final List dimensions; @JsonCreator public ProtobufInputRowParser( @@ -66,6 +68,7 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser this.descriptorFilePath = descriptorFilePath; this.protoMessageType = protoMessageType; this.descriptor = getDescriptor(descriptorFilePath); + this.dimensions = parseSpec.getDimensionsSpec().getDimensionNames(); } @Override @@ -98,9 +101,17 @@ public class ProtobufInputRowParser implements ByteBufferInputRowParser } Map record = parser.parseToMap(json); + final List dimensions; + if (!this.dimensions.isEmpty()) { + dimensions = this.dimensions; + } else { + dimensions = Lists.newArrayList( + Sets.difference(record.keySet(), parseSpec.getDimensionsSpec().getDimensionExclusions()) + ); + } return ImmutableList.of(new MapBasedInputRow( parseSpec.getTimestampSpec().extractTimestamp(record), - parseSpec.getDimensionsSpec().getDimensionNames(), + dimensions, record )); }