semi working spatial

fjy 2013-04-19 16:56:56 -07:00
parent 71269d7e88
commit 6c4e844f97
9 changed files with 221 additions and 97 deletions

View File

@@ -24,7 +24,5 @@ import com.metamx.common.spatial.rtree.ImmutableRTree;
  */
 public interface SpatialIndex
 {
-  public int getCardinality();
-  public String getValue(int index);
-  public ImmutableRTree getRTree(String value);
+  public ImmutableRTree getRTree();
 }
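The removal of getCardinality(), getValue(int), and getRTree(String) means callers no longer look up one R-Tree per dictionary value; each spatial dimension now carries exactly one tree. A minimal consumer sketch of the new call path (the SpatialIndex import is omitted since its package isn't shown in this diff, and search(bound) returning the matching bitmaps is an assumption inferred from the RadiusBound usage in SpatialFilterTest below):

import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.common.spatial.rtree.search.RadiusBound;

public class SpatialIndexUsage
{
  // Hypothetical consumer: fetch the dimension's single R-Tree and search it.
  public static void searchExample(SpatialIndex spatialIndex)
  {
    ImmutableRTree rTree = spatialIndex.getRTree();
    // search(...) is assumed to return bitmaps of rows inside the bound.
    rTree.search(new RadiusBound(new float[]{0.0f, 0.0f}, 5));
  }
}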

View File

@@ -149,7 +149,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde
       );
       builder.setBitmapIndex(new BitmapIndexColumnPartSupplier(bitmaps, dictionary));
-      builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex, dictionary));
+      builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex));

       return new DictionaryEncodedColumnPartSerde(
           dictionary,

View File

@@ -30,14 +30,12 @@ public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
   private static final ImmutableRTree EMPTY_SET = new ImmutableRTree();

   private final GenericIndexed<ImmutableRTree> indexedTree;
-  private final GenericIndexed<String> dictionary;

   public SpatialIndexColumnPartSupplier(
-      GenericIndexed<ImmutableRTree> indexedTree,
-      GenericIndexed<String> dictionary
-  ) {
+      GenericIndexed<ImmutableRTree> indexedTree
+  )
+  {
     this.indexedTree = indexedTree;
-    this.dictionary = dictionary;
   }

   @Override
@@ -46,28 +44,10 @@ public class SpatialIndexColumnPartSupplier implements Supplier<SpatialIndex>
     return new SpatialIndex()
     {
       @Override
-      public int getCardinality()
-      {
-        return dictionary.size();
-      }
-
-      @Override
-      public String getValue(int index)
-      {
-        return dictionary.get(index);
-      }
-
-      @Override
-      public ImmutableRTree getRTree(String value)
-      {
-        final int index = dictionary.indexOf(value);
-        if (index < 0) {
-          return EMPTY_SET;
-        }
-
-        final ImmutableRTree rTree = indexedTree.get(index);
-        return rTree == null ? EMPTY_SET : rTree;
+      public ImmutableRTree getRTree()
+      {
+        // There is only ever 1 RTree per dimension
+        return indexedTree.get(0);
       }
     };
   }
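Worth noting: the removed getRTree(String) fell back to EMPTY_SET for missing entries, while the new getRTree() returns indexedTree.get(0) unguarded, leaving EMPTY_SET unused. A sketch of the accessor with the old null-guard carried over (an assumed hardening, not what this commit ships):

@Override
public ImmutableRTree getRTree()
{
  // There is only ever 1 RTree per dimension; fall back to the empty tree
  // if the serialized entry is null, as the removed per-value path did.
  final ImmutableRTree rTree = indexedTree.get(0);
  return rTree == null ? EMPTY_SET : rTree;
}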

View File

@@ -138,7 +138,9 @@ public class IncrementalIndex implements Iterable<Row>
       dimension = dimension.toLowerCase();
       List<String> dimensionValues = row.getDimension(dimension);

-      // FIXME: HACK!!! Rewrite this when this file is rewritten
+      // FIXME: Must be a better way to do this
+      // Join all coordinate dimension values into a single string for bitmap indexing
+      // Split this string for spatial indexing
       if (dimension.endsWith(".geo")) {
         dimensionValues = Arrays.asList(JOINER.join(dimensionValues));
       }
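This join is the pivot of the commit: all coordinate values of a ".geo" dimension collapse into one string, so each distinct point becomes a single dictionary entry, which IndexMerger later splits back into floats for the R-Tree. A self-contained sketch of that round trip, assuming the JOINER/SPLITTER pair is comma-delimited (the delimiter is not visible in this diff):

import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Lists;

import java.util.Arrays;
import java.util.List;

public class GeoDimRoundTrip
{
  public static void main(String[] args)
  {
    // Incoming row: a ".geo" dimension with one value per coordinate.
    List<String> dimensionValues = Arrays.asList("1.0", "3.0");

    // IncrementalIndex side: collapse to a single dictionary value.
    String joined = Joiner.on(",").join(dimensionValues);   // "1.0,3.0"

    // IndexMerger side: split back into numeric coordinates.
    List<String> stringCoords = Lists.newArrayList(Splitter.on(",").split(joined));
    float[] coords = new float[stringCoords.size()];
    for (int j = 0; j < coords.length; j++) {
      coords[j] = Float.valueOf(stringCoords.get(j));
    }
    System.out.println(Arrays.toString(coords));            // [1.0, 3.0]
  }
}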

View File

@@ -543,7 +543,13 @@ public class IndexIO
       }
       builder.addSerde(
-          new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmaps, spatialIndex)
+          new DictionaryEncodedColumnPartSerde(
+              dictionary,
+              singleValCol,
+              multiValCol,
+              bitmaps,
+              spatialIndex
+          )
       );

       final ColumnDescriptor serdeficator = builder.build();
@@ -701,7 +707,7 @@ public class IndexIO
                 )
             ).setSpatialIndex(
                 new SpatialIndexColumnPartSupplier(
-                    index.getSpatialIndexes().get(dimension), index.getDimValueLookup(dimension)
+                    index.getSpatialIndexes().get(dimension)
                 )
             ).build()
     );

View File

@@ -689,59 +689,6 @@ public class IndexMerger
         System.currentTimeMillis() - startTime
     );

-    /************ Create Geographical Indexes *************/
-    // FIXME: Rewrite when indexing is updated
-    Stopwatch stopwatch = new Stopwatch();
-    stopwatch.start();
-
-    final File geoFile = new File(v8OutDir, "spatial.drd");
-    Files.touch(geoFile);
-    out = Files.newOutputStreamSupplier(geoFile, true);
-
-    for (int i = 0; i < mergedDimensions.size(); ++i) {
-      String dimension = mergedDimensions.get(i);
-      File dimOutFile = dimOuts.get(i).getFile();
-      final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
-
-      if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
-        throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
-      }
-      Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
-      log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
-
-      FlattenedArrayWriter<ImmutableRTree> writer = new FlattenedArrayWriter<ImmutableRTree>(
-          ioPeon, dimension, IndexedRTree.objectStrategy
-      );
-      writer.open();
-
-      RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
-      if (dimension.endsWith(".geo")) {
-        int count = 0;
-        for (String dimVal : IndexedIterable.create(dimVals)) {
-          progress.progress();
-          List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
-          float[] coords = new float[stringCoords.size()];
-          for (int j = 0; j < coords.length; j++) {
-            coords[j] = Float.valueOf(stringCoords.get(j));
-          }
-          tree.insert(coords, count);
-        }
-      }
-      writer.write(ImmutableRTree.newImmutableFromMutable(tree));
-      writer.close();
-
-      serializerUtils.writeString(out, dimension);
-      ByteStreams.copy(writer.combineStreams(), out);
-      ioPeon.cleanup();
-
-      log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis());
-    }
-
     /************ Create Inverted Indexes *************/
     startTime = System.currentTimeMillis();
@@ -798,6 +745,58 @@ public class IndexMerger
       log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime);
     }

+    /************ Create Geographical Indexes *************/
+    // FIXME: Rewrite when indexing is updated
+    Stopwatch stopwatch = new Stopwatch();
+    stopwatch.start();
+
+    final File geoFile = new File(v8OutDir, "spatial.drd");
+    Files.touch(geoFile);
+    out = Files.newOutputStreamSupplier(geoFile, true);
+
+    for (int i = 0; i < mergedDimensions.size(); ++i) {
+      String dimension = mergedDimensions.get(i);
+      File dimOutFile = dimOuts.get(i).getFile();
+      final MappedByteBuffer dimValsMapped = Files.map(dimOutFile);
+
+      if (!dimension.equals(serializerUtils.readString(dimValsMapped))) {
+        throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension);
+      }
+      Indexed<String> dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy);
+      log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size());
+
+      FlattenedArrayWriter<ImmutableRTree> writer = new FlattenedArrayWriter<ImmutableRTree>(
+          ioPeon, dimension, IndexedRTree.objectStrategy
+      );
+      writer.open();
+
+      RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
+      if (dimension.endsWith(".geo")) {
+        int count = 0;
+        for (String dimVal : IndexedIterable.create(dimVals)) {
+          progress.progress();
+          List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
+          float[] coords = new float[stringCoords.size()];
+          for (int j = 0; j < coords.length; j++) {
+            coords[j] = Float.valueOf(stringCoords.get(j));
+          }
+          tree.insert(coords, count);
+          count++;
+        }
+      }
+      writer.write(ImmutableRTree.newImmutableFromMutable(tree));
+      writer.close();
+
+      serializerUtils.writeString(out, dimension);
+      ByteStreams.copy(writer.combineStreams(), out);
+      ioPeon.cleanup();
+
+      log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis());
+    }
+
     log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime);

     final ArrayList<String> expectedFiles = Lists.newArrayList(
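Beyond relocating the geo pass after the inverted-index loop, the re-added block fixes a bug: the removed version never incremented count, so every point was inserted into the R-Tree with id 0. A compact sketch of the per-dimension construction using only the calls visible above (the import paths for RTree and LinearGutmanSplitStrategy are assumed to sit alongside the ImmutableRTree package):

import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.common.spatial.rtree.RTree;
import com.metamx.common.spatial.rtree.split.LinearGutmanSplitStrategy;

public class GeoIndexSketch
{
  public static ImmutableRTree build()
  {
    // 2-d tree with the same split strategy and node bounds as above.
    RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50));
    tree.insert(new float[]{0.0f, 0.0f}, 0); // dictionary id 0
    tree.insert(new float[]{1.0f, 3.0f}, 1); // id 1 - this is why count++ matters
    return ImmutableRTree.newImmutableFromMutable(tree);
  }
}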

View File

@@ -19,25 +19,15 @@
package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.logger.Logger;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.druid.kv.ConciseCompressedIndexedInts;
import com.metamx.druid.kv.GenericIndexed;
import com.metamx.druid.kv.Indexed;
import com.metamx.druid.kv.IndexedList;
import com.metamx.druid.kv.IndexedLongs;
import com.metamx.druid.kv.VSizeIndexed;
import com.metamx.druid.kv.VSizeIndexedInts;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
import org.joda.time.Interval;
import java.nio.ByteOrder;
import java.nio.LongBuffer;
import java.util.Arrays;
import java.util.Map;
/**

View File

@@ -244,7 +244,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter
       return new ImmutableRTree();
     }

-    return column.getSpatialIndex().getRTree(dimension);
+    return column.getSpatialIndex().getRTree();
   }

   @Override

View File

@@ -0,0 +1,149 @@
package com.metamx.druid.index.brita;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.common.spatial.rtree.search.RadiusBound;
import com.metamx.druid.Druids;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.filter.SpatialDimFilter;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/**
 */
public class SpatialFilterTest
{
  private QueryableIndex makeIndex() throws IOException
  {
    final List<String> dims = Lists.newArrayList("dim", "dim.geo");

    IncrementalIndex theIndex = new IncrementalIndex(
        new DateTime("2013-01-01").getMillis(),
        QueryGranularity.DAY,
        new AggregatorFactory[]{
            new CountAggregatorFactory("rows"),
            new LongSumAggregatorFactory("val", "val")
        }
    );

    theIndex.add(
        new MapBasedInputRow(
            new DateTime("2013-01-01").getMillis(),
            dims,
            ImmutableMap.<String, Object>of(
                "timestamp", new DateTime("2013-01-01").toString(),
                "dim", "foo",
                "dim.geo", Arrays.asList(0.0f, 0.0f),
                "val", 17L
            )
        )
    );
    theIndex.add(
        new MapBasedInputRow(
            new DateTime("2013-01-02").getMillis(),
            dims,
            ImmutableMap.<String, Object>of(
                "timestamp", new DateTime("2013-01-02").toString(),
                "dim", "foo",
                "dim.geo", Arrays.asList(1.0f, 3.0f),
                "val", 29L
            )
        )
    );
    theIndex.add(
        new MapBasedInputRow(
            new DateTime("2013-01-03").getMillis(),
            dims,
            ImmutableMap.<String, Object>of(
                "timestamp", new DateTime("2013-01-03").toString(),
                "dim", "foo",
                "dim.geo", Arrays.asList(4.0f, 2.0f),
                "val", 13L
            )
        )
    );
    theIndex.add(
        new MapBasedInputRow(
            new DateTime("2013-01-03").getMillis(),
            dims,
            ImmutableMap.<String, Object>of(
                "timestamp", new DateTime("2013-01-03").toString(),
                "dim", "foo",
                "dim.geo", Arrays.asList(7.0f, 3.0f),
                "val", 91L
            )
        )
    );

    File tmpFile = File.createTempFile("billy", "yay");
    tmpFile.delete();
    tmpFile.mkdirs();
    tmpFile.deleteOnExit();

    IndexMerger.persist(theIndex, tmpFile);
    return IndexIO.loadIndex(tmpFile);
  }

  @Test
  public void testSpatialQuery()
  {
    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
                                  .dataSource("test")
                                  .granularity(QueryGranularity.ALL)
                                  .intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
                                  .filters(new SpatialDimFilter("dim.geo", new RadiusBound(new float[]{0.0f, 0.0f}, 5)))
                                  .aggregators(
                                      Arrays.<AggregatorFactory>asList(
                                          new CountAggregatorFactory("rows"),
                                          new LongSumAggregatorFactory("val", "val")
                                      )
                                  )
                                  .build();

    List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
        new Result<TimeseriesResultValue>(
            new DateTime("2013-01-01T00:00:00.000Z"),
            new TimeseriesResultValue(
                ImmutableMap.<String, Object>builder()
                            .put("rows", 3L)
                            .put("val", 59L)
                            .build()
            )
        )
    );

    try {
      TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
      QueryRunner runner = new FinalizeResultsQueryRunner(
          factory.createRunner(new QueryableIndexSegment(null, makeIndex())),
          factory.getToolchest()
      );

      TestHelper.assertExpectedResults(expectedResults, runner.run(query));
    }
    catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }
}
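A quick check of the expected numbers: the filter keeps rows whose point lies within radius 5 of the origin. The four points sit at distances 0, √10 ≈ 3.16, √20 ≈ 4.47, and √58 ≈ 7.62, so only the first three rows match: rows = 3 and val = 17 + 29 + 13 = 59. With granularity ALL, everything lands in a single bucket stamped 2013-01-01, matching expectedResults.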