diff --git a/client/src/main/java/com/metamx/druid/query/filter/SpatialDimFilter.java b/client/src/main/java/com/metamx/druid/query/filter/SpatialDimFilter.java index 98caa674989..4e7f672a8d3 100644 --- a/client/src/main/java/com/metamx/druid/query/filter/SpatialDimFilter.java +++ b/client/src/main/java/com/metamx/druid/query/filter/SpatialDimFilter.java @@ -21,7 +21,7 @@ package com.metamx.druid.query.filter; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.metamx.common.spatial.rtree.search.Bound; +import com.metamx.collections.spatial.search.Bound; import java.nio.ByteBuffer; diff --git a/client/src/main/java/com/metamx/druid/query/group/DefaultLimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/DefaultLimitSpec.java new file mode 100644 index 00000000000..1eb9bbb24f9 --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/DefaultLimitSpec.java @@ -0,0 +1,81 @@ +package com.metamx.druid.query.group; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Joiner; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; + +import java.nio.ByteBuffer; +import java.util.Comparator; +import java.util.List; + +/** + */ +public class DefaultLimitSpec implements LimitSpec +{ + private static final byte CACHE_TYPE_ID = 0x0; + private static Joiner JOINER = Joiner.on(""); + + private final List orderBy; + private final int limit; + + @JsonCreator + public DefaultLimitSpec( + @JsonProperty("orderBy") List orderBy, + @JsonProperty("limit") int limit + ) + { + this.orderBy = (orderBy == null) ? Lists.newArrayList() : orderBy; + this.limit = limit; + } + + public DefaultLimitSpec() + { + this.orderBy = Lists.newArrayList(); + this.limit = 0; + } + + @JsonProperty + @Override + public List getOrderBy() + { + return orderBy; + } + + @JsonProperty + @Override + public int getLimit() + { + return limit; + } + + @Override + public Comparator getComparator() + { + return null; + } + + @Override + public byte[] getCacheKey() + { + byte[] orderByBytes = JOINER.join(orderBy).getBytes(); + + byte[] limitBytes = Ints.toByteArray(limit); + + return ByteBuffer.allocate(1 + orderByBytes.length + limitBytes.length) + .put(CACHE_TYPE_ID) + .put(orderByBytes) + .put(limitBytes) + .array(); + } + + @Override + public String toString() + { + return "DefaultLimitSpec{" + + "orderBy='" + orderBy + '\'' + + ", limit=" + limit + + '}'; + } +} diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java index a82c1fae7d5..6f8eb052cb1 100644 --- a/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java +++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQuery.java @@ -49,7 +49,7 @@ public class GroupByQuery extends BaseQuery return new Builder(); } - private final int threshold; + private final LimitSpec limitSpec; private final DimFilter dimFilter; private final QueryGranularity granularity; private final List dimensions; @@ -60,7 +60,7 @@ public class GroupByQuery extends BaseQuery public GroupByQuery( @JsonProperty("dataSource") String dataSource, @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, - @JsonProperty("threshold") int threshold, + @JsonProperty("limitSpec") LimitSpec limitSpec, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("granularity") QueryGranularity granularity, @JsonProperty("dimensions") List dimensions, @@ -70,7 +70,7 @@ public class GroupByQuery extends BaseQuery ) { super(dataSource, querySegmentSpec, context); - this.threshold = threshold; + this.limitSpec = (limitSpec == null) ? new DefaultLimitSpec() : limitSpec; this.dimFilter = dimFilter; this.granularity = granularity; this.dimensions = dimensions == null ? ImmutableList.of() : dimensions; @@ -82,10 +82,10 @@ public class GroupByQuery extends BaseQuery Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs); } - @JsonProperty("threshold") - public int getThreshold() + @JsonProperty + public LimitSpec getLimitSpec() { - return threshold; + return limitSpec; } @JsonProperty("filter") @@ -136,7 +136,7 @@ public class GroupByQuery extends BaseQuery return new GroupByQuery( getDataSource(), getQuerySegmentSpec(), - threshold, + limitSpec, dimFilter, granularity, dimensions, @@ -152,7 +152,7 @@ public class GroupByQuery extends BaseQuery return new GroupByQuery( getDataSource(), spec, - threshold, + limitSpec, dimFilter, granularity, dimensions, @@ -166,7 +166,7 @@ public class GroupByQuery extends BaseQuery { private String dataSource; private QuerySegmentSpec querySegmentSpec; - private int threshold; + private LimitSpec limitSpec; private DimFilter dimFilter; private QueryGranularity granularity; private List dimensions; @@ -180,7 +180,7 @@ public class GroupByQuery extends BaseQuery { dataSource = builder.dataSource; querySegmentSpec = builder.querySegmentSpec; - threshold = builder.threshold; + limitSpec = builder.limitSpec; dimFilter = builder.dimFilter; granularity = builder.granularity; dimensions = builder.dimensions; @@ -200,9 +200,9 @@ public class GroupByQuery extends BaseQuery return setQuerySegmentSpec(new LegacySegmentSpec(interval)); } - public Builder setThreshold(int threshold) + public Builder setLimitSpec(LimitSpec limitSpec) { - this.threshold = threshold; + this.limitSpec = limitSpec; return this; } @@ -298,7 +298,7 @@ public class GroupByQuery extends BaseQuery return new GroupByQuery( dataSource, querySegmentSpec, - threshold, + limitSpec, dimFilter, granularity, dimensions, diff --git a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java index f0d418bbe01..4a70ef5ca9a 100644 --- a/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java +++ b/client/src/main/java/com/metamx/druid/query/group/GroupByQueryQueryToolChest.java @@ -21,12 +21,12 @@ package com.metamx.druid.query.group; import com.fasterxml.jackson.core.type.TypeReference; import com.google.common.base.Function; -import com.google.common.base.Functions; import com.google.common.base.Joiner; -import com.google.common.base.Predicate; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import com.google.common.collect.Ordering; +import com.google.common.primitives.Longs; import com.metamx.common.ISE; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.ConcatSequence; @@ -35,10 +35,8 @@ import com.metamx.common.guava.Sequences; import com.metamx.druid.Query; import com.metamx.druid.QueryGranularity; import com.metamx.druid.aggregation.AggregatorFactory; -import com.metamx.druid.aggregation.post.PostAggregator; import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.initialization.Initialization; -import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedRow; import com.metamx.druid.input.Row; import com.metamx.druid.input.Rows; @@ -46,15 +44,14 @@ import com.metamx.druid.query.MetricManipulationFn; import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryToolChest; import com.metamx.druid.query.dimension.DimensionSpec; -import com.metamx.druid.result.Result; -import com.metamx.druid.result.TimeseriesResultValue; import com.metamx.druid.utils.PropUtils; import com.metamx.emitter.service.ServiceMetricEvent; - import org.joda.time.Interval; import org.joda.time.Minutes; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -150,20 +147,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest retVal = Sequences.map( + Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), + new Function() + { + @Override + public Row apply(Row input) + { + final MapBasedRow row = (MapBasedRow) input; + return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); + } + } + ); + + // sort results to be returned + if (!query.getLimitSpec().getOrderBy().isEmpty()) { + retVal = Sequences.sort(retVal, makeComparator(query)); + } + return Sequences.limit( - Sequences.map( - Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())), - new Function() - { - @Override - public Row apply(Row input) - { - final MapBasedRow row = (MapBasedRow) input; - return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent()); - } - } - ), - (query.getThreshold() > 0) ? query.getThreshold() : maxRows + retVal, + (query.getLimitSpec().getLimit() > 0) ? query.getLimitSpec().getLimit() : maxRows ); } @@ -217,4 +221,46 @@ public class GroupByQueryQueryToolChest extends QueryToolChest makeComparator(GroupByQuery query) + { + Ordering ordering = new Ordering() + { + @Override + public int compare(Row left, Row right) + { + return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch()); + } + }; + + for (final String dimension : query.getLimitSpec().getOrderBy()) { + ordering = ordering.compound( + new Comparator() + { + @Override + public int compare(Row left, Row right) + { + if (left instanceof MapBasedRow && right instanceof MapBasedRow) { + // There are no multi-value dimensions at this point, they should have been flattened out + String leftDimVal = left.getDimension(dimension).get(0); + String rightDimVal = right.getDimension(dimension).get(0); + return leftDimVal.compareTo(rightDimVal); + } else { + throw new ISE("Unknown type for rows[%s, %s]", left.getClass(), right.getClass()); + } + } + } + ); + } + final Ordering theOrdering = ordering; + + return new Comparator() + { + @Override + public int compare(Row row, Row row2) + { + return theOrdering.compare(row, row2); + } + }; + } } diff --git a/client/src/main/java/com/metamx/druid/query/group/LimitSpec.java b/client/src/main/java/com/metamx/druid/query/group/LimitSpec.java new file mode 100644 index 00000000000..3660f75b00d --- /dev/null +++ b/client/src/main/java/com/metamx/druid/query/group/LimitSpec.java @@ -0,0 +1,43 @@ +/* + * Druid - a distributed column store. + * Copyright (C) 2012 Metamarkets Group Inc. + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + */ + +package com.metamx.druid.query.group; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import java.util.Comparator; +import java.util.List; + +/** + */ +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +@JsonSubTypes(value = { + @JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class) +}) +public interface LimitSpec +{ + public List getOrderBy(); + + public int getLimit(); + + public Comparator getComparator(); + + public byte[] getCacheKey(); +} diff --git a/index-common/pom.xml b/index-common/pom.xml index 0bae9b7a70d..d6b0cefb949 100644 --- a/index-common/pom.xml +++ b/index-common/pom.xml @@ -18,7 +18,8 @@ ~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. --> - + 4.0.0 com.metamx.druid druid-index-common @@ -42,6 +43,10 @@ com.metamx java-util + + com.metamx + bytebuffer-collections + com.ning diff --git a/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java b/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java index 20a6ff5b9ac..27502e13f35 100644 --- a/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java @@ -18,7 +18,7 @@ */ package com.metamx.druid.index.column; -import com.metamx.common.spatial.rtree.ImmutableRTree; +import com.metamx.collections.spatial.ImmutableRTree; /** */ diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java index 2f0ae4906d6..afc6069c98a 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java @@ -21,10 +21,12 @@ package com.metamx.druid.index.serde; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.primitives.Ints; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.common.IAE; -import com.metamx.common.spatial.rtree.ImmutableRTree; import com.metamx.druid.index.column.ColumnBuilder; import com.metamx.druid.index.column.ValueType; +import com.metamx.druid.kv.ByteBufferSerializer; import com.metamx.druid.kv.ConciseCompressedIndexedInts; import com.metamx.druid.kv.GenericIndexed; import com.metamx.druid.kv.IndexedRTree; @@ -50,7 +52,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde private final VSizeIndexedInts singleValuedColumn; private final VSizeIndexed multiValuedColumn; private final GenericIndexed bitmaps; - private final GenericIndexed spatialIndex; + private final ImmutableRTree spatialIndex; private final int size; @@ -59,7 +61,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde VSizeIndexedInts singleValCol, VSizeIndexed multiValCol, GenericIndexed bitmaps, - GenericIndexed spatialIndex + ImmutableRTree spatialIndex ) { this.dictionary = dictionary; @@ -78,7 +80,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde } size += bitmaps.getSerializedSize(); if (spatialIndex != null) { - size += spatialIndex.getSerializedSize(); + size += spatialIndex.size() + Ints.BYTES; } this.size = size; @@ -117,7 +119,9 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde multiValuedColumn.writeToChannel(channel); } bitmaps.writeToChannel(channel); - spatialIndex.writeToChannel(channel); + if (spatialIndex != null) { + ByteBufferSerializer.writeToChannel(spatialIndex, IndexedRTree.objectStrategy, channel); + } } @Override @@ -146,9 +150,9 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde buffer, ConciseCompressedIndexedInts.objectStrategy ); - GenericIndexed spatialIndex = null; + ImmutableRTree spatialIndex = null; if (buffer.hasRemaining()) { - spatialIndex = GenericIndexed.read( + spatialIndex = ByteBufferSerializer.read( buffer, IndexedRTree.objectStrategy ); } diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java b/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java index 5c9c9e415b8..660d1cb3561 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java @@ -19,7 +19,7 @@ package com.metamx.druid.index.serde; import com.google.common.base.Supplier; -import com.metamx.common.spatial.rtree.ImmutableRTree; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.druid.index.column.SpatialIndex; import com.metamx.druid.kv.GenericIndexed; @@ -29,13 +29,13 @@ public class SpatialIndexColumnPartSupplier implements Supplier { private static final ImmutableRTree EMPTY_SET = new ImmutableRTree(); - private final GenericIndexed indexedTree; + private final ImmutableRTree indexedTree; public SpatialIndexColumnPartSupplier( - GenericIndexed indexedTree + ImmutableRTree indexedTree ) { - this.indexedTree = indexedTree; + this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree; } @Override @@ -46,8 +46,7 @@ public class SpatialIndexColumnPartSupplier implements Supplier @Override public ImmutableRTree getRTree() { - // There is only ever 1 RTree per dimension - return indexedTree.get(0); + return indexedTree; } }; } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/ComplexMetricColumnSerializer.java b/index-common/src/main/java/com/metamx/druid/index/v1/ComplexMetricColumnSerializer.java index d09581c1c5b..55032f54da0 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/ComplexMetricColumnSerializer.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/ComplexMetricColumnSerializer.java @@ -21,7 +21,7 @@ package com.metamx.druid.index.v1; import com.google.common.io.Files; import com.metamx.druid.index.v1.serde.ComplexMetricSerde; -import com.metamx.druid.kv.FlattenedArrayWriter; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.IOPeon; import java.io.File; @@ -36,7 +36,7 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer private final IOPeon ioPeon; private final File outDir; - private FlattenedArrayWriter writer; + private GenericIndexedWriter writer; public ComplexMetricColumnSerializer( String metricName, @@ -55,7 +55,7 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer @Override public void open() throws IOException { - writer = new FlattenedArrayWriter( + writer = new GenericIndexedWriter( ioPeon, String.format("%s_%s", metricName, outDir.getName()), serde.getObjectStrategy() ); diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializer.java b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializer.java index 090ab7b0c7a..441ea2d0519 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializer.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializer.java @@ -25,7 +25,7 @@ import com.google.common.io.OutputSupplier; import com.google.common.primitives.Ints; import com.metamx.druid.collect.ResourceHolder; import com.metamx.druid.collect.StupidResourceHolder; -import com.metamx.druid.kv.FlattenedArrayWriter; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.IOPeon; import java.io.IOException; @@ -50,7 +50,7 @@ public class CompressedFloatsSupplierSerializer { final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer( sizePer, - new FlattenedArrayWriter>( + new GenericIndexedWriter>( ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order) ) ); @@ -58,7 +58,7 @@ public class CompressedFloatsSupplierSerializer } private final int sizePer; - private final FlattenedArrayWriter> flattener; + private final GenericIndexedWriter> flattener; private int numInserted = 0; @@ -66,7 +66,7 @@ public class CompressedFloatsSupplierSerializer public CompressedFloatsSupplierSerializer( int sizePer, - FlattenedArrayWriter> flattener + GenericIndexedWriter> flattener ) { this.sizePer = sizePer; diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializer.java b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializer.java index ef4260c8005..3358e209db0 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializer.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializer.java @@ -26,7 +26,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import com.metamx.druid.collect.ResourceHolder; import com.metamx.druid.collect.StupidResourceHolder; -import com.metamx.druid.kv.FlattenedArrayWriter; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.IOPeon; import java.io.IOException; @@ -44,7 +44,7 @@ public class CompressedLongsSupplierSerializer { final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer( 0xFFFF / Longs.BYTES, - new FlattenedArrayWriter>( + new GenericIndexedWriter>( ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order) ) ); @@ -52,7 +52,7 @@ public class CompressedLongsSupplierSerializer } private final int sizePer; - private final FlattenedArrayWriter> flattener; + private final GenericIndexedWriter> flattener; private int numInserted = 0; @@ -60,7 +60,7 @@ public class CompressedLongsSupplierSerializer public CompressedLongsSupplierSerializer( int sizePer, - FlattenedArrayWriter> flattener + GenericIndexedWriter> flattener ) { this.sizePer = sizePer; diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java index dc70e1cee32..6e664c348a3 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java @@ -25,6 +25,7 @@ import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -57,6 +58,7 @@ import org.joda.time.Interval; import javax.annotation.Nullable; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -82,6 +84,7 @@ public class IncrementalIndex implements Iterable private final ImmutableList metricNames; private final LinkedHashMap dimensionOrder; private final CopyOnWriteArrayList dimensions; + private final SpatialDimensionRowFormatter spatialDimensionRowFormatter; private final DimensionHolder dimValues; private final ConcurrentSkipListMap facts; @@ -90,15 +93,11 @@ public class IncrementalIndex implements Iterable // This is modified on add() by a (hopefully) single thread. private InputRow in; - public IncrementalIndex( - long minTimestamp, - QueryGranularity gran, - final AggregatorFactory[] metrics - ) + public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema) { - this.minTimestamp = minTimestamp; - this.gran = gran; - this.metrics = metrics; + this.minTimestamp = incrementalIndexSchema.getMinTimestamp(); + this.gran = incrementalIndexSchema.getGran(); + this.metrics = incrementalIndexSchema.getMetrics(); final ImmutableList.Builder metricNamesBuilder = ImmutableList.builder(); final ImmutableMap.Builder metricIndexesBuilder = ImmutableMap.builder(); @@ -115,11 +114,31 @@ public class IncrementalIndex implements Iterable this.dimensionOrder = Maps.newLinkedHashMap(); this.dimensions = new CopyOnWriteArrayList(); - this.dimValues = new DimensionHolder(); + int index = 0; + for (String dim : incrementalIndexSchema.getDimensions()) { + dimensionOrder.put(dim, index++); + dimensions.add(dim); + } + this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(incrementalIndexSchema.getSpatialDimensions()); + this.dimValues = new DimensionHolder(); this.facts = new ConcurrentSkipListMap(); } + public IncrementalIndex( + long minTimestamp, + QueryGranularity gran, + final AggregatorFactory[] metrics + ) + { + this( + new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build() + ); + } + /** * Adds a new row. The row might correspond with another row that already exists, in which case this will * update that row instead of inserting a new one. @@ -132,6 +151,8 @@ public class IncrementalIndex implements Iterable */ public int add(InputRow row) { + row = spatialDimensionRowFormatter.formatRow(row); + if (row.getTimestampFromEpoch() < minTimestamp) { throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp)); } @@ -144,13 +165,6 @@ public class IncrementalIndex implements Iterable dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); - // FIXME: Must be a better way to do this - // Join all coordinate dimension values into a single string for bitmap indexing - // Split this string for spatial indexing - if (dimension.endsWith(".geo")) { - dimensionValues = Arrays.asList(JOINER.join(dimensionValues)); - } - final Integer index = dimensionOrder.get(dimension); if (index == null) { dimensionOrder.put(dimension, dimensionOrder.size()); @@ -237,7 +251,7 @@ public class IncrementalIndex implements Iterable final String typeName = agg.getTypeName(); final String columnName = column.toLowerCase(); - if(typeName.equals("float")) { + if (typeName.equals("float")) { return new ObjectColumnSelector() { @Override diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndexSchema.java b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndexSchema.java new file mode 100644 index 00000000000..5bb0b63103b --- /dev/null +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndexSchema.java @@ -0,0 +1,128 @@ +package com.metamx.druid.index.v1; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.aggregation.AggregatorFactory; + +import java.util.Collections; +import java.util.List; + +/** + */ +public class IncrementalIndexSchema +{ + private final long minTimestamp; + private final QueryGranularity gran; + private final List dimensions; + private final List spatialDimensions; + private final AggregatorFactory[] metrics; + + public IncrementalIndexSchema( + long minTimestamp, + QueryGranularity gran, + List dimensions, + List spatialDimensions, + AggregatorFactory[] metrics + ) + { + this.minTimestamp = minTimestamp; + this.gran = gran; + this.dimensions = dimensions; + this.spatialDimensions = spatialDimensions; + this.metrics = metrics; + } + + public long getMinTimestamp() + { + return minTimestamp; + } + + public QueryGranularity getGran() + { + return gran; + } + + public List getDimensions() + { + return dimensions; + } + + public List getSpatialDimensions() + { + return spatialDimensions; + } + + public AggregatorFactory[] getMetrics() + { + return metrics; + } + + public static class Builder + { + private long minTimestamp; + private QueryGranularity gran; + private List dimensions; + private List spatialDimensions; + private AggregatorFactory[] metrics; + + public Builder() + { + this.minTimestamp = 0L; + this.gran = QueryGranularity.NONE; + this.dimensions = Lists.newArrayList(); + this.spatialDimensions = Lists.newArrayList(); + this.metrics = new AggregatorFactory[]{}; + } + + public Builder withMinTimestamp(long minTimestamp) + { + this.minTimestamp = minTimestamp; + return this; + } + + public Builder withQueryGranularity(QueryGranularity gran) + { + this.gran = gran; + return this; + } + + public Builder withDimensions(Iterable dimensions) + { + this.dimensions = Lists.newArrayList( + Iterables.transform( + dimensions, new Function() + { + @Override + public String apply(String input) + { + return input.toLowerCase(); + } + } + ) + ); + Collections.sort(this.dimensions); + return this; + } + + public Builder withSpatialDimensions(List spatialDimensions) + { + this.spatialDimensions = spatialDimensions; + return this; + } + + public Builder withMetrics(AggregatorFactory[] metrics) + { + this.metrics = metrics; + return this; + } + + public IncrementalIndexSchema build() + { + return new IncrementalIndexSchema( + minTimestamp, gran, dimensions, spatialDimensions, metrics + ); + } + } +} diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/Index.java b/index-common/src/main/java/com/metamx/druid/index/v1/Index.java index bdacd3380ad..fcec6135b86 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/Index.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/Index.java @@ -19,12 +19,11 @@ package com.metamx.druid.index.v1; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.common.logger.Logger; -import com.metamx.common.spatial.rtree.ImmutableRTree; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import org.joda.time.Interval; -import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index b6991ffbe0b..32b789969e9 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -31,6 +31,7 @@ import com.google.common.io.ByteStreams; import com.google.common.io.Closeables; import com.google.common.io.Files; import com.google.common.primitives.Ints; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.io.smoosh.FileSmoosher; @@ -38,7 +39,6 @@ import com.metamx.common.io.smoosh.Smoosh; import com.metamx.common.io.smoosh.SmooshedFileMapper; import com.metamx.common.io.smoosh.SmooshedWriter; import com.metamx.common.logger.Logger; -import com.metamx.common.spatial.rtree.ImmutableRTree; import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.SimpleQueryableIndex; import com.metamx.druid.index.column.Column; @@ -57,6 +57,7 @@ import com.metamx.druid.index.serde.LongGenericColumnSupplier; import com.metamx.druid.index.serde.SpatialIndexColumnPartSupplier; import com.metamx.druid.jackson.DefaultObjectMapper; import com.metamx.druid.kv.ArrayIndexed; +import com.metamx.druid.kv.ByteBufferSerializer; import com.metamx.druid.kv.ConciseCompressedIndexedInts; import com.metamx.druid.kv.GenericIndexed; import com.metamx.druid.kv.IndexedIterable; @@ -333,7 +334,7 @@ public class IndexIO Map> dimValueLookups = Maps.newHashMap(); Map dimColumns = Maps.newHashMap(); Map> invertedIndexed = Maps.newHashMap(); - Map> spatialIndexed = Maps.newHashMap(); + Map spatialIndexed = Maps.newHashMap(); for (String dimension : IndexedIterable.create(availableDimensions)) { ByteBuffer dimBuffer = smooshedFiles.mapFile(makeDimFile(inDir, dimension).getName()); @@ -358,13 +359,11 @@ public class IndexIO } ByteBuffer spatialBuffer = smooshedFiles.mapFile("spatial.drd"); - if (spatialBuffer != null) { - for (String dimension : IndexedIterable.create(availableDimensions)) { - spatialIndexed.put( - serializerUtils.readString(spatialBuffer), - GenericIndexed.read(spatialBuffer, IndexedRTree.objectStrategy) - ); - } + while (spatialBuffer != null && spatialBuffer.hasRemaining()) { + spatialIndexed.put( + serializerUtils.readString(spatialBuffer), + ByteBufferSerializer.read(spatialBuffer, IndexedRTree.objectStrategy) + ); } final MMappedIndex retVal = new MMappedIndex( @@ -422,12 +421,12 @@ public class IndexIO ); } - Map> spatialIndexes = Maps.newHashMap(); + Map spatialIndexes = Maps.newHashMap(); final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd"); while (spatialBuffer != null && spatialBuffer.hasRemaining()) { spatialIndexes.put( serializerUtils.readString(spatialBuffer), - GenericIndexed.read(spatialBuffer, IndexedRTree.objectStrategy) + ByteBufferSerializer.read(spatialBuffer, IndexedRTree.objectStrategy) ); } @@ -464,7 +463,7 @@ public class IndexIO VSizeIndexedInts singleValCol = null; VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer()); GenericIndexed bitmaps = bitmapIndexes.get(dimension); - GenericIndexed spatialIndex = spatialIndexes.get(dimension); + ImmutableRTree spatialIndex = spatialIndexes.get(dimension); boolean onlyOneValue = true; ConciseSet nullsSet = null; diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java index 61af64df73a..368b59d6e2c 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java @@ -34,6 +34,10 @@ import com.google.common.io.Closeables; import com.google.common.io.Files; import com.google.common.io.OutputSupplier; import com.google.common.primitives.Ints; +import com.metamx.collections.spatial.ImmutableRTree; +import com.metamx.collections.spatial.RTree; +import com.metamx.collections.spatial.search.RadiusBound; +import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy; import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.common.guava.FunctionalIterable; @@ -41,9 +45,6 @@ import com.metamx.common.guava.MergeIterable; import com.metamx.common.guava.nary.BinaryFn; import com.metamx.common.io.smoosh.Smoosh; import com.metamx.common.logger.Logger; -import com.metamx.common.spatial.rtree.ImmutableRTree; -import com.metamx.common.spatial.rtree.RTree; -import com.metamx.common.spatial.rtree.split.LinearGutmanSplitStrategy; import com.metamx.druid.CombiningIterable; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.ToLowerCaseAggregatorFactory; @@ -52,8 +53,9 @@ import com.metamx.druid.guava.GuavaUtils; import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.v1.serde.ComplexMetricSerde; import com.metamx.druid.index.v1.serde.ComplexMetrics; +import com.metamx.druid.kv.ByteBufferWriter; import com.metamx.druid.kv.ConciseCompressedIndexedInts; -import com.metamx.druid.kv.FlattenedArrayWriter; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.GenericIndexed; import com.metamx.druid.kv.IOPeon; import com.metamx.druid.kv.Indexed; @@ -453,7 +455,7 @@ public class IndexMerger } for (String dimension : mergedDimensions) { - final FlattenedArrayWriter writer = new FlattenedArrayWriter( + final GenericIndexedWriter writer = new GenericIndexedWriter( ioPeon, dimension, GenericIndexed.stringStrategy ); writer.open(); @@ -708,7 +710,7 @@ public class IndexMerger Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy); log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size()); - FlattenedArrayWriter writer = new FlattenedArrayWriter( + GenericIndexedWriter writer = new GenericIndexedWriter( ioPeon, dimension, ConciseCompressedIndexedInts.objectStrategy ); writer.open(); @@ -757,6 +759,10 @@ public class IndexMerger for (int i = 0; i < mergedDimensions.size(); ++i) { String dimension = mergedDimensions.get(i); + if (!dimension.endsWith(".geo")) { + continue; + } + File dimOutFile = dimOuts.get(i).getFile(); final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); @@ -766,26 +772,24 @@ public class IndexMerger Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy); log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size()); - FlattenedArrayWriter writer = new FlattenedArrayWriter( + ByteBufferWriter writer = new ByteBufferWriter( ioPeon, dimension, IndexedRTree.objectStrategy ); writer.open(); RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50)); - if (dimension.endsWith(".geo")) { - int count = 0; - for (String dimVal : IndexedIterable.create(dimVals)) { - progress.progress(); + int count = 0; + for (String dimVal : IndexedIterable.create(dimVals)) { + progress.progress(); - List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); - float[] coords = new float[stringCoords.size()]; - for (int j = 0; j < coords.length; j++) { - coords[j] = Float.valueOf(stringCoords.get(j)); - } - tree.insert(coords, count); - count++; + List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + float[] coords = new float[stringCoords.size()]; + for (int j = 0; j < coords.length; j++) { + coords[j] = Float.valueOf(stringCoords.get(j)); } + tree.insert(coords, count); + count++; } writer.write(ImmutableRTree.newImmutableFromMutable(tree)); diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java index b5084d15f71..fcdcef4eddc 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java @@ -23,8 +23,8 @@ import com.google.common.base.Function; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.primitives.Ints; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.common.logger.Logger; -import com.metamx.common.spatial.rtree.ImmutableRTree; import com.metamx.druid.kv.ConciseCompressedIndexedInts; import com.metamx.druid.kv.GenericIndexed; import com.metamx.druid.kv.Indexed; @@ -56,7 +56,7 @@ public class MMappedIndex final Map> dimValueLookups; final Map dimColumns; final Map> invertedIndexes; - final Map> spatialIndexes; + final Map spatialIndexes; private final Map metricIndexes = Maps.newHashMap(); @@ -69,7 +69,7 @@ public class MMappedIndex Map> dimValueLookups, Map dimColumns, Map> invertedIndexes, - Map> spatialIndexes + Map spatialIndexes ) { this.availableDimensions = availableDimensions; @@ -148,7 +148,7 @@ public class MMappedIndex return invertedIndexes; } - public Map> getSpatialIndexes() + public Map getSpatialIndexes() { return spatialIndexes; } @@ -187,7 +187,7 @@ public class MMappedIndex Map dimColumns = Maps.newHashMap(); Map> invertedIndexes = Maps.newLinkedHashMap(); - Map> spatialIndexes = Maps.newLinkedHashMap(); + Map spatialIndexes = Maps.newLinkedHashMap(); for (String dimension : Arrays.asList(index.dimensions)) { final String[] dimVals = index.reverseDimLookup.get(dimension); @@ -260,10 +260,7 @@ public class MMappedIndex ) ); - spatialIndexes.put( - dimension, - GenericIndexed.fromIterable(Arrays.asList(index.getSpatialIndex(dimension)), IndexedRTree.objectStrategy) - ); + spatialIndexes.put(dimension, index.getSpatialIndex(dimension)); } log.info("Making MMappedIndex"); diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/MetricHolder.java b/index-common/src/main/java/com/metamx/druid/index/v1/MetricHolder.java index 075b9dbfe52..dc7002af24b 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/MetricHolder.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/MetricHolder.java @@ -27,7 +27,7 @@ import com.metamx.common.IAE; import com.metamx.common.ISE; import com.metamx.druid.index.v1.serde.ComplexMetricSerde; import com.metamx.druid.index.v1.serde.ComplexMetrics; -import com.metamx.druid.kv.FlattenedArrayWriter; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.GenericIndexed; import com.metamx.druid.kv.Indexed; import com.metamx.druid.kv.IndexedFloats; @@ -63,7 +63,7 @@ public class MetricHolder } public static void writeComplexMetric( - OutputSupplier outSupplier, String name, String typeName, FlattenedArrayWriter column + OutputSupplier outSupplier, String name, String typeName, GenericIndexedWriter column ) throws IOException { OutputStream out = null; diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java b/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java new file mode 100644 index 00000000000..1bceaae68b6 --- /dev/null +++ b/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionRowFormatter.java @@ -0,0 +1,107 @@ +package com.metamx.druid.index.v1; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Predicate; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.metamx.druid.input.InputRow; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + */ +public class SpatialDimensionRowFormatter +{ + private static final Joiner JOINER = Joiner.on(","); + + private final List spatialDimensions; + private final Set spatialDimNames; + + public SpatialDimensionRowFormatter(List spatialDimensions) + { + this.spatialDimensions = spatialDimensions; + this.spatialDimNames = Sets.newHashSet( + Iterables.concat( + Lists.transform( + spatialDimensions, + new Function>() + { + @Override + public List apply(SpatialDimensionSchema input) + { + return input.getDims(); + } + } + ) + ) + ); + } + + public InputRow formatRow(final InputRow row) + { + final Map> finalDims = Maps.newHashMap(); + + // remove all spatial dimensions + Set filtered = Sets.filter( + Sets.newHashSet(row.getDimensions()), + new Predicate() + { + @Override + public boolean apply(String input) + { + return !spatialDimNames.contains(input); + } + } + ); + for (String dim : filtered) { + finalDims.put(dim, row.getDimension(dim)); + } + + InputRow retVal = new InputRow() + { + @Override + public List getDimensions() + { + return Lists.newArrayList(finalDims.keySet()); + } + + @Override + public long getTimestampFromEpoch() + { + return row.getTimestampFromEpoch(); + } + + @Override + public List getDimension(String dimension) + { + return finalDims.get(dimension); + } + + @Override + public float getFloatMetric(String metric) + { + return row.getFloatMetric(metric); + } + }; + + for (SpatialDimensionSchema spatialDimension : spatialDimensions) { + List spatialDimVals = Lists.newArrayList(); + for (String partialSpatialDim : spatialDimension.getDims()) { + List dimVals = row.getDimension(partialSpatialDim); + if (dimVals == null || dimVals.isEmpty()) { + return retVal; + } + spatialDimVals.addAll(dimVals); + } + finalDims.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals))); + } + + return retVal; + } +} diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionSchema.java b/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionSchema.java new file mode 100644 index 00000000000..7e50961925e --- /dev/null +++ b/index-common/src/main/java/com/metamx/druid/index/v1/SpatialDimensionSchema.java @@ -0,0 +1,40 @@ +package com.metamx.druid.index.v1; + +import com.google.common.base.Function; +import com.google.common.collect.Lists; + +import java.util.List; + +/** + */ +public class SpatialDimensionSchema +{ + private final String dimName; + private final List dims; + + public SpatialDimensionSchema(String dimName, List dims) + { + this.dimName = dimName.toLowerCase(); + this.dims = Lists.transform( + dims, + new Function() + { + @Override + public String apply(String input) + { + return input.toLowerCase(); + } + } + ); + } + + public String getDimName() + { + return dimName; + } + + public List getDims() + { + return dims; + } +} diff --git a/index-common/src/main/java/com/metamx/druid/kv/ByteBufferSerializer.java b/index-common/src/main/java/com/metamx/druid/kv/ByteBufferSerializer.java new file mode 100644 index 00000000000..93f49451941 --- /dev/null +++ b/index-common/src/main/java/com/metamx/druid/kv/ByteBufferSerializer.java @@ -0,0 +1,30 @@ +package com.metamx.druid.kv; + +import com.google.common.primitives.Ints; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.WritableByteChannel; + +/** + */ +public class ByteBufferSerializer +{ + public static T read(ByteBuffer buffer, ObjectStrategy strategy) + { + int size = buffer.getInt(); + ByteBuffer bufferToUse = buffer.asReadOnlyBuffer(); + bufferToUse.limit(bufferToUse.position() + size); + buffer.position(bufferToUse.limit()); + + return strategy.fromByteBuffer(bufferToUse, size); + } + + public static void writeToChannel(T obj, ObjectStrategy strategy, WritableByteChannel channel) + throws IOException + { + byte[] toWrite = strategy.toBytes(obj); + channel.write(ByteBuffer.allocate(Ints.BYTES).putInt(0, toWrite.length)); + channel.write(ByteBuffer.wrap(toWrite)); + } +} diff --git a/index-common/src/main/java/com/metamx/druid/kv/ByteBufferWriter.java b/index-common/src/main/java/com/metamx/druid/kv/ByteBufferWriter.java new file mode 100644 index 00000000000..0b384b3a852 --- /dev/null +++ b/index-common/src/main/java/com/metamx/druid/kv/ByteBufferWriter.java @@ -0,0 +1,92 @@ +package com.metamx.druid.kv; + +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import com.google.common.io.ByteStreams; +import com.google.common.io.CountingOutputStream; +import com.google.common.io.InputSupplier; +import com.google.common.primitives.Ints; + +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; + +/** + */ +public class ByteBufferWriter implements Closeable +{ + private final IOPeon ioPeon; + private final String filenameBase; + private final ObjectStrategy strategy; + + private CountingOutputStream headerOut = null; + private CountingOutputStream valueOut = null; + + public ByteBufferWriter( + IOPeon ioPeon, + String filenameBase, + ObjectStrategy strategy + ) + { + this.ioPeon = ioPeon; + this.filenameBase = filenameBase; + this.strategy = strategy; + } + + public void open() throws IOException + { + headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header"))); + valueOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("value"))); + } + + public void write(T objectToWrite) throws IOException + { + byte[] bytesToWrite = strategy.toBytes(objectToWrite); + + headerOut.write(Ints.toByteArray(bytesToWrite.length)); + valueOut.write(bytesToWrite); + } + + private String makeFilename(String suffix) + { + return String.format("%s.%s", filenameBase, suffix); + } + + @Override + public void close() throws IOException + { + headerOut.close(); + valueOut.close(); + + final long numBytesWritten = headerOut.getCount() + valueOut.getCount(); + Preconditions.checkState( + numBytesWritten < Integer.MAX_VALUE, "Wrote[%s] bytes, which is too many.", numBytesWritten + ); + } + + public InputSupplier combineStreams() + { + return ByteStreams.join( + Iterables.transform( + Arrays.asList("header", "value"), + new Function>() + { + @Override + public InputSupplier apply(final String input) + { + return new InputSupplier() + { + @Override + public InputStream getInput() throws IOException + { + return ioPeon.makeInputStream(makeFilename(input)); + } + }; + } + } + ) + ); + } +} diff --git a/index-common/src/main/java/com/metamx/druid/kv/FlattenedArrayWriter.java b/index-common/src/main/java/com/metamx/druid/kv/GenericIndexedWriter.java similarity index 97% rename from index-common/src/main/java/com/metamx/druid/kv/FlattenedArrayWriter.java rename to index-common/src/main/java/com/metamx/druid/kv/GenericIndexedWriter.java index 4c5696a3b8f..dd970ee403e 100644 --- a/index-common/src/main/java/com/metamx/druid/kv/FlattenedArrayWriter.java +++ b/index-common/src/main/java/com/metamx/druid/kv/GenericIndexedWriter.java @@ -36,7 +36,7 @@ import java.util.Arrays; /** * Streams arrays of objects out in the binary format described by GenericIndexed */ -public class FlattenedArrayWriter implements Closeable +public class GenericIndexedWriter implements Closeable { private static final byte[] EMPTY_ARRAY = new byte[]{}; @@ -51,7 +51,7 @@ public class FlattenedArrayWriter implements Closeable private CountingOutputStream valuesOut = null; int numWritten = 0; - public FlattenedArrayWriter( + public GenericIndexedWriter( IOPeon ioPeon, String filenameBase, ObjectStrategy strategy diff --git a/index-common/src/main/java/com/metamx/druid/kv/IndexedRTree.java b/index-common/src/main/java/com/metamx/druid/kv/IndexedRTree.java index a723d8bc6af..8c8e3c46d50 100644 --- a/index-common/src/main/java/com/metamx/druid/kv/IndexedRTree.java +++ b/index-common/src/main/java/com/metamx/druid/kv/IndexedRTree.java @@ -1,7 +1,7 @@ package com.metamx.druid.kv; import com.google.common.collect.Ordering; -import com.metamx.common.spatial.rtree.ImmutableRTree; +import com.metamx.collections.spatial.ImmutableRTree; import java.nio.ByteBuffer; diff --git a/index-common/src/test/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializerTest.java b/index-common/src/test/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializerTest.java index df300cf7f23..8ddf982fc7c 100644 --- a/index-common/src/test/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializerTest.java +++ b/index-common/src/test/java/com/metamx/druid/index/v1/CompressedFloatsSupplierSerializerTest.java @@ -19,26 +19,19 @@ package com.metamx.druid.index.v1; -import com.google.common.collect.Maps; import com.google.common.io.OutputSupplier; import com.metamx.druid.collect.ResourceHolder; -import com.metamx.druid.kv.FlattenedArrayWriter; -import com.metamx.druid.kv.IOPeon; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.IndexedFloats; import org.junit.Assert; import org.junit.Test; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.File; -import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; import java.io.OutputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.nio.FloatBuffer; -import java.util.Map; /** */ @@ -50,7 +43,7 @@ public class CompressedFloatsSupplierSerializerTest final ByteOrder order = ByteOrder.nativeOrder(); CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer( 999, - new FlattenedArrayWriter>( + new GenericIndexedWriter>( new IOPeonForTesting(), "test", CompressedFloatBufferObjectStrategy.getBufferForOrder(order) diff --git a/index-common/src/test/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializerTest.java b/index-common/src/test/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializerTest.java index 1746c2c2331..c387db96618 100644 --- a/index-common/src/test/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializerTest.java +++ b/index-common/src/test/java/com/metamx/druid/index/v1/CompressedLongsSupplierSerializerTest.java @@ -21,7 +21,7 @@ package com.metamx.druid.index.v1; import com.google.common.io.OutputSupplier; import com.metamx.druid.collect.ResourceHolder; -import com.metamx.druid.kv.FlattenedArrayWriter; +import com.metamx.druid.kv.GenericIndexedWriter; import com.metamx.druid.kv.IndexedLongs; import org.junit.Assert; import org.junit.Test; @@ -43,7 +43,7 @@ public class CompressedLongsSupplierSerializerTest final ByteOrder order = ByteOrder.nativeOrder(); CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer( 999, - new FlattenedArrayWriter>( + new GenericIndexedWriter>( new IOPeonForTesting(), "test", CompressedLongBufferObjectStrategy.getBufferForOrder(order) diff --git a/pom.xml b/pom.xml index cf63a0b2817..b18ebf5a681 100644 --- a/pom.xml +++ b/pom.xml @@ -38,7 +38,7 @@ UTF-8 - 0.22.0 + 0.22.3-SNAPSHOT 2.0.1-21-22 @@ -72,6 +72,11 @@ java-util ${metamx.java-util.version} + + com.metamx + bytebuffer-collections + 0.0.1-SNAPSHOT + com.metamx server-metrics diff --git a/server/src/main/java/com/metamx/druid/index/brita/BitmapIndexSelector.java b/server/src/main/java/com/metamx/druid/index/brita/BitmapIndexSelector.java index e18c7ef868b..fb8837de1f5 100644 --- a/server/src/main/java/com/metamx/druid/index/brita/BitmapIndexSelector.java +++ b/server/src/main/java/com/metamx/druid/index/brita/BitmapIndexSelector.java @@ -19,7 +19,7 @@ package com.metamx.druid.index.brita; -import com.metamx.common.spatial.rtree.ImmutableRTree; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.druid.kv.Indexed; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; diff --git a/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java b/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java index b813b2f07a7..c5297bfbb17 100644 --- a/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java +++ b/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java @@ -18,11 +18,7 @@ */ package com.metamx.druid.index.brita; -import com.google.common.base.Function; -import com.google.common.base.Predicate; -import com.google.common.collect.Iterables; -import com.metamx.common.spatial.rtree.ImmutableRTree; -import com.metamx.common.spatial.rtree.search.Bound; +import com.metamx.collections.spatial.search.Bound; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; /** @@ -44,22 +40,7 @@ public class SpatialFilter implements Filter @Override public ImmutableConciseSet goConcise(final BitmapIndexSelector selector) { - //ImmutableRTree foo = selector.getSpatialIndex(dimension); - //ImmutableRTree.print(foo); - Iterable indexes = selector.getSpatialIndex(dimension).search(bound); - return ImmutableConciseSet.union( - Iterables.transform( - indexes, - new Function() - { - @Override - public ImmutableConciseSet apply(Integer input) - { - return selector.getConciseInvertedIndex(dimension, input); - } - } - ) - ); + return ImmutableConciseSet.union(selector.getSpatialIndex(dimension).search(bound)); } @Override diff --git a/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java b/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java index 1bc9bb8c5c9..cc9971b23b8 100644 --- a/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java +++ b/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java @@ -20,7 +20,7 @@ package com.metamx.druid.index.brita; import com.google.common.base.Predicate; -import com.metamx.common.spatial.rtree.search.Bound; +import com.metamx.collections.spatial.search.Bound; /** */ diff --git a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java index 51e49433fd5..a1eaefb29df 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java @@ -26,10 +26,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import com.metamx.collections.spatial.search.Bound; import com.metamx.common.IAE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterator; -import com.metamx.common.spatial.rtree.search.Bound; import com.metamx.druid.Capabilities; import com.metamx.druid.QueryGranularity; import com.metamx.druid.StorageAdapter; diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java index 2d55d425077..19098e7e59e 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java @@ -25,10 +25,10 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Maps; import com.google.common.io.Closeables; +import com.metamx.collections.spatial.ImmutableRTree; import com.metamx.common.collect.MoreIterators; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterator; -import com.metamx.common.spatial.rtree.ImmutableRTree; import com.metamx.druid.BaseStorageAdapter; import com.metamx.druid.Capabilities; import com.metamx.druid.QueryGranularity; diff --git a/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java b/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java index 1d25d2c4c94..4c739f8e9ad 100644 --- a/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java +++ b/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java @@ -3,8 +3,8 @@ package com.metamx.druid.index.brita; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; -import com.metamx.common.spatial.rtree.search.RadiusBound; -import com.metamx.common.spatial.rtree.search.RectangularBound; +import com.metamx.collections.spatial.search.RadiusBound; +import com.metamx.collections.spatial.search.RectangularBound; import com.metamx.druid.Druids; import com.metamx.druid.QueryGranularity; import com.metamx.druid.TestHelper; @@ -16,9 +16,10 @@ import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.QueryableIndexSegment; import com.metamx.druid.index.Segment; import com.metamx.druid.index.v1.IncrementalIndex; +import com.metamx.druid.index.v1.IncrementalIndexSchema; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; -import com.metamx.druid.index.v1.TestIndex; +import com.metamx.druid.index.v1.SpatialDimensionSchema; import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.QueryRunner; @@ -52,7 +53,7 @@ public class SpatialFilterTest new LongSumAggregatorFactory("val", "val") }; - private static List DIMS = Lists.newArrayList("dim", "dim.geo"); + private static List DIMS = Lists.newArrayList("dim", "lat", "long"); @Parameterized.Parameters public static Collection constructorFeeder() throws IOException @@ -78,9 +79,17 @@ public class SpatialFilterTest private static IncrementalIndex makeIncrementalIndex() throws IOException { IncrementalIndex theIndex = new IncrementalIndex( - DATA_INTERVAL.getStartMillis(), - QueryGranularity.DAY, - METRIC_AGGS + new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(QueryGranularity.DAY) + .withMetrics(METRIC_AGGS) + .withSpatialDimensions( + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ) + ) + ).build() ); theIndex.add( new MapBasedInputRow( @@ -89,7 +98,8 @@ public class SpatialFilterTest ImmutableMap.of( "timestamp", new DateTime("2013-01-01").toString(), "dim", "foo", - "dim.geo", Arrays.asList(0.0f, 0.0f), + "lat", 0.0f, + "long", 0.0f, "val", 17l ) ) @@ -101,7 +111,8 @@ public class SpatialFilterTest ImmutableMap.of( "timestamp", new DateTime("2013-01-02").toString(), "dim", "foo", - "dim.geo", Arrays.asList(1.0f, 3.0f), + "lat", 1.0f, + "long", 3.0f, "val", 29l ) ) @@ -113,7 +124,8 @@ public class SpatialFilterTest ImmutableMap.of( "timestamp", new DateTime("2013-01-03").toString(), "dim", "foo", - "dim.geo", Arrays.asList(4.0f, 2.0f), + "lat", 4.0f, + "long", 2.0f, "val", 13l ) ) @@ -125,7 +137,8 @@ public class SpatialFilterTest ImmutableMap.of( "timestamp", new DateTime("2013-01-04").toString(), "dim", "foo", - "dim.geo", Arrays.asList(7.0f, 3.0f), + "lat", 7.0f, + "long", 3.0f, "val", 91l ) ) @@ -137,7 +150,8 @@ public class SpatialFilterTest ImmutableMap.of( "timestamp", new DateTime("2013-01-05").toString(), "dim", "foo", - "dim.geo", Arrays.asList(8.0f, 6.0f), + "lat", 8.0f, + "long", 6.0f, "val", 47l ) ) @@ -151,14 +165,11 @@ public class SpatialFilterTest new DateTime("2013-01-01").getMillis(), DIMS, ImmutableMap.of( - "timestamp", - new DateTime("2013-01-01").toString(), - "dim", - "boo", - "dim.geo", - Arrays.asList((float) (rand.nextFloat() * 10 + 10.0), (float) (rand.nextFloat() * 10 + 10.0)), - "val", - i + "timestamp", new DateTime("2013-01-01").toString(), + "dim", "boo", + "lat", (float) (rand.nextFloat() * 10 + 10.0), + "long", (float) (rand.nextFloat() * 10 + 10.0), + "val", i ) ) ); @@ -183,19 +194,43 @@ public class SpatialFilterTest { try { IncrementalIndex first = new IncrementalIndex( - DATA_INTERVAL.getStartMillis(), - QueryGranularity.DAY, - METRIC_AGGS + new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(QueryGranularity.DAY) + .withMetrics(METRIC_AGGS) + .withSpatialDimensions( + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ) + ) + ).build() ); IncrementalIndex second = new IncrementalIndex( - DATA_INTERVAL.getStartMillis(), - QueryGranularity.DAY, - METRIC_AGGS + new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(QueryGranularity.DAY) + .withMetrics(METRIC_AGGS) + .withSpatialDimensions( + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ) + ) + ).build() ); IncrementalIndex third = new IncrementalIndex( - DATA_INTERVAL.getStartMillis(), - QueryGranularity.DAY, - METRIC_AGGS + new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis()) + .withQueryGranularity(QueryGranularity.DAY) + .withMetrics(METRIC_AGGS) + .withSpatialDimensions( + Arrays.asList( + new SpatialDimensionSchema( + "dim.geo", + Arrays.asList("lat", "long") + ) + ) + ).build() );