Merge pull request #127 from metamx/spatial

Spatial indexes
cheddar 2013-05-15 15:50:25 -07:00
commit 89f0a6cc09
57 changed files with 2032 additions and 283 deletions

View File

@ -33,7 +33,8 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo;
@JsonSubTypes.Type(name="extraction", value=ExtractionDimFilter.class),
@JsonSubTypes.Type(name="regex", value=RegexDimFilter.class),
@JsonSubTypes.Type(name="search", value=SearchQueryDimFilter.class),
@JsonSubTypes.Type(name="javascript", value=JavaScriptDimFilter.class)
@JsonSubTypes.Type(name="javascript", value=JavaScriptDimFilter.class),
@JsonSubTypes.Type(name="spatial", value=SpatialDimFilter.class)
})
public interface DimFilter
{

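Registering the "spatial" subtype lets Jackson deserialize spatial filters from query JSON alongside the existing filter types. A minimal sketch in Java — note the "radius" bound shape ("type"/"coords"/"radius" fields) is an assumption about the Bound implementations in bytebuffer-collections, not something this diff defines:

import com.fasterxml.jackson.databind.ObjectMapper;
import com.metamx.druid.jackson.DefaultObjectMapper;

ObjectMapper mapper = new DefaultObjectMapper();
// Bound is polymorphic too; the "radius" subtype and its field names are assumed.
DimFilter filter = mapper.readValue(
    "{\"type\": \"spatial\", \"dimension\": \"coords\","
    + " \"bound\": {\"type\": \"radius\", \"coords\": [37.7, -122.4], \"radius\": 5.0}}",
    DimFilter.class
);
// filter is now a SpatialDimFilter over the "coords" dimension.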
View File

@ -35,6 +35,7 @@ class DimFilterCacheHelper
static final byte REGEX_CACHE_ID = 0x5;
static final byte SEARCH_QUERY_TYPE_ID = 0x6;
static final byte JAVASCRIPT_CACHE_ID = 0x7;
static final byte SPATIAL_CACHE_ID = 0x8;
static byte[] computeCacheKey(byte cacheIdKey, List<DimFilter> filters)
{

View File

@ -0,0 +1,102 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.query.filter;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.metamx.collections.spatial.search.Bound;
import java.nio.ByteBuffer;
/**
*/
public class SpatialDimFilter implements DimFilter
{
private final String dimension;
private final Bound bound;
@JsonCreator
public SpatialDimFilter(
@JsonProperty("dimension") String dimension,
@JsonProperty("bound") Bound bound
)
{
Preconditions.checkArgument(dimension != null, "dimension must not be null");
Preconditions.checkArgument(bound != null, "bound must not be null");
this.dimension = dimension;
this.bound = bound;
}
@Override
public byte[] getCacheKey()
{
byte[] dimBytes = dimension.getBytes();
byte[] boundBytes = bound.getCacheKey();
return ByteBuffer.allocate(1 + dimBytes.length + boundBytes.length)
.put(DimFilterCacheHelper.SPATIAL_CACHE_ID)
.put(dimBytes)
.put(boundBytes)
.array();
}
@JsonProperty
public String getDimension()
{
return dimension;
}
@JsonProperty
public Bound getBound()
{
return bound;
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SpatialDimFilter that = (SpatialDimFilter) o;
if (bound != null ? !bound.equals(that.bound) : that.bound != null) {
return false;
}
if (dimension != null ? !dimension.equals(that.dimension) : that.dimension != null) {
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = dimension != null ? dimension.hashCode() : 0;
result = 31 * result + (bound != null ? bound.hashCode() : 0);
return result;
}
}
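A quick sketch of what getCacheKey() above produces: the one-byte spatial cache id followed by the raw dimension bytes and the bound's own cache key, concatenated with no length prefixes ("someBound" below is a hypothetical Bound instance):

// "someBound" stands in for any concrete Bound implementation.
byte[] key = new SpatialDimFilter("coords", someBound).getCacheKey();
// layout: [0x8][dimension bytes, default charset][bound.getCacheKey() bytes]
// key[0] == DimFilterCacheHelper.SPATIAL_CACHE_ID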

View File

@ -18,7 +18,8 @@
~ Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.metamx.druid</groupId>
<artifactId>druid-index-common</artifactId>
@ -42,6 +43,10 @@
<groupId>com.metamx</groupId>
<artifactId>java-util</artifactId>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
</dependency>
<dependency>
<groupId>com.ning</groupId>

View File

@ -29,6 +29,12 @@ public abstract class AbstractColumn implements Column
throw new UnsupportedOperationException();
}
@Override
public SpatialIndex getSpatialIndex()
{
throw new UnsupportedOperationException();
}
@Override
public ComplexColumn getComplexColumn()
{

View File

@ -28,4 +28,5 @@ public interface BitmapIndex
public int getCardinality();
public String getValue(int index);
public ImmutableConciseSet getConciseSet(String value);
public ImmutableConciseSet getConciseSet(int idx);
}
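The new int overload lets a caller that already holds a dictionary id fetch the bitmap directly, and negative ids (what indexOf() returns for an absent value) map to the empty set. A hedged usage sketch, where bitmapIndex is a hypothetical BitmapIndex instance:

ImmutableConciseSet byValue = bitmapIndex.getConciseSet("sf"); // resolves the id via the dictionary first
ImmutableConciseSet byId = bitmapIndex.getConciseSet(17);      // direct fetch by dictionary position
ImmutableConciseSet none = bitmapIndex.getConciseSet(-1);      // negative id -> empty set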

View File

@ -31,4 +31,5 @@ public interface Column
public GenericColumn getGenericColumn();
public ComplexColumn getComplexColumn();
public BitmapIndex getBitmapIndex();
public SpatialIndex getSpatialIndex();
}

View File

@ -34,6 +34,7 @@ public class ColumnBuilder
private Supplier<GenericColumn> genericColumn = null;
private Supplier<ComplexColumn> complexColumn = null;
private Supplier<BitmapIndex> bitmapIndex = null;
private Supplier<SpatialIndex> spatialIndex = null;
public ColumnBuilder setType(ValueType type)
{
@ -77,6 +78,12 @@ public class ColumnBuilder
return this;
}
public ColumnBuilder setSpatialIndex(Supplier<SpatialIndex> spatialIndex)
{
this.spatialIndex = spatialIndex;
return this;
}
public Column build()
{
Preconditions.checkState(type != null, "Type must be set.");
@ -86,6 +93,7 @@ public class ColumnBuilder
.setType(type)
.setDictionaryEncoded(dictionaryEncodedColumn != null)
.setHasBitmapIndexes(bitmapIndex != null)
.setHasSpatialIndexes(spatialIndex != null)
.setRunLengthEncoded(runLengthColumn != null)
.setHasMultipleValues(hasMultipleValues)
,
@ -93,7 +101,8 @@ public class ColumnBuilder
runLengthColumn,
genericColumn,
complexColumn,
bitmapIndex
bitmapIndex,
spatialIndex
);
}
}

View File

@ -28,5 +28,6 @@ public interface ColumnCapabilities
public boolean isDictionaryEncoded();
public boolean isRunLengthEncoded();
public boolean hasBitmapIndexes();
public boolean hasSpatialIndexes();
public boolean hasMultipleValues();
}

View File

@ -29,6 +29,7 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
private boolean dictionaryEncoded = false;
private boolean runLengthEncoded = false;
private boolean hasInvertedIndexes = false;
private boolean hasSpatialIndexes = false;
private boolean hasMultipleValues = false;
@Override
@ -83,6 +84,19 @@ public class ColumnCapabilitiesImpl implements ColumnCapabilities
return this;
}
@Override
@JsonProperty("hasSpatialIndexes")
public boolean hasSpatialIndexes()
{
return hasSpatialIndexes;
}
public ColumnCapabilitiesImpl setHasSpatialIndexes(boolean hasSpatialIndexes)
{
this.hasSpatialIndexes = hasSpatialIndexes;
return this;
}
@Override
@JsonProperty("hasMultipleValues")
public boolean hasMultipleValues()

View File

@ -32,6 +32,7 @@ class SimpleColumn implements Column
private final Supplier<GenericColumn> genericColumn;
private final Supplier<ComplexColumn> complexColumn;
private final Supplier<BitmapIndex> bitmapIndex;
private final Supplier<SpatialIndex> spatialIndex;
SimpleColumn(
ColumnCapabilities capabilities,
@ -39,7 +40,8 @@ class SimpleColumn implements Column
Supplier<RunLengthColumn> runLengthColumn,
Supplier<GenericColumn> genericColumn,
Supplier<ComplexColumn> complexColumn,
Supplier<BitmapIndex> bitmapIndex
Supplier<BitmapIndex> bitmapIndex,
Supplier<SpatialIndex> spatialIndex
)
{
this.capabilities = capabilities;
@ -48,6 +50,7 @@ class SimpleColumn implements Column
this.genericColumn = genericColumn;
this.complexColumn = complexColumn;
this.bitmapIndex = bitmapIndex;
this.spatialIndex = spatialIndex;
}
@Override
@ -98,4 +101,10 @@ class SimpleColumn implements Column
{
return bitmapIndex == null ? null : bitmapIndex.get();
}
@Override
public SpatialIndex getSpatialIndex()
{
return spatialIndex == null ? null : spatialIndex.get();
}
}

View File

@ -0,0 +1,28 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.column;
import com.metamx.collections.spatial.ImmutableRTree;
/**
*/
public interface SpatialIndex
{
public ImmutableRTree getRTree();
}

View File

@ -63,11 +63,17 @@ public class BitmapIndexColumnPartSupplier implements Supplier<BitmapIndex>
{
final int index = dictionary.indexOf(value);
if (index < 0) {
return getConciseSet(index);
}
@Override
public ImmutableConciseSet getConciseSet(int idx)
{
if (idx < 0) {
return EMPTY_SET;
}
final ImmutableConciseSet bitmap = bitmaps.get(index);
final ImmutableConciseSet bitmap = bitmaps.get(idx);
return bitmap == null ? EMPTY_SET : bitmap;
}
};

View File

@ -21,11 +21,15 @@ package com.metamx.druid.index.serde;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
import com.metamx.druid.index.column.ColumnBuilder;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.kv.ByteBufferSerializer;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
@ -35,7 +39,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
*/
public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
{
@JsonCreator
@ -48,6 +52,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
private final VSizeIndexedInts singleValuedColumn;
private final VSizeIndexed multiValuedColumn;
private final GenericIndexed<ImmutableConciseSet> bitmaps;
private final ImmutableRTree spatialIndex;
private final long size;
@ -55,25 +60,28 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
GenericIndexed<String> dictionary,
VSizeIndexedInts singleValCol,
VSizeIndexed multiValCol,
GenericIndexed<ImmutableConciseSet> bitmaps
GenericIndexed<ImmutableConciseSet> bitmaps,
ImmutableRTree spatialIndex
)
{
this.dictionary = dictionary;
this.singleValuedColumn = singleValCol;
this.multiValuedColumn = multiValCol;
this.bitmaps = bitmaps;
this.spatialIndex = spatialIndex;
long size = dictionary.getSerializedSize();
if (singleValCol != null && multiValCol == null) {
size += singleValCol.getSerializedSize();
}
else if (singleValCol == null && multiValCol != null) {
} else if (singleValCol == null && multiValCol != null) {
size += multiValCol.getSerializedSize();
}
else {
} else {
throw new IAE("Either singleValCol[%s] or multiValCol[%s] must be set", singleValCol, multiValCol);
}
size += bitmaps.getSerializedSize();
if (spatialIndex != null) {
size += spatialIndex.size() + Ints.BYTES;
}
this.size = size;
}
@ -84,6 +92,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
singleValuedColumn = null;
multiValuedColumn = null;
bitmaps = null;
spatialIndex = null;
size = 0;
}
@ -106,11 +115,13 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
dictionary.writeToChannel(channel);
if (isSingleValued()) {
singleValuedColumn.writeToChannel(channel);
}
else {
} else {
multiValuedColumn.writeToChannel(channel);
}
bitmaps.writeToChannel(channel);
if (spatialIndex != null) {
ByteBufferSerializer.writeToChannel(spatialIndex, IndexedRTree.objectStrategy, channel);
}
}
@Override
@ -128,8 +139,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
multiValuedColumn = null;
builder.setHasMultipleValues(false)
.setDictionaryEncodedColumn(new DictionaryEncodedColumnSupplier(dictionary, singleValuedColumn, null));
}
else {
} else {
singleValuedColumn = null;
multiValuedColumn = VSizeIndexed.readFromByteBuffer(buffer);
builder.setHasMultipleValues(true)
@ -139,9 +149,22 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
GenericIndexed<ImmutableConciseSet> bitmaps = GenericIndexed.read(
buffer, ConciseCompressedIndexedInts.objectStrategy
);
builder.setBitmapIndex(new BitmapIndexColumnPartSupplier(bitmaps, dictionary));
return new DictionaryEncodedColumnPartSerde(dictionary, singleValuedColumn, multiValuedColumn, bitmaps);
ImmutableRTree spatialIndex = null;
if (buffer.hasRemaining()) {
spatialIndex = ByteBufferSerializer.read(
buffer, IndexedRTree.objectStrategy
);
builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex));
}
return new DictionaryEncodedColumnPartSerde(
dictionary,
singleValuedColumn,
multiValuedColumn,
bitmaps,
spatialIndex
);
}
}

View File

@ -0,0 +1,53 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.serde;
import com.google.common.base.Supplier;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.druid.index.column.SpatialIndex;
import com.metamx.druid.kv.GenericIndexed;
/**
*/
public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
{
private static final ImmutableRTree EMPTY_SET = new ImmutableRTree();
private final ImmutableRTree indexedTree;
public SpatialIndexColumnPartSupplier(
ImmutableRTree indexedTree
)
{
this.indexedTree = (indexedTree == null) ? EMPTY_SET : indexedTree;
}
@Override
public SpatialIndex get()
{
return new SpatialIndex()
{
@Override
public ImmutableRTree getRTree()
{
return indexedTree;
}
};
}
}

View File

@ -21,7 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.io.Files;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import java.io.File;
@ -36,7 +36,7 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
private final IOPeon ioPeon;
private final File outDir;
private FlattenedArrayWriter writer;
private GenericIndexedWriter writer;
public ComplexMetricColumnSerializer(
String metricName,
@ -55,7 +55,7 @@ public class ComplexMetricColumnSerializer implements MetricColumnSerializer
@Override
public void open() throws IOException
{
writer = new FlattenedArrayWriter(
writer = new GenericIndexedWriter(
ioPeon, String.format("%s_%s", metricName, outDir.getName()), serde.getObjectStrategy()
);

View File

@ -25,7 +25,7 @@ import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.collect.StupidResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import java.io.IOException;
@ -50,7 +50,7 @@ public class CompressedFloatsSupplierSerializer
{
final CompressedFloatsSupplierSerializer retVal = new CompressedFloatsSupplierSerializer(
sizePer,
new FlattenedArrayWriter<ResourceHolder<FloatBuffer>>(
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
ioPeon, filenameBase, CompressedFloatBufferObjectStrategy.getBufferForOrder(order)
)
);
@ -58,7 +58,7 @@ public class CompressedFloatsSupplierSerializer
}
private final int sizePer;
private final FlattenedArrayWriter<ResourceHolder<FloatBuffer>> flattener;
private final GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener;
private int numInserted = 0;
@ -66,7 +66,7 @@ public class CompressedFloatsSupplierSerializer
public CompressedFloatsSupplierSerializer(
int sizePer,
FlattenedArrayWriter<ResourceHolder<FloatBuffer>> flattener
GenericIndexedWriter<ResourceHolder<FloatBuffer>> flattener
)
{
this.sizePer = sizePer;

View File

@ -26,7 +26,7 @@ import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.collect.StupidResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import java.io.IOException;
@ -44,7 +44,7 @@ public class CompressedLongsSupplierSerializer
{
final CompressedLongsSupplierSerializer retVal = new CompressedLongsSupplierSerializer(
0xFFFF / Longs.BYTES,
new FlattenedArrayWriter<ResourceHolder<LongBuffer>>(
new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
ioPeon, filenameBase, CompressedLongBufferObjectStrategy.getBufferForOrder(order)
)
);
@ -52,7 +52,7 @@ public class CompressedLongsSupplierSerializer
}
private final int sizePer;
private final FlattenedArrayWriter<ResourceHolder<LongBuffer>> flattener;
private final GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener;
private int numInserted = 0;
@ -60,7 +60,7 @@ public class CompressedLongsSupplierSerializer
public CompressedLongsSupplierSerializer(
int sizePer,
FlattenedArrayWriter<ResourceHolder<LongBuffer>> flattener
GenericIndexedWriter<ResourceHolder<LongBuffer>> flattener
)
{
this.sizePer = sizePer;

View File

@ -20,6 +20,7 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.ImmutableList;
@ -36,11 +37,6 @@ import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.post.PostAggregator;
import com.metamx.druid.index.column.Column;
import com.metamx.druid.index.column.ComplexColumn;
import com.metamx.druid.index.column.DictionaryEncodedColumn;
import com.metamx.druid.index.column.GenericColumn;
import com.metamx.druid.index.column.ValueType;
import com.metamx.druid.index.v1.serde.ComplexMetricExtractor;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
@ -70,6 +66,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
public class IncrementalIndex implements Iterable<Row>
{
private static final Logger log = new Logger(IncrementalIndex.class);
private static final Joiner JOINER = Joiner.on(",");
private final long minTimestamp;
private final QueryGranularity gran;
@ -80,6 +77,8 @@ public class IncrementalIndex implements Iterable<Row>
private final ImmutableList<String> metricNames;
private final LinkedHashMap<String, Integer> dimensionOrder;
private final CopyOnWriteArrayList<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final SpatialDimensionRowFormatter spatialDimensionRowFormatter;
private final DimensionHolder dimValues;
private final ConcurrentSkipListMap<TimeAndDims, Aggregator[]> facts;
@ -88,15 +87,11 @@ public class IncrementalIndex implements Iterable<Row>
// This is modified on add() by a (hopefully) single thread.
private InputRow in;
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics
)
public IncrementalIndex(IncrementalIndexSchema incrementalIndexSchema)
{
this.minTimestamp = minTimestamp;
this.gran = gran;
this.metrics = metrics;
this.minTimestamp = incrementalIndexSchema.getMinTimestamp();
this.gran = incrementalIndexSchema.getGran();
this.metrics = incrementalIndexSchema.getMetrics();
final ImmutableList.Builder<String> metricNamesBuilder = ImmutableList.builder();
final ImmutableMap.Builder<String, Integer> metricIndexesBuilder = ImmutableMap.builder();
@ -113,11 +108,32 @@ public class IncrementalIndex implements Iterable<Row>
this.dimensionOrder = Maps.newLinkedHashMap();
this.dimensions = new CopyOnWriteArrayList<String>();
this.dimValues = new DimensionHolder();
int index = 0;
for (String dim : incrementalIndexSchema.getDimensions()) {
dimensionOrder.put(dim, index++);
dimensions.add(dim);
}
this.spatialDimensions = incrementalIndexSchema.getSpatialDimensions();
this.spatialDimensionRowFormatter = new SpatialDimensionRowFormatter(spatialDimensions);
this.dimValues = new DimensionHolder();
this.facts = new ConcurrentSkipListMap<TimeAndDims, Aggregator[]>();
}
public IncrementalIndex(
long minTimestamp,
QueryGranularity gran,
final AggregatorFactory[] metrics
)
{
this(
new IncrementalIndexSchema.Builder().withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build()
);
}
/**
* Adds a new row. The row might correspond with another row that already exists, in which case this will
* update that row instead of inserting a new one.
@ -130,6 +146,8 @@ public class IncrementalIndex implements Iterable<Row>
*/
public int add(InputRow row)
{
row = spatialDimensionRowFormatter.formatRow(row);
if (row.getTimestampFromEpoch() < minTimestamp) {
throw new IAE("Cannot add row[%s] because it is below the minTimestamp[%s]", row, new DateTime(minTimestamp));
}
@ -140,6 +158,7 @@ public class IncrementalIndex implements Iterable<Row>
List<String[]> overflow = null;
for (String dimension : rowDimensions) {
dimension = dimension.toLowerCase();
List<String> dimensionValues = row.getDimension(dimension);
final Integer index = dimensionOrder.get(dimension);
if (index == null) {
@ -149,9 +168,9 @@ public class IncrementalIndex implements Iterable<Row>
if (overflow == null) {
overflow = Lists.newArrayList();
}
overflow.add(getDimVals(dimValues.add(dimension), row.getDimension(dimension)));
overflow.add(getDimVals(dimValues.add(dimension), dimensionValues));
} else {
dims[index] = getDimVals(dimValues.get(dimension), row.getDimension(dimension));
dims[index] = getDimVals(dimValues.get(dimension), dimensionValues);
}
}
@ -227,7 +246,7 @@ public class IncrementalIndex implements Iterable<Row>
final String typeName = agg.getTypeName();
final String columnName = column.toLowerCase();
if(typeName.equals("float")) {
if (typeName.equals("float")) {
return new ObjectColumnSelector<Float>()
{
@Override
@ -332,6 +351,11 @@ public class IncrementalIndex implements Iterable<Row>
return dimensions;
}
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
public String getMetricType(String metric)
{
return metricTypes.get(metric);
@ -463,8 +487,7 @@ public class IncrementalIndex implements Iterable<Row>
if (holder == null) {
holder = new DimDim();
dimensions.put(dimension, holder);
}
else {
} else {
throw new ISE("dimension[%s] already existed even though add() was called!?", dimension);
}
return holder;

View File

@ -24,7 +24,6 @@ import com.google.common.collect.Maps;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.aggregation.Aggregator;
import com.metamx.druid.input.Row;
import com.metamx.druid.kv.EmptyIndexedInts;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedInts;
@ -63,9 +62,6 @@ public class IncrementalIndexAdapter implements IndexableAdapter
}
int rowNum = 0;
for (Row row : index) {
}
for (IncrementalIndex.TimeAndDims timeAndDims : index.getFacts().keySet()) {
final String[][] dims = timeAndDims.getDims();
@ -211,11 +207,16 @@ public class IncrementalIndexAdapter implements IndexableAdapter
metrics[i] = aggs[i].get();
}
Map<String, String> description = Maps.newHashMap();
for (SpatialDimensionSchema spatialDimensionSchema : index.getSpatialDimensions()) {
description.put(spatialDimensionSchema.getDimName(), "spatial");
}
return new Rowboat(
timeAndDims.getTimestamp(),
dims,
metrics,
count++
count++,
description
);
}
}

View File

@ -0,0 +1,128 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import java.util.Collections;
import java.util.List;
/**
*/
public class IncrementalIndexSchema
{
private final long minTimestamp;
private final QueryGranularity gran;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
private final AggregatorFactory[] metrics;
public IncrementalIndexSchema(
long minTimestamp,
QueryGranularity gran,
List<String> dimensions,
List<SpatialDimensionSchema> spatialDimensions,
AggregatorFactory[] metrics
)
{
this.minTimestamp = minTimestamp;
this.gran = gran;
this.dimensions = dimensions;
this.spatialDimensions = spatialDimensions;
this.metrics = metrics;
}
public long getMinTimestamp()
{
return minTimestamp;
}
public QueryGranularity getGran()
{
return gran;
}
public List<String> getDimensions()
{
return dimensions;
}
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
public AggregatorFactory[] getMetrics()
{
return metrics;
}
public static class Builder
{
private long minTimestamp;
private QueryGranularity gran;
private List<String> dimensions;
private List<SpatialDimensionSchema> spatialDimensions;
private AggregatorFactory[] metrics;
public Builder()
{
this.minTimestamp = 0L;
this.gran = QueryGranularity.NONE;
this.dimensions = Lists.newArrayList();
this.spatialDimensions = Lists.newArrayList();
this.metrics = new AggregatorFactory[]{};
}
public Builder withMinTimestamp(long minTimestamp)
{
this.minTimestamp = minTimestamp;
return this;
}
public Builder withQueryGranularity(QueryGranularity gran)
{
this.gran = gran;
return this;
}
public Builder withDimensions(Iterable<String> dimensions)
{
this.dimensions = Lists.newArrayList(
Iterables.transform(
dimensions, new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
)
);
Collections.sort(this.dimensions);
return this;
}
public Builder withSpatialDimensions(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
return this;
}
public Builder withMetrics(AggregatorFactory[] metrics)
{
this.metrics = metrics;
return this;
}
public IncrementalIndexSchema build()
{
return new IncrementalIndexSchema(
minTimestamp, gran, dimensions, spatialDimensions, metrics
);
}
}
}
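Together with the new IncrementalIndex(IncrementalIndexSchema) constructor above, the builder makes spatial dimensions declarative. A minimal sketch, assuming a CountAggregatorFactory is available in this tree:

import java.util.Arrays;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory; // assumed aggregator

IncrementalIndexSchema schema = new IncrementalIndexSchema.Builder()
    .withMinTimestamp(0L)
    .withQueryGranularity(QueryGranularity.NONE)
    .withSpatialDimensions(
        Arrays.asList(new SpatialDimensionSchema("coords", Arrays.asList("lat", "long")))
    )
    .withMetrics(new AggregatorFactory[]{new CountAggregatorFactory("rows")})
    .build();

IncrementalIndex index = new IncrementalIndex(schema);
// Rows carrying "lat" and "long" now also produce the synthetic "coords" dimension.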

View File

@ -19,6 +19,7 @@
package com.metamx.druid.index.v1;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.logger.Logger;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
@ -45,6 +46,7 @@ public class Index
final Map<String, Map<String, Integer>> dimIdLookup;
final Map<String, String[]> reverseDimLookup;
final Map<String, ImmutableConciseSet[]> indexes;
final Map<String, ImmutableRTree> spatialIndexes;
final Map<String, DimensionColumn> dimensionValues;
/*
@ -79,6 +81,7 @@ public class Index
Map<String, Map<String, Integer>> dimIdLookup,
Map<String, String[]> reverseDimLookup,
Map<String, ImmutableConciseSet[]> indexes,
Map<String, ImmutableRTree> spatialIndexes,
Map<String, DimensionColumn> dimensionValues
)
{
@ -90,6 +93,7 @@ public class Index
this.dimIdLookup = dimIdLookup;
this.reverseDimLookup = reverseDimLookup;
this.indexes = indexes;
this.spatialIndexes = spatialIndexes;
this.dimensionValues = dimensionValues;
for (int i = 0; i < dimensions.length; i++) {
@ -126,4 +130,37 @@ public class Index
return emptySet;
}
}
public ImmutableConciseSet getInvertedIndex(String dimension, int valueIndex)
{
try {
return indexes.get(dimension)[valueIndex];
}
catch (NullPointerException e) {
log.warn(
e,
"NPE on dimension[%s], valueIndex[%d], with index over interval[%s]",
dimension,
valueIndex,
dataInterval
);
return emptySet;
}
}
public ImmutableRTree getSpatialIndex(String dimension)
{
try {
return spatialIndexes.get(dimension);
}
catch (NullPointerException e) {
log.warn(
e,
"NPE on dimension[%s] over interval[%s]",
dimension,
dataInterval
);
return new ImmutableRTree();
}
}
}

View File

@ -31,6 +31,7 @@ import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.io.smoosh.FileSmoosher;
@ -53,11 +54,14 @@ import com.metamx.druid.index.serde.FloatGenericColumnPartSerde;
import com.metamx.druid.index.serde.FloatGenericColumnSupplier;
import com.metamx.druid.index.serde.LongGenericColumnPartSerde;
import com.metamx.druid.index.serde.LongGenericColumnSupplier;
import com.metamx.druid.index.serde.SpatialIndexColumnPartSupplier;
import com.metamx.druid.jackson.DefaultObjectMapper;
import com.metamx.druid.kv.ArrayIndexed;
import com.metamx.druid.kv.ByteBufferSerializer;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import com.metamx.druid.utils.SerializerUtils;
@ -221,7 +225,10 @@ public class IndexIO
case 6:
case 7:
log.info("Old version, re-persisting.");
IndexMerger.append(Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))), converted);
IndexMerger.append(
Arrays.<IndexableAdapter>asList(new QueryableIndexIndexableAdapter(loadIndex(toConvert))),
converted
);
return true;
case 8:
DefaultIndexIOHandler.convertV8toV9(toConvert, converted);
@ -251,6 +258,7 @@ public class IndexIO
* @return
*/
public boolean canBeMapped(File inDir) throws IOException;
public MMappedIndex mapDir(File inDir) throws IOException;
/**
@ -265,6 +273,7 @@ public class IndexIO
public static class DefaultIndexIOHandler implements IndexIOHandler
{
private static final Logger log = new Logger(DefaultIndexIOHandler.class);
@Override
public Index readIndex(File inDir)
{
@ -348,6 +357,15 @@ public class IndexIO
);
}
Map<String, ImmutableRTree> spatialIndexed = Maps.newHashMap();
ByteBuffer spatialBuffer = smooshedFiles.mapFile("spatial.drd");
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexed.put(
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(spatialBuffer, IndexedRTree.objectStrategy)
);
}
final MMappedIndex retVal = new MMappedIndex(
availableDimensions,
availableMetrics,
@ -356,7 +374,8 @@ public class IndexIO
metrics,
dimValueLookups,
dimColumns,
invertedIndexed
invertedIndexed,
spatialIndexed
);
log.debug("Mapped v8 index[%s] in %,d millis", inDir, System.currentTimeMillis() - startTime);
@ -402,6 +421,15 @@ public class IndexIO
);
}
Map<String, ImmutableRTree> spatialIndexes = Maps.newHashMap();
final ByteBuffer spatialBuffer = v8SmooshedFiles.mapFile("spatial.drd");
while (spatialBuffer != null && spatialBuffer.hasRemaining()) {
spatialIndexes.put(
serializerUtils.readString(spatialBuffer),
ByteBufferSerializer.read(spatialBuffer, IndexedRTree.objectStrategy)
);
}
final LinkedHashSet<String> skippedFiles = Sets.newLinkedHashSet();
final Set<String> skippedDimensions = Sets.newLinkedHashSet();
for (String filename : v8SmooshedFiles.getInternalFilenames()) {
@ -435,6 +463,7 @@ public class IndexIO
VSizeIndexedInts singleValCol = null;
VSizeIndexed multiValCol = VSizeIndexed.readFromByteBuffer(dimBuffer.asReadOnlyBuffer());
GenericIndexed<ImmutableConciseSet> bitmaps = bitmapIndexes.get(dimension);
ImmutableRTree spatialIndex = spatialIndexes.get(dimension);
boolean onlyOneValue = true;
ConciseSet nullsSet = null;
@ -476,8 +505,7 @@ public class IndexIO
Iterables.concat(Arrays.asList(theNullSet), bitmaps),
ConciseCompressedIndexedInts.objectStrategy
);
}
else {
} else {
bumpedDictionary = false;
bitmaps = GenericIndexed.fromIterable(
Iterables.concat(
@ -487,8 +515,7 @@ public class IndexIO
ConciseCompressedIndexedInts.objectStrategy
);
}
}
else {
} else {
bumpedDictionary = false;
}
@ -517,7 +544,13 @@ public class IndexIO
}
builder.addSerde(
new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmaps)
new DictionaryEncodedColumnPartSerde(
dictionary,
singleValCol,
multiValCol,
bitmaps,
spatialIndex
)
);
final ColumnDescriptor serdeficator = builder.build();
@ -673,8 +706,11 @@ public class IndexIO
new BitmapIndexColumnPartSupplier(
index.getInvertedIndexes().get(dimension), index.getDimValueLookup(dimension)
)
).setSpatialIndex(
new SpatialIndexColumnPartSupplier(
index.getSpatialIndexes().get(dimension)
)
.build()
).build()
);
}

View File

@ -21,6 +21,8 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
@ -32,6 +34,9 @@ import com.google.common.io.Closeables;
import com.google.common.io.Files;
import com.google.common.io.OutputSupplier;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;
import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.common.guava.FunctionalIterable;
@ -47,13 +52,15 @@ import com.metamx.druid.guava.GuavaUtils;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.ByteBufferWriter;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IOPeon;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedInts;
import com.metamx.druid.kv.IndexedIterable;
import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.TmpFileIOPeon;
import com.metamx.druid.kv.VSizeIndexedWriter;
import com.metamx.druid.utils.JodaUtils;
@ -88,6 +95,7 @@ public class IndexMerger
private static final SerializerUtils serializerUtils = new SerializerUtils();
private static final int INVALID_ROW = -1;
private static final Splitter SPLITTER = Splitter.on(",");
public static File persist(final IncrementalIndex index, File outDir) throws IOException
{
@ -446,7 +454,7 @@ public class IndexMerger
}
for (String dimension : mergedDimensions) {
final FlattenedArrayWriter<String> writer = new FlattenedArrayWriter<String>(
final GenericIndexedWriter<String> writer = new GenericIndexedWriter<String>(
ioPeon, dimension, GenericIndexed.stringStrategy
);
writer.open();
@ -566,7 +574,13 @@ public class IndexMerger
j++;
}
return new Rowboat(input.getTimestamp(), newDims, newMetrics, input.getRowNum());
return new Rowboat(
input.getTimestamp(),
newDims,
newMetrics,
input.getRowNum(),
input.getDescriptions()
);
}
}
)
@ -619,6 +633,7 @@ public class IndexMerger
rowNumConversions.add(IntBuffer.wrap(arr));
}
final Map<String, String> descriptions = Maps.newHashMap();
for (Rowboat theRow : theRows) {
progress.progress();
timeWriter.add(theRow.getTimestamp());
@ -653,6 +668,8 @@ public class IndexMerger
);
time = System.currentTimeMillis();
}
descriptions.putAll(theRow.getDescriptions());
}
for (IntBuffer rowNumConversion : rowNumConversions) {
@ -701,7 +718,7 @@ public class IndexMerger
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
log.info("Starting dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
FlattenedArrayWriter<ImmutableConciseSet> writer = new FlattenedArrayWriter<ImmutableConciseSet>(
GenericIndexedWriter<ImmutableConciseSet> writer = new GenericIndexedWriter<ImmutableConciseSet>(
ioPeon, dimension, ConciseCompressedIndexedInts.objectStrategy
);
writer.open();
@ -718,7 +735,10 @@ public class IndexMerger
}
ConciseSet bitset = new ConciseSet();
for (Integer row : CombiningIterable.createSplatted(convertedInverteds, Ordering.<Integer>natural().nullsFirst())) {
for (Integer row : CombiningIterable.createSplatted(
convertedInverteds,
Ordering.<Integer>natural().nullsFirst()
)) {
if (row != INVALID_ROW) {
bitset.add(row);
}
@ -734,12 +754,68 @@ public class IndexMerger
log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
}
/************ Create Geographical Indexes *************/
// FIXME: Rewrite when indexing is updated
Stopwatch stopwatch = new Stopwatch();
stopwatch.start();
final File geoFile = new File(v8OutDir, "spatial.drd");
Files.touch(geoFile);
out = Files.newOutputStreamSupplier(geoFile, true);
for (int i = 0; i < mergedDimensions.size(); ++i) {
String dimension = mergedDimensions.get(i);
if (!"spatial".equals(descriptions.get(dimension))) {
continue;
}
File dimOutFile = dimOuts.get(i).getFile();
final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
}
Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
ByteBufferWriter<ImmutableRTree> writer = new ByteBufferWriter<ImmutableRTree>(
ioPeon, dimension, IndexedRTree.objectStrategy
);
writer.open();
RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
int count = 0;
for (String dimVal : IndexedIterable.create(dimVals)) {
progress.progress();
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
tree.insert(coords, count);
count++;
}
writer.write(ImmutableRTree.newImmutableFromMutable(tree));
writer.close();
serializerUtils.writeString(out, dimension);
ByteStreams.copy(writer.combineStreams(), out);
ioPeon.cleanup();
log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis());
}
log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);
final ArrayList<String> expectedFiles = Lists.newArrayList(
Iterables.concat(
Arrays.asList(
"index.drd", "inverted.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
"index.drd", "inverted.drd", "spatial.drd", String.format("time_%s.drd", IndexIO.BYTE_ORDER)
),
Iterables.transform(mergedDimensions, GuavaUtils.formatFunction("dim_%s.drd")),
Iterables.transform(
@ -1005,7 +1081,13 @@ public class IndexMerger
}
}
final Rowboat retVal = new Rowboat(input.getTimestamp(), newDims, input.getMetrics(), input.getRowNum());
final Rowboat retVal = new Rowboat(
input.getTimestamp(),
newDims,
input.getMetrics(),
input.getRowNum(),
input.getDescriptions()
);
retVal.addRow(indexNumber, input.getRowNum());
@ -1084,7 +1166,8 @@ public class IndexMerger
lhs.getTimestamp(),
lhs.getDims(),
metrics,
lhs.getRowNum()
lhs.getRowNum(),
lhs.getDescriptions()
);
for (Rowboat rowboat : Arrays.asList(lhs, rhs)) {

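A standalone sketch of the geo-indexing loop above, using the same bytebuffer-collections types: each dictionary value "x,y" is split on the comma and inserted as a 2-d point whose record id is its dictionary position, then the mutable tree is frozen for serialization:

import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.collections.spatial.RTree;
import com.metamx.collections.spatial.split.LinearGutmanSplitStrategy;

RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
tree.insert(new float[]{37.7f, -122.4f}, 0); // dictionary entry 0: "37.7,-122.4"
tree.insert(new float[]{40.7f, -74.0f}, 1);  // dictionary entry 1: "40.7,-74.0"
ImmutableRTree frozen = ImmutableRTree.newImmutableFromMutable(tree);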
View File

@ -23,12 +23,14 @@ import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.logger.Logger;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedList;
import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.kv.IndexedRTree;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
@ -54,6 +56,7 @@ public class MMappedIndex
final Map<String, GenericIndexed<String>> dimValueLookups;
final Map<String, VSizeIndexed> dimColumns;
final Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes;
final Map<String, ImmutableRTree> spatialIndexes;
private final Map<String, Integer> metricIndexes = Maps.newHashMap();
@ -65,7 +68,8 @@ public class MMappedIndex
Map<String, MetricHolder> metrics,
Map<String, GenericIndexed<String>> dimValueLookups,
Map<String, VSizeIndexed> dimColumns,
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes,
Map<String, ImmutableRTree> spatialIndexes
)
{
this.availableDimensions = availableDimensions;
@ -76,6 +80,7 @@ public class MMappedIndex
this.dimValueLookups = dimValueLookups;
this.dimColumns = dimColumns;
this.invertedIndexes = invertedIndexes;
this.spatialIndexes = spatialIndexes;
for (int i = 0; i < availableMetrics.size(); i++) {
metricIndexes.put(availableMetrics.get(i), i);
@ -143,6 +148,11 @@ public class MMappedIndex
return invertedIndexes;
}
public Map<String, ImmutableRTree> getSpatialIndexes()
{
return spatialIndexes;
}
public ImmutableConciseSet getInvertedIndex(String dimension, String value)
{
final GenericIndexed<String> lookup = dimValueLookups.get(dimension);
@ -177,6 +187,8 @@ public class MMappedIndex
Map<String, VSizeIndexed> dimColumns = Maps.newHashMap();
Map<String, GenericIndexed<ImmutableConciseSet>> invertedIndexes = Maps.newLinkedHashMap();
Map<String, ImmutableRTree> spatialIndexes = Maps.newLinkedHashMap();
for (String dimension : Arrays.asList(index.dimensions)) {
final String[] dimVals = index.reverseDimLookup.get(dimension);
final DimensionColumn dimColumn = index.dimensionValues.get(dimension);
@ -247,6 +259,8 @@ public class MMappedIndex
ConciseCompressedIndexedInts.objectStrategy
)
);
spatialIndexes.put(dimension, index.getSpatialIndex(dimension));
}
log.info("Making MMappedIndex");
@ -258,7 +272,8 @@ public class MMappedIndex
index.metricVals,
dimValueLookups,
dimColumns,
invertedIndexes
invertedIndexes,
spatialIndexes
);
}
}

View File

@ -35,159 +35,163 @@ import java.util.NoSuchElementException;
/**
*/
public class MMappedIndexAdapter implements IndexableAdapter
{
private final MMappedIndex index;
private final int numRows;
public MMappedIndexAdapter(MMappedIndex index)
{
private final MMappedIndex index;
private final int numRows;
this.index = index;
public MMappedIndexAdapter(MMappedIndex index)
{
this.index = index;
numRows = index.getReadOnlyTimestamps().size();
}
@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}
@Override
public int getNumRows()
{
return numRows;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Indexed<String> getAvailableMetrics()
{
return index.getAvailableMetrics();
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension);
}
@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final MetricHolder[] metrics;
final IndexedFloats[] floatMetrics;
final Map<String, Indexed<? extends IndexedInts>> dimensions;
final int numMetrics = index.getAvailableMetrics().size();
int currRow = 0;
boolean done = false;
{
dimensions = Maps.newLinkedHashMap();
for (String dim : index.getAvailableDimensions()) {
dimensions.put(dim, index.getDimColumn(dim));
}
final Indexed<String> availableMetrics = index.getAvailableMetrics();
metrics = new MetricHolder[availableMetrics.size()];
floatMetrics = new IndexedFloats[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
metrics[i] = index.getMetricHolder(availableMetrics.get(i));
if (metrics[i].getType() == MetricHolder.MetricType.FLOAT) {
floatMetrics[i] = metrics[i].getFloatType();
}
}
}
@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric);
}
done = true;
}
return hasNext;
}
@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
IndexedInts dimVals = dimensions.get(dim).get(currRow);
int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}
dims[dimIndex++] = theVals;
}
Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
switch (metrics[i].getType()) {
case FLOAT:
metricArray[i] = floatMetrics[i].get(currRow);
break;
case COMPLEX:
metricArray[i] = metrics[i].getComplexType().get(currRow);
}
}
final Rowboat retVal = new Rowboat(timestamps.get(currRow), dims, metricArray, currRow);
++currRow;
return retVal;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}
@Override
public IndexedInts getInverteds(String dimension, String value)
{
return new ConciseCompressedIndexedInts(index.getInvertedIndex(dimension, value));
}
@Override
public String getMetricType(String metric)
{
MetricHolder holder = index.getMetricHolder(metric);
if (holder == null) {
return null;
}
return holder.getTypeName();
}
numRows = index.getReadOnlyTimestamps().size();
}
@Override
public Interval getDataInterval()
{
return index.getDataInterval();
}
@Override
public int getNumRows()
{
return numRows;
}
@Override
public Indexed<String> getAvailableDimensions()
{
return index.getAvailableDimensions();
}
@Override
public Indexed<String> getAvailableMetrics()
{
return index.getAvailableMetrics();
}
@Override
public Indexed<String> getDimValueLookup(String dimension)
{
return index.getDimValueLookup(dimension);
}
@Override
public Iterable<Rowboat> getRows()
{
return new Iterable<Rowboat>()
{
@Override
public Iterator<Rowboat> iterator()
{
return new Iterator<Rowboat>()
{
final IndexedLongs timestamps = index.getReadOnlyTimestamps();
final MetricHolder[] metrics;
final IndexedFloats[] floatMetrics;
final Map<String, Indexed<? extends IndexedInts>> dimensions;
final int numMetrics = index.getAvailableMetrics().size();
int currRow = 0;
boolean done = false;
{
dimensions = Maps.newLinkedHashMap();
for (String dim : index.getAvailableDimensions()) {
dimensions.put(dim, index.getDimColumn(dim));
}
final Indexed<String> availableMetrics = index.getAvailableMetrics();
metrics = new MetricHolder[availableMetrics.size()];
floatMetrics = new IndexedFloats[availableMetrics.size()];
for (int i = 0; i < metrics.length; ++i) {
metrics[i] = index.getMetricHolder(availableMetrics.get(i));
if (metrics[i].getType() == MetricHolder.MetricType.FLOAT) {
floatMetrics[i] = metrics[i].getFloatType();
}
}
}
@Override
public boolean hasNext()
{
final boolean hasNext = currRow < numRows;
if (!hasNext && !done) {
Closeables.closeQuietly(timestamps);
for (IndexedFloats floatMetric : floatMetrics) {
Closeables.closeQuietly(floatMetric);
}
done = true;
}
return hasNext;
}
@Override
public Rowboat next()
{
if (!hasNext()) {
throw new NoSuchElementException();
}
int[][] dims = new int[dimensions.size()][];
int dimIndex = 0;
for (String dim : dimensions.keySet()) {
IndexedInts dimVals = dimensions.get(dim).get(currRow);
int[] theVals = new int[dimVals.size()];
for (int j = 0; j < theVals.length; ++j) {
theVals[j] = dimVals.get(j);
}
dims[dimIndex++] = theVals;
}
Object[] metricArray = new Object[numMetrics];
for (int i = 0; i < metricArray.length; ++i) {
switch (metrics[i].getType()) {
case FLOAT:
metricArray[i] = floatMetrics[i].get(currRow);
break;
case COMPLEX:
metricArray[i] = metrics[i].getComplexType().get(currRow);
}
}
Map<String, String> descriptions = Maps.newHashMap();
for (String spatialDim : index.getSpatialIndexes().keySet()) {
descriptions.put(spatialDim, "spatial");
}
final Rowboat retVal = new Rowboat(timestamps.get(currRow), dims, metricArray, currRow, descriptions);
++currRow;
return retVal;
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
};
}
@Override
public IndexedInts getInverteds(String dimension, String value)
{
return new ConciseCompressedIndexedInts(index.getInvertedIndex(dimension, value));
}
@Override
public String getMetricType(String metric)
{
MetricHolder holder = index.getMetricHolder(metric);
if (holder == null) {
return null;
}
return holder.getTypeName();
}
}

View File

@ -27,7 +27,7 @@ import com.metamx.common.IAE;
import com.metamx.common.ISE;
import com.metamx.druid.index.v1.serde.ComplexMetricSerde;
import com.metamx.druid.index.v1.serde.ComplexMetrics;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedFloats;
@ -63,7 +63,7 @@ public class MetricHolder
}
public static void writeComplexMetric(
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, FlattenedArrayWriter column
OutputSupplier<? extends OutputStream> outSupplier, String name, String typeName, GenericIndexedWriter column
) throws IOException
{
OutputStream out = null;

View File

@ -50,7 +50,7 @@ import java.util.NoSuchElementException;
import java.util.Set;
/**
*/
public class QueryableIndexIndexableAdapter implements IndexableAdapter
{
private static final Logger log = new Logger(QueryableIndexIndexableAdapter.class);
@ -74,11 +74,9 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
if (col == null) {
log.warn("Wtf!? column[%s] didn't exist!?!?!?", dim);
}
else if (col.getDictionaryEncoding() != null) {
} else if (col.getDictionaryEncoding() != null) {
availableDimensions.add(dim);
}
else {
} else {
log.info("No dictionary on dimension[%s]", dim);
}
}
@ -236,8 +234,7 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
final IndexedInts dimVals;
if (dict.hasMultipleValues()) {
dimVals = dict.getMultiValueRow(currRow);
}
else {
} else {
dimVals = new ArrayBasedIndexedInts(new int[]{dict.getSingleValueRow(currRow)});
}
@ -253,14 +250,19 @@ public class QueryableIndexIndexableAdapter implements IndexableAdapter
for (int i = 0; i < metricArray.length; ++i) {
if (metrics[i] instanceof GenericColumn) {
metricArray[i] = ((GenericColumn) metrics[i]).getFloatSingleValueRow(currRow);
}
else if (metrics[i] instanceof ComplexColumn) {
} else if (metrics[i] instanceof ComplexColumn) {
metricArray[i] = ((ComplexColumn) metrics[i]).getRowValue(currRow);
}
}
Map<String, String> descriptions = Maps.newHashMap();
for (String columnName : input.getColumnNames()) {
if (input.getColumn(columnName).getSpatialIndex() != null) {
descriptions.put(columnName, "spatial");
}
}
final Rowboat retVal = new Rowboat(
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow
timestamps.getLongSingleValueRow(currRow), dims, metricArray, currRow, descriptions
);
++currRow;

View File

@ -37,17 +37,21 @@ public class Rowboat implements Comparable<Rowboat>
private final int rowNum;
private final Map<Integer, TreeSet<Integer>> comprisedRows;
private Map<String, String> columnDescriptor;
public Rowboat(
long timestamp,
int[][] dims,
Object[] metrics,
int rowNum
int rowNum,
Map<String, String> columnDescriptor
)
{
this.timestamp = timestamp;
this.dims = dims;
this.metrics = metrics;
this.rowNum = rowNum;
this.columnDescriptor = columnDescriptor;
this.comprisedRows = Maps.newHashMap();
}
@ -87,6 +91,11 @@ public class Rowboat implements Comparable<Rowboat>
return rowNum;
}
public Map<String, String> getDescriptions()
{
return columnDescriptor;
}
@Override
public int compareTo(Rowboat rhs)
{

View File

@ -0,0 +1,140 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.druid.input.InputRow;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
*/
public class SpatialDimensionRowFormatter
{
private static final Joiner JOINER = Joiner.on(",");
private final List<SpatialDimensionSchema> spatialDimensions;
private final Set<String> spatialDimNames;
public SpatialDimensionRowFormatter(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
Iterables.concat(
Lists.transform(
spatialDimensions,
new Function<SpatialDimensionSchema, List<String>>()
{
@Override
public List<String> apply(SpatialDimensionSchema input)
{
return input.getDims();
}
}
)
)
);
}
public InputRow formatRow(final InputRow row)
{
final Map<String, List<String>> finalDimLookup = Maps.newHashMap();
// remove all spatial dimensions
final List<String> finalDims = Lists.newArrayList(
Iterables.filter(
Lists.transform(
row.getDimensions(),
new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
)
,
new Predicate<String>()
{
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input);
}
}
)
);
for (String dim : finalDims) {
finalDimLookup.put(dim, row.getDimension(dim));
}
InputRow retVal = new InputRow()
{
@Override
public List<String> getDimensions()
{
return finalDims;
}
@Override
public long getTimestampFromEpoch()
{
return row.getTimestampFromEpoch();
}
@Override
public List<String> getDimension(String dimension)
{
return finalDimLookup.get(dimension);
}
@Override
public float getFloatMetric(String metric)
{
return row.getFloatMetric(metric);
}
};
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (dimVals == null || dimVals.isEmpty()) {
return retVal;
}
spatialDimVals.addAll(dimVals);
}
finalDimLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}
return retVal;
}
}
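A hedged walk-through of formatRow(), where someRow is a hypothetical InputRow carrying lat=["37.7"] and long=["-122.4"]: the constituent dimensions are filtered out of the dimension list and a single comma-joined synthetic dimension takes their place; if any constituent is missing or empty, the synthetic dimension is simply not added (the early return above).

SpatialDimensionRowFormatter formatter = new SpatialDimensionRowFormatter(
    Arrays.asList(new SpatialDimensionSchema("coords", Arrays.asList("lat", "long")))
);
InputRow formatted = formatter.formatRow(someRow); // someRow is hypothetical
// formatted.getDimensions() contains "coords" but no longer "lat"/"long"
// formatted.getDimension("coords") -> ["37.7,-122.4"]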

View File

@ -0,0 +1,40 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.util.List;
/**
*/
public class SpatialDimensionSchema
{
private final String dimName;
private final List<String> dims;
public SpatialDimensionSchema(String dimName, List<String> dims)
{
this.dimName = dimName.toLowerCase();
this.dims = Lists.transform(
dims,
new Function<String, String>()
{
@Override
public String apply(String input)
{
return input.toLowerCase();
}
}
);
}
public String getDimName()
{
return dimName;
}
public List<String> getDims()
{
return dims;
}
}
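For illustration (not part of this change), the tests later in this commit construct a schema like the following; note that the constructor lowercases both the composite name and the component names:
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import java.util.Arrays;
public class SpatialSchemaExample
{
  public static void main(String[] args)
  {
    SpatialDimensionSchema schema = new SpatialDimensionSchema(
        "dim.geo",                    // name of the composite spatial dimension
        Arrays.asList("lat", "long")  // component columns, joined in this order
    );
    System.out.println(schema.getDimName() + " <- " + schema.getDims());
  }
}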

View File

@ -22,8 +22,10 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.CSVParser;
import com.metamx.common.parsers.Parser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import java.util.List;
@ -33,11 +35,13 @@ public class CSVDataSpec implements DataSpec
{
private final List<String> columns;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonCreator
public CSVDataSpec(
@JsonProperty("columns") List<String> columns,
@JsonProperty("dimensions") List<String> dimensions
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
Preconditions.checkNotNull(columns, "columns");
@ -47,6 +51,9 @@ public class CSVDataSpec implements DataSpec
this.columns = columns;
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("columns")
@ -62,6 +69,13 @@ public class CSVDataSpec implements DataSpec
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{

View File

@ -22,6 +22,7 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.metamx.common.parsers.Parser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import java.util.List;
@ -41,5 +42,7 @@ public interface DataSpec
public List<String> getDimensions();
public List<SpatialDimensionSchema> getSpatialDimensions();
public Parser<String, Object> getParser();
}

View File

@ -22,8 +22,10 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.DelimitedParser;
import com.metamx.common.parsers.Parser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import java.util.List;
@ -34,12 +36,14 @@ public class DelimitedDataSpec implements DataSpec
private final String delimiter;
private final List<String> columns;
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
@JsonCreator
public DelimitedDataSpec(
@JsonProperty("delimiter") String delimiter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("dimensions") List<String> dimensions
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
Preconditions.checkNotNull(columns);
@ -50,6 +54,9 @@ public class DelimitedDataSpec implements DataSpec
this.delimiter = (delimiter == null) ? DelimitedParser.DEFAULT_DELIMITER : delimiter;
this.columns = columns;
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("delimiter")
@ -71,6 +78,13 @@ public class DelimitedDataSpec implements DataSpec
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{

View File

@ -20,8 +20,10 @@
package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Lists;
import com.metamx.common.parsers.JSONParser;
import com.metamx.common.parsers.Parser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import java.util.List;
@ -30,12 +32,17 @@ import java.util.List;
public class JSONDataSpec implements DataSpec
{
private final List<String> dimensions;
private final List<SpatialDimensionSchema> spatialDimensions;
public JSONDataSpec(
@JsonProperty("dimensions") List<String> dimensions
@JsonProperty("dimensions") List<String> dimensions,
@JsonProperty("spatialDimensions") List<SpatialDimensionSchema> spatialDimensions
)
{
this.dimensions = dimensions;
this.spatialDimensions = (spatialDimensions == null)
? Lists.<SpatialDimensionSchema>newArrayList()
: spatialDimensions;
}
@JsonProperty("dimensions")
@ -45,6 +52,13 @@ public class JSONDataSpec implements DataSpec
return dimensions;
}
@JsonProperty("spatialDimensions")
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return spatialDimensions;
}
@Override
public void verify(List<String> usedCols)
{

View File

@ -22,6 +22,7 @@ package com.metamx.druid.indexer.data;
import com.fasterxml.jackson.annotation.JsonValue;
import com.metamx.common.parsers.Parser;
import com.metamx.common.parsers.ToLowerCaseParser;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import java.util.List;
@ -56,6 +57,12 @@ public class ToLowercaseDataSpec implements DataSpec
return delegate.getDimensions();
}
@Override
public List<SpatialDimensionSchema> getSpatialDimensions()
{
return delegate.getSpatialDimensions();
}
@Override
public Parser<String, Object> getParser()
{

View File

@ -0,0 +1,30 @@
package com.metamx.druid.kv;
import com.google.common.primitives.Ints;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
/**
*/
public class ByteBufferSerializer
{
public static <T> T read(ByteBuffer buffer, ObjectStrategy<T> strategy)
{
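    // The leading int is the payload length; hand the strategy a read-only view limited to exactly that many bytes.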
int size = buffer.getInt();
ByteBuffer bufferToUse = buffer.asReadOnlyBuffer();
bufferToUse.limit(bufferToUse.position() + size);
buffer.position(bufferToUse.limit());
return strategy.fromByteBuffer(bufferToUse, size);
}
public static <T> void writeToChannel(T obj, ObjectStrategy<T> strategy, WritableByteChannel channel)
throws IOException
{
byte[] toWrite = strategy.toBytes(obj);
channel.write(ByteBuffer.allocate(Ints.BYTES).putInt(0, toWrite.length));
channel.write(ByteBuffer.wrap(toWrite));
}
}
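A hedged round-trip sketch (not part of this change): the ObjectStrategy below is hypothetical, but its method signatures mirror the strategy implementation in IndexedRTree elsewhere in this commit. writeToChannel emits a four-byte length prefix followed by the payload, and read consumes exactly that framing.
import com.metamx.druid.kv.ByteBufferSerializer;
import com.metamx.druid.kv.ObjectStrategy;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
public class ByteBufferSerializerExample
{
  // Hypothetical strategy, for illustration only.
  private static final ObjectStrategy<String> STRING_STRATEGY = new ObjectStrategy<String>()
  {
    @Override
    public Class<? extends String> getClazz() { return String.class; }
    @Override
    public String fromByteBuffer(ByteBuffer buffer, int numBytes)
    {
      byte[] bytes = new byte[numBytes];
      buffer.get(bytes);
      return new String(bytes);
    }
    @Override
    public byte[] toBytes(String val) { return val.getBytes(); }
    @Override
    public int compare(String o1, String o2) { return o1.compareTo(o2); }
  };
  public static void main(String[] args) throws IOException
  {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // Writes the 4-byte length, then the payload bytes.
    ByteBufferSerializer.writeToChannel("hello", STRING_STRATEGY, Channels.newChannel(baos));
    // Reads the length back and hands a size-limited view to the strategy.
    String value = ByteBufferSerializer.read(ByteBuffer.wrap(baos.toByteArray()), STRING_STRATEGY);
    System.out.println(value); // prints "hello"
  }
}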

View File

@ -0,0 +1,92 @@
package com.metamx.druid.kv;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.io.ByteStreams;
import com.google.common.io.CountingOutputStream;
import com.google.common.io.InputSupplier;
import com.google.common.primitives.Ints;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
/**
*/
public class ByteBufferWriter<T> implements Closeable
{
private final IOPeon ioPeon;
private final String filenameBase;
private final ObjectStrategy<T> strategy;
private CountingOutputStream headerOut = null;
private CountingOutputStream valueOut = null;
public ByteBufferWriter(
IOPeon ioPeon,
String filenameBase,
ObjectStrategy<T> strategy
)
{
this.ioPeon = ioPeon;
this.filenameBase = filenameBase;
this.strategy = strategy;
}
public void open() throws IOException
{
headerOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("header")));
valueOut = new CountingOutputStream(ioPeon.makeOutputStream(makeFilename("value")));
}
public void write(T objectToWrite) throws IOException
{
byte[] bytesToWrite = strategy.toBytes(objectToWrite);
headerOut.write(Ints.toByteArray(bytesToWrite.length));
valueOut.write(bytesToWrite);
}
private String makeFilename(String suffix)
{
return String.format("%s.%s", filenameBase, suffix);
}
@Override
public void close() throws IOException
{
headerOut.close();
valueOut.close();
final long numBytesWritten = headerOut.getCount() + valueOut.getCount();
Preconditions.checkState(
numBytesWritten < Integer.MAX_VALUE, "Wrote[%s] bytes, which is too many.", numBytesWritten
);
}
public InputSupplier<InputStream> combineStreams()
{
return ByteStreams.join(
Iterables.transform(
Arrays.asList("header", "value"),
new Function<String, InputSupplier<InputStream>>()
{
@Override
public InputSupplier<InputStream> apply(final String input)
{
return new InputSupplier<InputStream>()
{
@Override
public InputStream getInput() throws IOException
{
return ioPeon.makeInputStream(makeFilename(input));
}
};
}
}
)
);
}
}
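A usage sketch under stated assumptions (not part of this change): IOPeonForTesting is the in-memory helper the tests in this commit use, and IndexedRTree.objectStrategy is defined elsewhere in this commit. Each write appends a four-byte length to the "header" stream and the raw bytes to the "value" stream; combineStreams() then replays header followed by value as a single input.
ByteBufferWriter<ImmutableRTree> writer = new ByteBufferWriter<ImmutableRTree>(
    new IOPeonForTesting(),      // in-memory IOPeon from the tests in this commit
    "spatial",                   // yields "spatial.header" and "spatial.value"
    IndexedRTree.objectStrategy
);
writer.open();
writer.write(new ImmutableRTree()); // an empty tree serializes to zero payload bytes
writer.close();
InputSupplier<InputStream> combined = writer.combineStreams();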

View File

@ -36,7 +36,7 @@ import java.util.Arrays;
/**
* Streams arrays of objects out in the binary format described by GenericIndexed
*/
public class FlattenedArrayWriter<T> implements Closeable
public class GenericIndexedWriter<T> implements Closeable
{
private static final byte[] EMPTY_ARRAY = new byte[]{};
@ -51,7 +51,7 @@ public class FlattenedArrayWriter<T> implements Closeable
private CountingOutputStream valuesOut = null;
int numWritten = 0;
public FlattenedArrayWriter(
public GenericIndexedWriter(
IOPeon ioPeon,
String filenameBase,
ObjectStrategy<T> strategy

View File

@ -0,0 +1,84 @@
package com.metamx.druid.kv;
import com.google.common.collect.Ordering;
import com.metamx.collections.spatial.ImmutableRTree;
import java.nio.ByteBuffer;
/**
*/
public class IndexedRTree implements Comparable<IndexedRTree>
{
public static final ObjectStrategy<ImmutableRTree> objectStrategy =
new ImmutableRTreeObjectStrategy();
private static final Ordering<ImmutableRTree> comparator = new Ordering<ImmutableRTree>()
{
@Override
public int compare(
ImmutableRTree tree, ImmutableRTree tree1
)
{
if (tree.size() == 0 && tree1.size() == 0) {
return 0;
}
if (tree.size() == 0) {
return -1;
}
if (tree1.size() == 0) {
return 1;
}
return tree.compareTo(tree1);
}
}.nullsFirst();
private final ImmutableRTree immutableRTree;
public IndexedRTree(ImmutableRTree immutableRTree)
{
this.immutableRTree = immutableRTree;
}
@Override
public int compareTo(IndexedRTree spatialIndexedInts)
{
return immutableRTree.compareTo(spatialIndexedInts.getImmutableRTree());
}
public ImmutableRTree getImmutableRTree()
{
return immutableRTree;
}
private static class ImmutableRTreeObjectStrategy
implements ObjectStrategy<ImmutableRTree>
{
@Override
public Class<? extends ImmutableRTree> getClazz()
{
return ImmutableRTree.class;
}
@Override
public ImmutableRTree fromByteBuffer(ByteBuffer buffer, int numBytes)
{
buffer.limit(buffer.position() + numBytes);
return new ImmutableRTree(buffer.asReadOnlyBuffer());
}
@Override
public byte[] toBytes(ImmutableRTree val)
{
if (val == null || val.size() == 0) {
return new byte[]{};
}
return val.toBytes();
}
@Override
public int compare(ImmutableRTree o1, ImmutableRTree o2)
{
return comparator.compare(o1, o2);
}
}
}

View File

@ -19,26 +19,19 @@
package com.metamx.druid.index.v1;
import com.google.common.collect.Maps;
import com.google.common.io.OutputSupplier;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.IOPeon;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IndexedFloats;
import org.junit.Assert;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.FloatBuffer;
import java.util.Map;
/**
*/
@ -50,7 +43,7 @@ public class CompressedFloatsSupplierSerializerTest
final ByteOrder order = ByteOrder.nativeOrder();
CompressedFloatsSupplierSerializer serializer = new CompressedFloatsSupplierSerializer(
999,
new FlattenedArrayWriter<ResourceHolder<FloatBuffer>>(
new GenericIndexedWriter<ResourceHolder<FloatBuffer>>(
new IOPeonForTesting(),
"test",
CompressedFloatBufferObjectStrategy.getBufferForOrder(order)

View File

@ -21,7 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.io.OutputSupplier;
import com.metamx.druid.collect.ResourceHolder;
import com.metamx.druid.kv.FlattenedArrayWriter;
import com.metamx.druid.kv.GenericIndexedWriter;
import com.metamx.druid.kv.IndexedLongs;
import org.junit.Assert;
import org.junit.Test;
@ -43,7 +43,7 @@ public class CompressedLongsSupplierSerializerTest
final ByteOrder order = ByteOrder.nativeOrder();
CompressedLongsSupplierSerializer serializer = new CompressedLongsSupplierSerializer(
999,
new FlattenedArrayWriter<ResourceHolder<LongBuffer>>(
new GenericIndexedWriter<ResourceHolder<LongBuffer>>(
new IOPeonForTesting(),
"test",
CompressedLongBufferObjectStrategy.getBufferForOrder(order)

View File

@ -209,7 +209,7 @@ public class HadoopDruidIndexerConfig
)
{
this.dataSource = dataSource;
this.timestampColumnName = timestampColumnName;
this.timestampColumnName = (timestampColumnName == null) ? null : timestampColumnName.toLowerCase();
this.timestampFormat = timestampFormat;
this.dataSpec = dataSpec;
this.granularitySpec = granularitySpec;
@ -294,7 +294,7 @@ public class HadoopDruidIndexerConfig
public void setTimestampColumnName(String timestampColumnName)
{
this.timestampColumnName = timestampColumnName;
this.timestampColumnName = timestampColumnName.toLowerCase();
}
@JsonProperty()

View File

@ -35,6 +35,7 @@ import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.client.DataSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexSchema;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.indexer.data.StringInputRowParser;
@ -152,7 +153,8 @@ public class IndexGeneratorJob implements Jobby
}
}
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config) {
public static List<DataSegment> getPublishedSegments(HadoopDruidIndexerConfig config)
{
final Configuration conf = new Configuration();
final ObjectMapper jsonMapper = HadoopDruidIndexerConfig.jsonMapper;
@ -182,7 +184,7 @@ public class IndexGeneratorJob implements Jobby
List<DataSegment> publishedSegments = publishedSegmentsBuilder.build();
return publishedSegments;
}
}
public static class IndexGeneratorMapper extends HadoopDruidIndexerMapper<BytesWritable, Text>
@ -197,7 +199,7 @@ public class IndexGeneratorJob implements Jobby
// Group by bucket, sort by timestamp
final Optional<Bucket> bucket = getConfig().getBucket(inputRow);
if(!bucket.isPresent()) {
if (!bucket.isPresent()) {
throw new ISE("WTF?! No bucket found for row: %s", inputRow);
}
@ -590,9 +592,12 @@ public class IndexGeneratorJob implements Jobby
private IncrementalIndex makeIncrementalIndex(Bucket theBucket, AggregatorFactory[] aggs)
{
return new IncrementalIndex(
theBucket.time.getMillis(),
config.getRollupSpec().getRollupGranularity(),
aggs
new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withSpatialDimensions(config.getDataSpec().getSpatialDimensions())
.withQueryGranularity(config.getRollupSpec().getRollupGranularity())
.withMetrics(aggs)
.build()
);
}

View File

@ -317,7 +317,7 @@ public class TaskSerdeTest
"foo",
"timestamp",
"auto",
new JSONDataSpec(ImmutableList.of("foo")),
new JSONDataSpec(ImmutableList.of("foo"), null),
null,
new UniformGranularitySpec(Granularity.DAY, ImmutableList.of(new Interval("2010-01-01/P1D"))),
new StaticPathSpec("bar"),

View File

@ -72,6 +72,11 @@
<artifactId>java-util</artifactId>
<version>${metamx.java-util.version}</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>bytebuffer-collections</artifactId>
<version>0.0.1</version>
</dependency>
<dependency>
<groupId>com.metamx</groupId>
<artifactId>server-metrics</artifactId>

View File

@ -44,6 +44,8 @@ public abstract class BaseStorageAdapter implements StorageAdapter
public abstract ImmutableConciseSet getInvertedIndex(String dimension, String dimVal);
public abstract ImmutableConciseSet getInvertedIndex(String dimension, int idx);
public abstract Offset getFilterOffset(Filter filter);
@Override

View File

@ -180,15 +180,15 @@ public class MasterMain
);
Initialization.announceDefaultService(serviceDiscoveryConfig, serviceAnnouncer, lifecycle);
IndexingServiceClient indexingServiceClient = null;
ServiceProvider serviceProvider = null;
if (druidMasterConfig.getMergerServiceName() != null) {
ServiceProvider serviceProvider = Initialization.makeServiceProvider(
serviceProvider = Initialization.makeServiceProvider(
druidMasterConfig.getMergerServiceName(),
serviceDiscovery,
lifecycle
);
indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider);
}
IndexingServiceClient indexingServiceClient = new IndexingServiceClient(httpClient, jsonMapper, serviceProvider);
final ConfigManagerConfig configManagerConfig = configFactory.build(ConfigManagerConfig.class);
DbConnector.createConfigTable(dbi, configManagerConfig.getConfigTable());

View File

@ -19,6 +19,7 @@
package com.metamx.druid.index.brita;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.druid.kv.Indexed;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
@ -29,4 +30,6 @@ public interface BitmapIndexSelector
public Indexed<String> getDimensionValues(String dimension);
public int getNumRows();
public ImmutableConciseSet getConciseInvertedIndex(String dimension, String value);
public ImmutableConciseSet getConciseInvertedIndex(String dimension, int idx);
public ImmutableRTree getSpatialIndex(String dimension);
}

View File

@ -30,6 +30,7 @@ import com.metamx.druid.query.filter.OrDimFilter;
import com.metamx.druid.query.filter.RegexDimFilter;
import com.metamx.druid.query.filter.SearchQueryDimFilter;
import com.metamx.druid.query.filter.SelectorDimFilter;
import com.metamx.druid.query.filter.SpatialDimFilter;
import javax.annotation.Nullable;
import java.util.List;
@ -89,6 +90,10 @@ public class Filters
final JavaScriptDimFilter javaScriptDimFilter = (JavaScriptDimFilter) dimFilter;
filter = new JavaScriptFilter(javaScriptDimFilter.getDimension(), javaScriptDimFilter.getFunction());
} else if (dimFilter instanceof SpatialDimFilter) {
final SpatialDimFilter spatialDimFilter = (SpatialDimFilter) dimFilter;
filter = new SpatialFilter(spatialDimFilter.getDimension(), spatialDimFilter.getBound());
}
return filter;

View File

@ -0,0 +1,98 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.brita;
import com.metamx.collections.spatial.search.Bound;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import it.uniroma3.mat.extendedset.intset.IntSet;
import java.util.Iterator;
/**
*/
public class SpatialFilter implements Filter
{
private final String dimension;
private final Bound bound;
public SpatialFilter(
String dimension,
Bound bound
)
{
this.dimension = dimension;
this.bound = bound;
}
@Override
public ImmutableConciseSet goConcise(final BitmapIndexSelector selector)
{
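    // Use the R-tree to find the ids of dimension values inside the bound, then lazily union their inverted row indexes.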
final Iterator<ImmutableConciseSet> dimValueIndexesIter = selector.getSpatialIndex(dimension).search(bound)
.iterator();
ImmutableConciseSet retVal = ImmutableConciseSet.union(
new Iterable<ImmutableConciseSet>()
{
@Override
public Iterator<ImmutableConciseSet> iterator()
{
return new Iterator<ImmutableConciseSet>()
{
private IntSet.IntIterator iter;
@Override
public boolean hasNext()
{
// Guard against a null iter: the spatial search may return no matching value ids at all.
return dimValueIndexesIter.hasNext() || (iter != null && iter.hasNext());
}
@Override
public ImmutableConciseSet next()
{
if (iter != null && !iter.hasNext()) {
iter = null;
}
if (iter == null) {
ImmutableConciseSet immutableConciseSet = dimValueIndexesIter.next();
iter = immutableConciseSet.iterator();
}
return selector.getConciseInvertedIndex(dimension, iter.next());
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
};
}
}
);
return retVal;
}
@Override
public ValueMatcher makeMatcher(ValueMatcherFactory factory)
{
return factory.makeValueMatcher(
dimension,
bound
);
}
}
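For reference (not part of this change), the tests later in this commit drive this filter through a timeseries query with a bound like:
SpatialDimFilter filter = new SpatialDimFilter(
    "dim.geo",
    new RadiusBound(new float[]{0.0f, 0.0f}, 5) // points within radius 5 of the origin
);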

View File

@ -20,6 +20,7 @@
package com.metamx.druid.index.brita;
import com.google.common.base.Predicate;
import com.metamx.collections.spatial.search.Bound;
/**
*/
@ -27,4 +28,5 @@ public interface ValueMatcherFactory
{
public ValueMatcher makeValueMatcher(String dimension, String value);
public ValueMatcher makeValueMatcher(String dimension, Predicate<String> value);
public ValueMatcher makeValueMatcher(String dimension, Bound bound);
}

View File

@ -21,10 +21,12 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.collections.spatial.search.Bound;
import com.metamx.common.IAE;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator;
@ -62,6 +64,8 @@ import java.util.concurrent.ConcurrentNavigableMap;
*/
public class IncrementalIndexStorageAdapter implements StorageAdapter
{
private static final Splitter SPLITTER = Splitter.on(",");
private final IncrementalIndex index;
public IncrementalIndexStorageAdapter(
@ -115,11 +119,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public Iterable<Cursor> makeCursors(final Filter filter, final Interval interval, final QueryGranularity gran)
{
Interval actualIntervalTmp = interval;
Interval dataInterval = getInterval();
if (!actualIntervalTmp.overlaps(dataInterval)) {
final Interval indexInterval = getInterval();
if (!actualIntervalTmp.overlaps(indexInterval)) {
return ImmutableList.of();
}
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (actualIntervalTmp.getStart().isBefore(dataInterval.getStart())) {
actualIntervalTmp = actualIntervalTmp.withStart(dataInterval.getStart());
}
@ -367,7 +374,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final String columnName = column.toLowerCase();
final Integer metricIndexInt = index.getMetricIndex(columnName);
if(metricIndexInt != null) {
if (metricIndexInt != null) {
final int metricIndex = metricIndexInt;
final ComplexMetricSerde serde = ComplexMetrics.getSerdeForType(index.getMetricType(columnName));
@ -390,7 +397,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
final Integer dimensionIndexInt = index.getDimensionIndex(columnName);
if(dimensionIndexInt != null) {
if (dimensionIndexInt != null) {
final int dimensionIndex = dimensionIndexInt;
return new ObjectColumnSelector<String>()
{
@ -404,10 +411,14 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public String get()
{
final String[] dimVals = currEntry.getKey().getDims()[dimensionIndex];
if(dimVals.length == 1) return dimVals[0];
if(dimVals.length == 0) return null;
if (dimVals.length == 1) {
return dimVals[0];
}
if (dimVals.length == 0) {
return null;
}
throw new UnsupportedOperationException(
"makeObjectColumnSelector does not support multivalued columns"
"makeObjectColumnSelector does not support multivalued columns"
);
}
};
@ -505,6 +516,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
}
return new FunctionalIterable<SearchHit>(retVal).limit(query.getLimit());
}
@ -527,6 +539,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public void set(Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> currEntry)
{
this.currEntry = currEntry;
}
public IncrementalIndex.TimeAndDims getKey()
@ -613,7 +626,44 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return false;
}
};
}
@Override
public ValueMatcher makeValueMatcher(final String dimension, final Bound bound)
{
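    // Spatial dimensions follow the ".geo" naming convention (for example "dim.geo" in the tests);
    // other dimensions can never match a spatial bound.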
if (!dimension.endsWith(".geo")) {
return new BooleanValueMatcher(false);
}
Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase());
if (dimIndexObject == null) {
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : dims[dimIndex]) {
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
if (bound.contains(coords)) {
return true;
}
}
return false;
}
};
}
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
import com.google.common.io.Closeables;
import com.metamx.collections.spatial.ImmutableRTree;
import com.metamx.common.collect.MoreIterators;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator;
@ -136,11 +137,14 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
public Iterable<Cursor> makeCursors(Filter filter, Interval interval, QueryGranularity gran)
{
Interval actualInterval = interval;
final Interval dataInterval = getInterval();
if (!actualInterval.overlaps(dataInterval)) {
final Interval indexInterval = getInterval();
if (!actualInterval.overlaps(indexInterval)) {
return ImmutableList.of();
}
final Interval dataInterval = new Interval(getMinTime().getMillis(), gran.next(getMaxTime().getMillis()));
if (actualInterval.getStart().isBefore(dataInterval.getStart())) {
actualInterval = actualInterval.withStart(dataInterval.getStart());
}
@ -224,6 +228,30 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
return column.getBitmapIndex().getConciseSet(dimVal);
}
@Override
public ImmutableConciseSet getInvertedIndex(String dimension, int idx)
{
final Column column = index.getColumn(dimension.toLowerCase());
if (column == null) {
return new ImmutableConciseSet();
}
if (!column.getCapabilities().hasBitmapIndexes()) {
return new ImmutableConciseSet();
}
return column.getBitmapIndex().getConciseSet(idx);
}
public ImmutableRTree getRTreeSpatialIndex(String dimension)
{
final Column column = index.getColumn(dimension.toLowerCase());
if (column == null || !column.getCapabilities().hasSpatialIndexes()) {
return new ImmutableRTree();
}
return column.getSpatialIndex().getRTree();
}
@Override
public Offset getFilterOffset(Filter filter)
{
@ -349,8 +377,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
return column.lookupId(name);
}
};
}
else {
} else {
return new DimensionSelector()
{
@Override
@ -409,8 +436,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && holder.getCapabilities().getType() == ValueType.FLOAT) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
}
@ -788,8 +815,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
return column.lookupId(name);
}
};
}
else {
} else {
return new DimensionSelector()
{
@Override
@ -848,8 +874,8 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
if (cachedMetricVals == null) {
Column holder = index.getColumn(metricName);
if (holder != null && holder.getCapabilities().getType() == ValueType.FLOAT) {
cachedMetricVals = holder.getGenericColumn();
genericColumnCache.put(metricName, cachedMetricVals);
}
}
@ -1133,5 +1159,17 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
{
return getInvertedIndex(dimension, value);
}
@Override
public ImmutableConciseSet getConciseInvertedIndex(String dimension, int idx)
{
return getInvertedIndex(dimension, idx);
}
@Override
public ImmutableRTree getSpatialIndex(String dimension)
{
return getRTreeSpatialIndex(dimension);
}
}
}

View File

@ -21,6 +21,7 @@ package com.metamx.druid.query;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Longs;
import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.logger.Logger;
import com.metamx.druid.QueryGranularity;

View File

@ -0,0 +1,511 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.brita;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import com.metamx.druid.Druids;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.index.IncrementalIndexSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexSchema;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.filter.SpatialDimFilter;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
/**
*/
@RunWith(Parameterized.class)
public class SpatialFilterTest
{
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final IncrementalIndex rtIndex = makeIncrementalIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex();
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex();
return Arrays.asList(
new Object[][]{
{
new IncrementalIndexSegment(rtIndex)
},
{
new QueryableIndexSegment(null, mMappedTestIndex)
},
{
new QueryableIndexSegment(null, mergedRealtimeIndex)
}
}
);
}
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"lat", 0.0f,
"long", 0.0f,
"val", 17l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"lat", 1.0f,
"long", 3.0f,
"val", 29l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"lat", 4.0f,
"long", 2.0f,
"val", 13l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"lat", 7.0f,
"long", 3.0f,
"val", 91l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"lat", 8.0f,
"long", 6.0f,
"val", 47l
)
)
);
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"lat", (float) (rand.nextFloat() * 10 + 10.0),
"long", (float) (rand.nextFloat() * 10 + 10.0),
"val", i
)
)
);
}
return theIndex;
}
private static QueryableIndex makeQueryableIndex() throws IOException
{
IncrementalIndex theIndex = makeIncrementalIndex();
File tmpFile = File.createTempFile("billy", "yay");
tmpFile.delete();
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
IncrementalIndex second = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
IncrementalIndex third = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
)
)
).build()
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"lat", 0.0f,
"long", 0.0f,
"val", 17l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"lat", 1.0f,
"long", 3.0f,
"val", 29l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"lat", 4.0f,
"long", 2.0f,
"val", 13l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"lat", 7.0f,
"long", 3.0f,
"val", 91l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"lat", 8.0f,
"long", 6.0f,
"val", 47l
)
)
);
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"lat", (float) (rand.nextFloat() * 10 + 10.0),
"long", (float) (rand.nextFloat() * 10 + 10.0),
"val", i
)
)
);
}
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File firstFile = new File(tmpFile, "first");
File secondFile = new File(tmpFile, "second");
File thirdFile = new File(tmpFile, "third");
File mergedFile = new File(tmpFile, "merged");
firstFile.mkdirs();
firstFile.deleteOnExit();
secondFile.mkdirs();
secondFile.deleteOnExit();
thirdFile.mkdirs();
thirdFile.deleteOnExit();
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
)
);
return mergedRealtime;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
private final Segment segment;
public SpatialFilterTest(Segment segment)
{
this.segment = segment;
}
@Test
public void testSpatialQuery()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.ALL)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"dim.geo",
new RadiusBound(new float[]{0.0f, 0.0f}, 5)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 3L)
.put("val", 59l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Test
public void testSpatialQueryMorePoints()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.DAY)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"dim.geo",
new RectangularBound(new float[]{0.0f, 0.0f}, new float[]{9.0f, 9.0f})
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 17l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-02T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 29l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-03T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 13l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-04T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 91l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-05T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 47l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -162,7 +162,7 @@ public class TestIndex
{
StringInputRowParser parser = new StringInputRowParser(
new TimestampSpec("ts", "iso"),
new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS)),
new DelimitedDataSpec("\t", Arrays.asList(COLUMNS), Arrays.asList(DIMENSIONS), null),
Arrays.<String>asList()
);
boolean runOnce = false;