checking stuff in but stuff not really working yet

This commit is contained in:
fjy 2013-05-03 10:35:26 -07:00
parent 6d4c0850ca
commit d57141f46e
34 changed files with 791 additions and 189 deletions

View File

@ -21,7 +21,7 @@ package com.metamx.druid.query.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.common.spatial.rtree.search.Bound;
import com.metamx.collections.spatial.search.Bound;
import java.nio.ByteBuffer;

View File

@ -0,0 +1,81 @@
package com.metamx.druid.query.group;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.primitives.Ints;
import java.nio.ByteBuffer;
import java.util.Comparator;
import java.util.List;
/**
*/
public class DefaultLimitSpec implements LimitSpec
{
private static final byte CACHE_TYPE_ID = 0x0;
private static Joiner JOINER = Joiner.on("");
private final List<String> orderBy;
private final int limit;
@JsonCreator
public DefaultLimitSpec(
@JsonProperty("orderBy") List<String> orderBy,
@JsonProperty("limit") int limit
)
{
this.orderBy = (orderBy == null) ? Lists.<String>newArrayList() : orderBy;
this.limit = limit;
}
public DefaultLimitSpec()
{
this.orderBy = Lists.newArrayList();
this.limit = 0;
}
@JsonProperty
@Override
public List<String> getOrderBy()
{
return orderBy;
}
@JsonProperty
@Override
public int getLimit()
{
return limit;
}
@Override
public Comparator getComparator()
{
return null;
}
@Override
public byte[] getCacheKey()
{
byte[] orderByBytes = JOINER.join(orderBy).getBytes();
byte[] limitBytes = Ints.toByteArray(limit);
return ByteBuffer.allocate(1 + orderByBytes.length + limitBytes.length)
.put(CACHE_TYPE_ID)
.put(orderByBytes)
.put(limitBytes)
.array();
}
@Override
public String toString()
{
return "DefaultLimitSpec{" +
"orderBy='" + orderBy + '\'' +
", limit=" + limit +
'}';
}
}

View File

@ -49,7 +49,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new Builder();
}
private final int threshold;
private final LimitSpec limitSpec;
private final DimFilter dimFilter;
private final QueryGranularity granularity;
private final List<DimensionSpec> dimensions;
@ -60,7 +60,7 @@ public class GroupByQuery extends BaseQuery<Row>
public GroupByQuery(
@JsonProperty("dataSource") String dataSource,
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("threshold") int threshold,
@JsonProperty("limitSpec") LimitSpec limitSpec,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("granularity") QueryGranularity granularity,
@JsonProperty("dimensions") List<DimensionSpec> dimensions,
@ -70,7 +70,7 @@ public class GroupByQuery extends BaseQuery<Row>
)
{
super(dataSource, querySegmentSpec, context);
this.threshold = threshold;
this.limitSpec = (limitSpec == null) ? new DefaultLimitSpec() : limitSpec;
this.dimFilter = dimFilter;
this.granularity = granularity;
this.dimensions = dimensions == null ? ImmutableList.<DimensionSpec>of() : dimensions;
@ -82,10 +82,10 @@ public class GroupByQuery extends BaseQuery<Row>
Queries.verifyAggregations(this.aggregatorSpecs, this.postAggregatorSpecs);
}
@JsonProperty("threshold")
public int getThreshold()
@JsonProperty
public LimitSpec getLimitSpec()
{
return threshold;
return limitSpec;
}
@JsonProperty("filter")
@ -136,7 +136,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
getQuerySegmentSpec(),
threshold,
limitSpec,
dimFilter,
granularity,
dimensions,
@ -152,7 +152,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
getDataSource(),
spec,
threshold,
limitSpec,
dimFilter,
granularity,
dimensions,
@ -166,7 +166,7 @@ public class GroupByQuery extends BaseQuery<Row>
{
private String dataSource;
private QuerySegmentSpec querySegmentSpec;
private int threshold;
private LimitSpec limitSpec;
private DimFilter dimFilter;
private QueryGranularity granularity;
private List<DimensionSpec> dimensions;
@ -180,7 +180,7 @@ public class GroupByQuery extends BaseQuery<Row>
{
dataSource = builder.dataSource;
querySegmentSpec = builder.querySegmentSpec;
threshold = builder.threshold;
limitSpec = builder.limitSpec;
dimFilter = builder.dimFilter;
granularity = builder.granularity;
dimensions = builder.dimensions;
@ -200,9 +200,9 @@ public class GroupByQuery extends BaseQuery<Row>
return setQuerySegmentSpec(new LegacySegmentSpec(interval));
}
public Builder setThreshold(int threshold)
public Builder setLimitSpec(LimitSpec limitSpec)
{
this.threshold = threshold;
this.limitSpec = limitSpec;
return this;
}
@ -298,7 +298,7 @@ public class GroupByQuery extends BaseQuery<Row>
return new GroupByQuery(
dataSource,
querySegmentSpec,
threshold,
limitSpec,
dimFilter,
granularity,
dimensions,

View File

@ -21,12 +21,12 @@ package com.metamx.druid.query.group;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.metamx.common.ISE;
import com.metamx.common.guava.Accumulator;
import com.metamx.common.guava.ConcatSequence;
@ -35,10 +35,8 @@ import com.metamx.common.guava.Sequences;
import com.metamx.druid.Query;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.initialization.Initialization;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.input.MapBasedRow;
import com.metamx.druid.input.Row;
import com.metamx.druid.input.Rows;
@ -46,15 +44,14 @@ import com.metamx.druid.query.MetricManipulationFn;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.QueryToolChest;
import com.metamx.druid.query.dimension.DimensionSpec;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue;
import com.metamx.druid.utils.PropUtils;
import com.metamx.emitter.service.ServiceMetricEvent;
import org.joda.time.Interval;
import org.joda.time.Minutes;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@ -150,20 +147,27 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
);
// convert millis back to timestamp according to granularity to preserve time zone information
Sequence<Row> retVal = Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
}
}
);
// sort results to be returned
if (!query.getLimitSpec().getOrderBy().isEmpty()) {
retVal = Sequences.sort(retVal, makeComparator(query));
}
return Sequences.limit(
Sequences.map(
Sequences.simple(index.iterableWithPostAggregations(query.getPostAggregatorSpecs())),
new Function<Row, Row>()
{
@Override
public Row apply(Row input)
{
final MapBasedRow row = (MapBasedRow) input;
return new MapBasedRow(gran.toDateTime(row.getTimestampFromEpoch()), row.getEvent());
}
}
),
(query.getThreshold() > 0) ? query.getThreshold() : maxRows
retVal,
(query.getLimitSpec().getLimit() > 0) ? query.getLimitSpec().getLimit() : maxRows
);
}
@ -217,4 +221,46 @@ public class GroupByQueryQueryToolChest extends QueryToolChest<Row, GroupByQuery
{
return TYPE_REFERENCE;
}
private Comparator<Row> makeComparator(GroupByQuery query)
{
Ordering<Row> ordering = new Ordering<Row>()
{
@Override
public int compare(Row left, Row right)
{
return Longs.compare(left.getTimestampFromEpoch(), right.getTimestampFromEpoch());
}
};
for (final String dimension : query.getLimitSpec().getOrderBy()) {
ordering = ordering.compound(
new Comparator<Row>()
{
@Override
public int compare(Row left, Row right)
{
if (left instanceof MapBasedRow && right instanceof MapBasedRow) {
// There are no multi-value dimensions at this point, they should have been flattened out
String leftDimVal = left.getDimension(dimension).get(0);
String rightDimVal = right.getDimension(dimension).get(0);
return leftDimVal.compareTo(rightDimVal);
} else {
throw new ISE("Unknown type for rows[%s, %s]", left.getClass(), right.getClass());
}
}
}
);
}
final Ordering<Row> theOrdering = ordering;
return new Comparator<Row>()
{
@Override
public int compare(Row row, Row row2)
{
return theOrdering.compare(row, row2);
}
};
}
}

View File

@ -0,0 +1,43 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.group;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import java.util.Comparator;
import java.util.List;
/**
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
@JsonSubTypes.Type(name = "default", value = DefaultLimitSpec.class)
})
public interface LimitSpec
{
public List<String> getOrderBy();
public int getLimit();
public Comparator getComparator();
public byte[] getCacheKey();
}

View File

@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-index-common</artifactId>
@ -42,6 +43,10 @@
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
</dependency>
<dependency>
<groupId>com.ning</groupId>

View File

@ -18,7 +18,7 @@
*/
package com.metamx.druid.index.column;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.collections.spatial.ImmutableRTree;
/**
*/

View File

@ -21,10 +21,12 @@ package com.metamx.druid.index.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.druid.index.column.ColumnBuilder;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.kv.ByteBufferSerializer;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.IndexedRTree;
@ -50,7 +52,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
private final VSizeIndexedInts singleValuedColumn;
private final VSizeIndexed multiValuedColumn;
private final GenericIndexed<ImmutableConciseSet> bitmaps;
private final GenericIndexed<ImmutableRTree> spatialIndex;
private final ImmutableRTree spatialIndex;
private final int size;
@ -59,7 +61,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
VSizeIndexedInts singleValCol,
VSizeIndexed multiValCol,
GenericIndexed<ImmutableConciseSet> bitmaps,
GenericIndexed<ImmutableRTree> spatialIndex
ImmutableRTree spatialIndex
)
{
this.dictionary = dictionary;
@ -78,7 +80,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
}
size += bitmaps.getSerializedSize();
if (spatialIndex != null) {
size += spatialIndex.getSerializedSize();
size += spatialIndex.size() + Ints.BYTES;
}
this.size = size;
@ -117,7 +119,9 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
multiValuedColumn.writeToChannel(channel);
}
bitmaps.writeToChannel(channel);
spatialIndex.writeToChannel(channel);
if (spatialIndex != null) {
ByteBufferSerializer.writeToChannel(spatialIndex, IndexedRTree.objectStrategy, channel);
}
}
@Override
@ -146,9 +150,9 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
buffer, ConciseCompressedIndexedInts.objectStrategy
);
GenericIndexed<ImmutableRTree> spatialIndex = null;
ImmutableRTree spatialIndex = null;
if (buffer.hasRemaining()) {
spatialIndex = GenericIndexed.read(
spatialIndex = ByteBufferSerializer.read(
buffer, IndexedRTree.objectStrategy
);
}

View File

@ -19,7 +19,7 @@
package com.metamx.druid.index.serde;
import com.google.common.base.Supplier;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.druid.index.column.SpatialIndex;
import com.metamx.druid.kv.GenericIndexed;
@ -29,13 +29,13 @@ public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
{
private static final ImmutableRTree EMPTY_SET = new ImmutableRTree();
private final GenericIndexed<ImmutableRTree> indexedTree;
private final ImmutableRTree indexedTree;
public SpatialIndexColumnPartSupplier(
GenericIndexed<ImmutableRTree> indexedTree
ImmutableRTree indexedTree
)
{
this.indexedTree = indexedTree;
this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree;
}
@Override
@ -46,8 +46,7 @@ public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
@Override
public ImmutableRTree getRTree()
{
// There is only ever 1 RTree per dimension
return indexedTree.get(0);
return indexedTree;
}
};
}

View File

@ -21,7 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.io.Files;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import java.io.File;
@ -36,7 +36,7 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
private final IOPeon ioPeon;
private final File outDir;
private FlattenedArrayWriter writer;
private GenericIndexedWriter writer;
public ComplexMetricColumnSerializer(
String metricName,
@ -55,7 +55,7 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
@Override
public void open() throws IOException
{
writer = new FlattenedArrayWriter(
writer = new GenericIndexedWriter(
ioPeon, String.format("%s_%s", metricName, outDir.getName()), serde.getObjectStrategy()
);

View File

@ -25,7 +25,7 @@ import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.collect.StupidResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import java.io.IOException;
@ -50,7 +50,7 @@ public class CompressedFloatsSupplierSerializer
{
final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer(
sizePer,
new FlattenedArrayWriter<ResourceHolder<FloatBuffer>>(
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
)
);
@ -58,7 +58,7 @@ public class CompressedFloatsSupplierSerializer
}
private final int sizePer;
private final FlattenedArrayWriter<ResourceHolder<FloatBuffer>> flattener;
private final GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener;
private int numInserted = 0;
@ -66,7 +66,7 @@ public class CompressedFloatsSupplierSerializer
public CompressedFloatsSupplierSerializer(
int sizePer,
FlattenedArrayWriter<ResourceHolder<FloatBuffer>> flattener
GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener
)
{
this.sizePer = sizePer;

View File

@ -26,7 +26,7 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.collect.StupidResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import java.io.IOException;
@ -44,7 +44,7 @@ public class CompressedLongsSupplierSerializer
{
final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer(
0xFFFF / Longs.BYTES,
new FlattenedArrayWriter<ResourceHolder<LongBuffer>>(
new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order)
)
);
@ -52,7 +52,7 @@ public class CompressedLongsSupplierSerializer
}
private final int sizePer;
private final FlattenedArrayWriter<ResourceHolder<LongBuffer>> flattener;
private final GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener;
private int numInserted = 0;
@ -60,7 +60,7 @@ public class CompressedLongsSupplierSerializer
public CompressedLongsSupplierSerializer(
int sizePer,
FlattenedArrayWriter<ResourceHolder<LongBuffer>> flattener
GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener
)
{
this.sizePer = sizePer;

View File

@ -25,6 +25,7 @@ import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@ -57,6 +58,7 @@ import org.joda.time.Interval;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
@ -82,6 +84,7 @@ public class IncrementalIndex implements Iterable<Row>
private final ImmutableList<String> metricNames;
private final LinkedHashMap<String, Integer> dimensionOrder;
private final CopyOnWriteArrayList<String> dimensions;
private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
private final DimensionHolder dimValues;
private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts;
@ -90,15 +93,11 @@ public class IncrementalIndex implements Iterable<Row>
// This is modified on add() by a (hopefully) single thread.
private InputRow in;
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics
)
public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema)
{
this.minTimestamp = minTimestamp;
this.gran = gran;
this.metrics = metrics;
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
@ -115,11 +114,31 @@ public class IncrementalIndex implements Iterable<Row>
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<String>();
this.dimValues = new DimensionHolder();
int index = 0;
for (String dim : incrementalIndexSchema.getDimensions()) {
dimensionOrder.put(dim, index++);
dimensions.add(dim);
}
this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(incrementalIndexSchema.getSpatialDimensions());
this.dimValues = new DimensionHolder();
this.facts = new ConcurrentSkipListMap<TimeAndDims, Aggregator[]>();
}
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build()
);
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
@ -132,6 +151,8 @@ public class IncrementalIndex implements Iterable<Row>
*/
public int add(InputRow row)
{
row = spatialDimensionRowFormatter.formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
@ -144,13 +165,6 @@ public class IncrementalIndex implements Iterable<Row>
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
// FIXME: Must be a better way to do this
// Join all coordinate dimension values into a single string for bitmap indexing
// Split this string for spatial indexing
if (dimension.endsWith(".geo")) {
dimensionValues = Arrays.asList(JOINER.join(dimensionValues));
}
final Integer index = dimensionOrder.get(dimension);
if (index == null) {
dimensionOrder.put(dimension, dimensionOrder.size());
@ -237,7 +251,7 @@ public class IncrementalIndex implements Iterable<Row>
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if(typeName.equals("float")) {
if (typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override

View File

@ -0,0 +1,128 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import java.util.Collections;
import java.util.List;
/**
*/
public class IncrementalIndexSchema
{
private final long minTimestamp;
private final QueryGranularity gran;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final AggregatorFactory[] metrics;
public IncrementalIndexSchema(
long minTimestamp,
QueryGranularity gran,
List<String> dimensions,
List<SpatialDimensionSchema> spatialDimensions,
AggregatorFactory[] metrics
)
{
this.minTimestamp = minTimestamp;
this.gran = gran;
this.dimensions = dimensions;
this.spatialDimensions = spatialDimensions;
this.metrics = metrics;
}
public long getMinTimestamp()
{
return minTimestamp;
}
public QueryGranularity getGran()
{
return gran;
}
public List<String> getDimensions()
{
return dimensions;
}
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
public AggregatorFactory[] getMetrics()
{
return metrics;
}
public static class Builder
{
private long minTimestamp;
private QueryGranularity gran;
private List<String> dimensions;
private List<SpatialDimensionSchema> spatialDimensions;
private AggregatorFactory[] metrics;
public Builder()
{
this.minTimestamp = 0L;
this.gran = QueryGranularity.NONE;
this.dimensions = Lists.newArrayList();
this.spatialDimensions = Lists.newArrayList();
this.metrics = new AggregatorFactory[]{};
}
public Builder withMinTimestamp(long minTimestamp)
{
this.minTimestamp = minTimestamp;
return this;
}
public Builder withQueryGranularity(QueryGranularity gran)
{
this.gran = gran;
return this;
}
public Builder withDimensions(Iterable<String> dimensions)
{
this.dimensions = Lists.newArrayList(
Iterables.transform(
dimensions, new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
)
);
Collections.sort(this.dimensions);
return this;
}
public Builder withSpatialDimensions(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
return this;
}
public Builder withMetrics(AggregatorFactory[] metrics)
{
this.metrics = metrics;
return this;
}
public IncrementalIndexSchema build()
{
return new IncrementalIndexSchema(
minTimestamp, gran, dimensions, spatialDimensions, metrics
);
}
}
}

View File

@ -19,12 +19,11 @@
package com.metamx.druid.index.v1;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.logger.Logger;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

View File

@ -31,6 +31,7 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.io.smoosh.FileSmoosher;
@ -38,7 +39,6 @@ import com.metamx.common.io.smoosh.Smoosh;
import com.metamx.common.io.smoosh.SmooshedFileMapper;
import com.metamx.common.io.smoosh.SmooshedWriter;
import com.metamx.common.logger.Logger;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.SimpleQueryableIndex;
import com.metamx.druid.index.column.Column;
@ -57,6 +57,7 @@ import com.metamx.druid.index.serde.LongGenericColumnSupplier;
import com.metamx.druid.index.serde.SpatialIndexColumnPartSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.kv.ArrayIndexed;
import com.metamx.druid.kv.ByteBufferSerializer;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.IndexedIterable;
@ -333,7 +334,7 @@ public class IndexIO
Map<String, GenericIndexed<String>> dimValueLookups = Maps.newHashMap();
Map<String, VSizeIndexed> dimColumns = Maps.newHashMap();
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexed = Maps.newHashMap();
Map<String, GenericIndexed<ImmutableRTree>> spatialIndexed = Maps.newHashMap();
Map<String, ImmutableRTree> spatialIndexed = Maps.newHashMap();
for (String dimension : IndexedIterable.create(availableDimensions)) {
ByteBuffer dimBuffer = smooshedFiles.mapFile(makeDimFile(inDir, dimension).getName());
@ -358,13 +359,11 @@ public class IndexIO
}
ByteBuffer spatialBuffer = smooshedFiles.mapFile("spatial.drd");
if (spatialBuffer != null) {
for (String dimension : IndexedIterable.create(availableDimensions)) {
spatialIndexed.put(
serializerUtils.readString(spatialBuffer),
GenericIndexed.read(spatialBuffer, IndexedRTree.objectStrategy)
);
}
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexed.put(
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(spatialBuffer, IndexedRTree.objectStrategy)
);
}
final MMappedIndex retVal = new MMappedIndex(
@ -422,12 +421,12 @@ public class IndexIO
);
}
Map<String, GenericIndexed<ImmutableRTree>> spatialIndexes = Maps.newHashMap();
Map<String, ImmutableRTree> spatialIndexes = Maps.newHashMap();
final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd");
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexes.put(
serializerUtils.readString(spatialBuffer),
GenericIndexed.read(spatialBuffer, IndexedRTree.objectStrategy)
ByteBufferSerializer.read(spatialBuffer, IndexedRTree.objectStrategy)
);
}
@ -464,7 +463,7 @@ public class IndexIO
VSizeIndexedInts singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableConciseSet> bitmaps = bitmapIndexes.get(dimension);
GenericIndexed<ImmutableRTree> spatialIndex = spatialIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);
boolean onlyOneValue = true;
ConciseSet nullsSet = null;

View File

@ -34,6 +34,10 @@ import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
@ -41,9 +45,6 @@ import com.metamx.common.guava.MergeIterable;
import com.metamx.common.guava.nary.BinaryFn;
import com.metamx.common.io.smoosh.Smoosh;
import com.metamx.common.logger.Logger;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.common.spatial.rtree.RTree;
import com.metamx.common.spatial.rtree.split.LinearGutmanSplitStrategy;
import com.metamx.druid.CombiningIterable;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.ToLowerCaseAggregatorFactory;
@ -52,8 +53,9 @@ import com.metamx.druid.guava.GuavaUtils;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.ByteBufferWriter;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.IOPeon;
import com.metamx.druid.kv.Indexed;
@ -453,7 +455,7 @@ public class IndexMerger
}
for (String dimension : mergedDimensions) {
final FlattenedArrayWriter<String> writer = new FlattenedArrayWriter<String>(
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
ioPeon, dimension, GenericIndexed.stringStrategy
);
writer.open();
@ -708,7 +710,7 @@ public class IndexMerger
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
FlattenedArrayWriter<ImmutableConciseSet> writer = new FlattenedArrayWriter<ImmutableConciseSet>(
GenericIndexedWriter<ImmutableConciseSet> writer = new GenericIndexedWriter<ImmutableConciseSet>(
ioPeon, dimension, ConciseCompressedIndexedInts.objectStrategy
);
writer.open();
@ -757,6 +759,10 @@ public class IndexMerger
for (int i = 0; i < mergedDimensions.size(); ++i) {
String dimension = mergedDimensions.get(i);
if (!dimension.endsWith(".geo")) {
continue;
}
File dimOutFile = dimOuts.get(i).getFile();
final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
@ -766,26 +772,24 @@ public class IndexMerger
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
FlattenedArrayWriter<ImmutableRTree> writer = new FlattenedArrayWriter<ImmutableRTree>(
ByteBufferWriter<ImmutableRTree> writer = new ByteBufferWriter<ImmutableRTree>(
ioPeon, dimension, IndexedRTree.objectStrategy
);
writer.open();
RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
if (dimension.endsWith(".geo")) {
int count = 0;
for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress();
int count = 0;
for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress();
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, count);
count++;
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, count);
count++;
}
writer.write(ImmutableRTree.newImmutableFromMutable(tree));

View File

@ -23,8 +23,8 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.logger.Logger;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
@ -56,7 +56,7 @@ public class MMappedIndex
final Map<String, GenericIndexed<String>> dimValueLookups;
final Map<String, VSizeIndexed> dimColumns;
final Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes;
final Map<String, GenericIndexed<ImmutableRTree>> spatialIndexes;
final Map<String, ImmutableRTree> spatialIndexes;
private final Map<String, Integer> metricIndexes = Maps.newHashMap();
@ -69,7 +69,7 @@ public class MMappedIndex
Map<String, GenericIndexed<String>> dimValueLookups,
Map<String, VSizeIndexed> dimColumns,
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes,
Map<String, GenericIndexed<ImmutableRTree>> spatialIndexes
Map<String, ImmutableRTree> spatialIndexes
)
{
this.availableDimensions = availableDimensions;
@ -148,7 +148,7 @@ public class MMappedIndex
return invertedIndexes;
}
public Map<String, GenericIndexed<ImmutableRTree>> getSpatialIndexes()
public Map<String, ImmutableRTree> getSpatialIndexes()
{
return spatialIndexes;
}
@ -187,7 +187,7 @@ public class MMappedIndex
Map<String, VSizeIndexed> dimColumns = Maps.newHashMap();
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes = Maps.newLinkedHashMap();
Map<String, GenericIndexed<ImmutableRTree>> spatialIndexes = Maps.newLinkedHashMap();
Map<String, ImmutableRTree> spatialIndexes = Maps.newLinkedHashMap();
for (String dimension : Arrays.asList(index.dimensions)) {
final String[] dimVals = index.reverseDimLookup.get(dimension);
@ -260,10 +260,7 @@ public class MMappedIndex
)
);
spatialIndexes.put(
dimension,
GenericIndexed.fromIterable(Arrays.asList(index.getSpatialIndex(dimension)), IndexedRTree.objectStrategy)
);
spatialIndexes.put(dimension, index.getSpatialIndex(dimension));
}
log.info("Making MMappedIndex");

View File

@ -27,7 +27,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedFloats;
@ -63,7 +63,7 @@ public class MetricHolder
}
public static void writeComplexMetric(
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, FlattenedArrayWriter column
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, GenericIndexedWriter column
) throws IOException
{
OutputStream out = null;

View File

@ -0,0 +1,107 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.input.InputRow;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class SpatialDimensionRowFormatter
{
private static final Joiner JOINER = Joiner.on(",");
private final List<SpatialDimensionSchema> spatialDimensions;
private final Set<String> spatialDimNames;
public SpatialDimensionRowFormatter(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
Iterables.concat(
Lists.transform(
spatialDimensions,
new Function<SpatialDimensionSchema, List<String>>()
{
@Override
public List<String> apply(SpatialDimensionSchema input)
{
return input.getDims();
}
}
)
)
);
}
public InputRow formatRow(final InputRow row)
{
final Map<String, List<String>> finalDims = Maps.newHashMap();
// remove all spatial dimensions
Set<String> filtered = Sets.filter(
Sets.newHashSet(row.getDimensions()),
new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input);
}
}
);
for (String dim : filtered) {
finalDims.put(dim, row.getDimension(dim));
}
InputRow retVal = new InputRow()
{
@Override
public List<String> getDimensions()
{
return Lists.newArrayList(finalDims.keySet());
}
@Override
public long getTimestampFromEpoch()
{
return row.getTimestampFromEpoch();
}
@Override
public List<String> getDimension(String dimension)
{
return finalDims.get(dimension);
}
@Override
public float getFloatMetric(String metric)
{
return row.getFloatMetric(metric);
}
};
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (dimVals == null || dimVals.isEmpty()) {
return retVal;
}
spatialDimVals.addAll(dimVals);
}
finalDims.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
}
return retVal;
}
}

View File

@ -0,0 +1,40 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.util.List;
/**
*/
public class SpatialDimensionSchema
{
private final String dimName;
private final List<String> dims;
public SpatialDimensionSchema(String dimName, List<String> dims)
{
this.dimName = dimName.toLowerCase();
this.dims = Lists.transform(
dims,
new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
);
}
public String getDimName()
{
return dimName;
}
public List<String> getDims()
{
return dims;
}
}

View File

@ -0,0 +1,30 @@
package com.metamx.druid.kv;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
*/
public class ByteBufferSerializer<T>
{
public static <T> T read(ByteBuffer buffer, ObjectStrategy<T> strategy)
{
int size = buffer.getInt();
ByteBuffer bufferToUse = buffer.asReadOnlyBuffer();
bufferToUse.limit(bufferToUse.position() + size);
buffer.position(bufferToUse.limit());
return strategy.fromByteBuffer(bufferToUse, size);
}
public static <T> void writeToChannel(T obj, ObjectStrategy<T> strategy, WritableByteChannel channel)
throws IOException
{
byte[] toWrite = strategy.toBytes(obj);
channel.write(ByteBuffer.allocate(Ints.BYTES).putInt(0, toWrite.length));
channel.write(ByteBuffer.wrap(toWrite));
}
}

View File

@ -0,0 +1,92 @@
package com.metamx.druid.kv;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
/**
*/
public class ByteBufferWriter<T> implements Closeable
{
private final IOPeon ioPeon;
private final String filenameBase;
private final ObjectStrategy<T> strategy;
private CountingOutputStream headerOut = null;
private CountingOutputStream valueOut = null;
public ByteBufferWriter(
IOPeon ioPeon,
String filenameBase,
ObjectStrategy<T> strategy
)
{
this.ioPeon = ioPeon;
this.filenameBase = filenameBase;
this.strategy = strategy;
}
public void open() throws IOException
{
headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header")));
valueOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("value")));
}
public void write(T objectToWrite) throws IOException
{
byte[] bytesToWrite = strategy.toBytes(objectToWrite);
headerOut.write(Ints.toByteArray(bytesToWrite.length));
valueOut.write(bytesToWrite);
}
private String makeFilename(String suffix)
{
return String.format("%s.%s", filenameBase, suffix);
}
@Override
public void close() throws IOException
{
headerOut.close();
valueOut.close();
final long numBytesWritten = headerOut.getCount() + valueOut.getCount();
Preconditions.checkState(
numBytesWritten < Integer.MAX_VALUE, "Wrote[%s] bytes, which is too many.", numBytesWritten
);
}
public InputSupplier<InputStream> combineStreams()
{
return ByteStreams.join(
Iterables.transform(
Arrays.asList("header", "value"),
new Function<String, InputSupplier<InputStream>>()
{
@Override
public InputSupplier<InputStream> apply(final String input)
{
return new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
return ioPeon.makeInputStream(makeFilename(input));
}
};
}
}
)
);
}
}

View File

@ -36,7 +36,7 @@ import java.util.Arrays;
/**
* Streams arrays of objects out in the binary format described by GenericIndexed
*/
public class FlattenedArrayWriter<T> implements Closeable
public class GenericIndexedWriter<T> implements Closeable
{
private static final byte[] EMPTY_ARRAY = new byte[]{};
@ -51,7 +51,7 @@ public class FlattenedArrayWriter<T> implements Closeable
private CountingOutputStream valuesOut = null;
int numWritten = 0;
public FlattenedArrayWriter(
public GenericIndexedWriter(
IOPeon ioPeon,
String filenameBase,
ObjectStrategy<T> strategy

View File

@ -1,7 +1,7 @@
package com.metamx.druid.kv;
import com.google.common.collect.Ordering;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.collections.spatial.ImmutableRTree;
import java.nio.ByteBuffer;

View File

@ -19,26 +19,19 @@
package com.metamx.druid.index.v1;
import com.google.common.collect.Maps;
import com.google.common.io.OutputSupplier;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.IOPeon;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IndexedFloats;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.FloatBuffer;
import java.util.Map;
/**
*/
@ -50,7 +43,7 @@ public class CompressedFloatsSupplierSerializerTest
final ByteOrder order = ByteOrder.nativeOrder();
CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer(
999,
new FlattenedArrayWriter<ResourceHolder<FloatBuffer>>(
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
new IOPeonForTesting(),
"test",
CompressedFloatBufferObjectStrategy.getBufferForOrder(order)

View File

@ -21,7 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.io.OutputSupplier;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IndexedLongs;
import org.junit.Assert;
import org.junit.Test;
@ -43,7 +43,7 @@ public class CompressedLongsSupplierSerializerTest
final ByteOrder order = ByteOrder.nativeOrder();
CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer(
999,
new FlattenedArrayWriter<ResourceHolder<LongBuffer>>(
new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
new IOPeonForTesting(),
"test",
CompressedLongBufferObjectStrategy.getBufferForOrder(order)

View File

@ -38,7 +38,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.22.0</metamx.java-util.version>
<metamx.java-util.version>0.22.3-SNAPSHOT</metamx.java-util.version>
<netflix.curator.version>2.0.1-21-22</netflix.curator.version>
</properties>
@ -72,6 +72,11 @@
<artifactId>java-util</artifactId>
<version>${metamx.java-util.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>

View File

@ -19,7 +19,7 @@
package com.metamx.druid.index.brita;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.druid.kv.Indexed;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;

View File

@ -18,11 +18,7 @@
*/
package com.metamx.druid.index.brita;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.common.spatial.rtree.search.Bound;
import com.metamx.collections.spatial.search.Bound;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
/**
@ -44,22 +40,7 @@ public class SpatialFilter implements Filter
@Override
public ImmutableConciseSet goConcise(final BitmapIndexSelector selector)
{
//ImmutableRTree foo = selector.getSpatialIndex(dimension);
//ImmutableRTree.print(foo);
Iterable<Integer> indexes = selector.getSpatialIndex(dimension).search(bound);
return ImmutableConciseSet.union(
Iterables.transform(
indexes,
new Function<Integer, ImmutableConciseSet>()
{
@Override
public ImmutableConciseSet apply(Integer input)
{
return selector.getConciseInvertedIndex(dimension, input);
}
}
)
);
return ImmutableConciseSet.union(selector.getSpatialIndex(dimension).search(bound));
}
@Override

View File

@ -20,7 +20,7 @@
package com.metamx.druid.index.brita;
import com.google.common.base.Predicate;
import com.metamx.common.spatial.rtree.search.Bound;
import com.metamx.collections.spatial.search.Bound;
/**
*/

View File

@ -26,10 +26,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.collections.spatial.search.Bound;
import com.metamx.common.IAE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.spatial.rtree.search.Bound;
import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.StorageAdapter;

View File

@ -25,10 +25,10 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.druid.BaseStorageAdapter;
import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity;

View File

@ -3,8 +3,8 @@ package com.metamx.druid.index.brita;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.spatial.rtree.search.RadiusBound;
import com.metamx.common.spatial.rtree.search.RectangularBound;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import com.metamx.druid.Druids;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper;
@ -16,9 +16,10 @@ import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexSchema;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
@ -52,7 +53,7 @@ public class SpatialFilterTest
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "dim.geo");
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@ -78,9 +79,17 @@ public class SpatialFilterTest
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
theIndex.add(
new MapBasedInputRow(
@ -89,7 +98,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(0.0f, 0.0f),
"lat", 0.0f,
"long", 0.0f,
"val", 17l
)
)
@ -101,7 +111,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(1.0f, 3.0f),
"lat", 1.0f,
"long", 3.0f,
"val", 29l
)
)
@ -113,7 +124,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(4.0f, 2.0f),
"lat", 4.0f,
"long", 2.0f,
"val", 13l
)
)
@ -125,7 +137,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(7.0f, 3.0f),
"lat", 7.0f,
"long", 3.0f,
"val", 91l
)
)
@ -137,7 +150,8 @@ public class SpatialFilterTest
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(8.0f, 6.0f),
"lat", 8.0f,
"long", 6.0f,
"val", 47l
)
)
@ -151,14 +165,11 @@ public class SpatialFilterTest
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp",
new DateTime("2013-01-01").toString(),
"dim",
"boo",
"dim.geo",
Arrays.asList((float) (rand.nextFloat() * 10 + 10.0), (float) (rand.nextFloat() * 10 + 10.0)),
"val",
i
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"lat", (float) (rand.nextFloat() * 10 + 10.0),
"long", (float) (rand.nextFloat() * 10 + 10.0),
"val", i
)
)
);
@ -183,19 +194,43 @@ public class SpatialFilterTest
{
try {
IncrementalIndex first = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
IncrementalIndex second = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
IncrementalIndex third = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);