From 6c4e844f97bf2ab753a7ed0d3d4d93ac39e2457e Mon Sep 17 00:00:00 2001 From: fjy Date: Fri, 19 Apr 2013 16:56:56 -0700 Subject: [PATCH] semi working spatial --- .../druid/index/column/SpatialIndex.java | 4 +- .../DictionaryEncodedColumnPartSerde.java | 2 +- .../serde/SpatialIndexColumnPartSupplier.java | 32 +--- .../druid/index/v1/IncrementalIndex.java | 4 +- .../com/metamx/druid/index/v1/IndexIO.java | 10 +- .../metamx/druid/index/v1/IndexMerger.java | 105 ++++++------ .../metamx/druid/index/v1/MMappedIndex.java | 10 -- .../v1/QueryableIndexStorageAdapter.java | 2 +- .../druid/index/brita/SpatialFilterTest.java | 149 ++++++++++++++++++ 9 files changed, 221 insertions(+), 97 deletions(-) create mode 100644 server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java diff --git a/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java b/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java index 800ab293234..20a6ff5b9ac 100644 --- a/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/column/SpatialIndex.java @@ -24,7 +24,5 @@ import com.metamx.common.spatial.rtree.ImmutableRTree; */ public interface SpatialIndex { - public int getCardinality(); - public String getValue(int index); - public ImmutableRTree getRTree(String value); + public ImmutableRTree getRTree(); } diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java b/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java index ddd5b2f5356..fecb35e9083 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/DictionaryEncodedColumnPartSerde.java @@ -149,7 +149,7 @@ public class DictionaryEncodedColumnPartSerde implements ColumnPartSerde ); builder.setBitmapIndex(new BitmapIndexColumnPartSupplier(bitmaps, dictionary)); - builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex, dictionary)); + builder.setSpatialIndex(new SpatialIndexColumnPartSupplier(spatialIndex)); return new DictionaryEncodedColumnPartSerde( dictionary, diff --git a/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java b/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java index 413d936b8ca..5c9c9e415b8 100644 --- a/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java +++ b/index-common/src/main/java/com/metamx/druid/index/serde/SpatialIndexColumnPartSupplier.java @@ -30,14 +30,12 @@ public class SpatialIndexColumnPartSupplier implements Supplier private static final ImmutableRTree EMPTY_SET = new ImmutableRTree(); private final GenericIndexed indexedTree; - private final GenericIndexed dictionary; public SpatialIndexColumnPartSupplier( - GenericIndexed indexedTree, - GenericIndexed dictionary - ) { + GenericIndexed indexedTree + ) + { this.indexedTree = indexedTree; - this.dictionary = dictionary; } @Override @@ -46,28 +44,10 @@ public class SpatialIndexColumnPartSupplier implements Supplier return new SpatialIndex() { @Override - public int getCardinality() + public ImmutableRTree getRTree() { - return dictionary.size(); - } - - @Override - public String getValue(int index) - { - return dictionary.get(index); - } - - @Override - public ImmutableRTree getRTree(String value) - { - final int index = dictionary.indexOf(value); - - if (index < 0) { - return EMPTY_SET; - } - - final ImmutableRTree rTree = indexedTree.get(index); - return rTree == null ? EMPTY_SET : rTree; + // There is only ever 1 RTree per dimension + return indexedTree.get(0); } }; } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java index b71cadbc58a..46759476b70 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IncrementalIndex.java @@ -138,7 +138,9 @@ public class IncrementalIndex implements Iterable dimension = dimension.toLowerCase(); List dimensionValues = row.getDimension(dimension); - // FIXME: HACK!!! Rewrite this when this file is rewritten + // FIXME: Must be a better way to do this + // Join all coordinate dimension values into a single string for bitmap indexing + // Split this string for spatial indexing if (dimension.endsWith(".geo")) { dimensionValues = Arrays.asList(JOINER.join(dimensionValues)); } diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java index 0dda868e402..6d3ff975549 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexIO.java @@ -543,7 +543,13 @@ public class IndexIO } builder.addSerde( - new DictionaryEncodedColumnPartSerde(dictionary, singleValCol, multiValCol, bitmaps, spatialIndex) + new DictionaryEncodedColumnPartSerde( + dictionary, + singleValCol, + multiValCol, + bitmaps, + spatialIndex + ) ); final ColumnDescriptor serdeficator = builder.build(); @@ -701,7 +707,7 @@ public class IndexIO ) ).setSpatialIndex( new SpatialIndexColumnPartSupplier( - index.getSpatialIndexes().get(dimension), index.getDimValueLookup(dimension) + index.getSpatialIndexes().get(dimension) ) ).build() ); diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java index ddee90e867f..91d073bb0b2 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java @@ -689,59 +689,6 @@ public class IndexMerger System.currentTimeMillis() - startTime ); - - /************ Create Geographical Indexes *************/ - // FIXME: Rewrite when indexing is updated - Stopwatch stopwatch = new Stopwatch(); - stopwatch.start(); - - final File geoFile = new File(v8OutDir, "spatial.drd"); - Files.touch(geoFile); - out = Files.newOutputStreamSupplier(geoFile, true); - - for (int i = 0; i < mergedDimensions.size(); ++i) { - String dimension = mergedDimensions.get(i); - - File dimOutFile = dimOuts.get(i).getFile(); - final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); - - if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { - throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension); - } - Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy); - log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size()); - - FlattenedArrayWriter writer = new FlattenedArrayWriter( - ioPeon, dimension, IndexedRTree.objectStrategy - ); - writer.open(); - - RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50)); - - if (dimension.endsWith(".geo")) { - int count = 0; - for (String dimVal : IndexedIterable.create(dimVals)) { - progress.progress(); - - List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); - float[] coords = new float[stringCoords.size()]; - for (int j = 0; j < coords.length; j++) { - coords[j] = Float.valueOf(stringCoords.get(j)); - } - tree.insert(coords, count); - } - } - writer.write(ImmutableRTree.newImmutableFromMutable(tree)); - writer.close(); - - serializerUtils.writeString(out, dimension); - ByteStreams.copy(writer.combineStreams(), out); - ioPeon.cleanup(); - - log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis()); - } - - /************ Create Inverted Indexes *************/ startTime = System.currentTimeMillis(); @@ -798,6 +745,58 @@ public class IndexMerger log.info("Completed dimension[%s] in %,d millis.", dimension, System.currentTimeMillis() - dimStartTime); } + /************ Create Geographical Indexes *************/ + // FIXME: Rewrite when indexing is updated + Stopwatch stopwatch = new Stopwatch(); + stopwatch.start(); + + final File geoFile = new File(v8OutDir, "spatial.drd"); + Files.touch(geoFile); + out = Files.newOutputStreamSupplier(geoFile, true); + + for (int i = 0; i < mergedDimensions.size(); ++i) { + String dimension = mergedDimensions.get(i); + + File dimOutFile = dimOuts.get(i).getFile(); + final MappedByteBuffer dimValsMapped = Files.map(dimOutFile); + + if (!dimension.equals(serializerUtils.readString(dimValsMapped))) { + throw new ISE("dimensions[%s] didn't equate!? This is a major WTF moment.", dimension); + } + Indexed dimVals = GenericIndexed.read(dimValsMapped, GenericIndexed.stringStrategy); + log.info("Indexing geo dimension[%s] with cardinality[%,d]", dimension, dimVals.size()); + + FlattenedArrayWriter writer = new FlattenedArrayWriter( + ioPeon, dimension, IndexedRTree.objectStrategy + ); + writer.open(); + + RTree tree = new RTree(2, new LinearGutmanSplitStrategy(0, 50)); + + if (dimension.endsWith(".geo")) { + int count = 0; + for (String dimVal : IndexedIterable.create(dimVals)) { + progress.progress(); + + List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + float[] coords = new float[stringCoords.size()]; + for (int j = 0; j < coords.length; j++) { + coords[j] = Float.valueOf(stringCoords.get(j)); + } + tree.insert(coords, count); + count++; + } + } + writer.write(ImmutableRTree.newImmutableFromMutable(tree)); + writer.close(); + + serializerUtils.writeString(out, dimension); + ByteStreams.copy(writer.combineStreams(), out); + ioPeon.cleanup(); + + log.info("Completed spatial dimension[%s] in %,d millis.", dimension, stopwatch.elapsedMillis()); + } + log.info("outDir[%s] completed inverted.drd in %,d millis.", v8OutDir, System.currentTimeMillis() - startTime); final ArrayList expectedFiles = Lists.newArrayList( diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java b/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java index 687c7672ec8..ea8347d96af 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/MMappedIndex.java @@ -19,25 +19,15 @@ package com.metamx.druid.index.v1; -import com.google.common.base.Function; -import com.google.common.collect.Iterables; import com.google.common.collect.Maps; -import com.google.common.primitives.Ints; import com.metamx.common.logger.Logger; import com.metamx.common.spatial.rtree.ImmutableRTree; -import com.metamx.druid.kv.ConciseCompressedIndexedInts; import com.metamx.druid.kv.GenericIndexed; -import com.metamx.druid.kv.Indexed; -import com.metamx.druid.kv.IndexedList; import com.metamx.druid.kv.IndexedLongs; import com.metamx.druid.kv.VSizeIndexed; -import com.metamx.druid.kv.VSizeIndexedInts; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import org.joda.time.Interval; -import java.nio.ByteOrder; -import java.nio.LongBuffer; -import java.util.Arrays; import java.util.Map; /** diff --git a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java index 9d54140d241..7d4d043ac1d 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/QueryableIndexStorageAdapter.java @@ -244,7 +244,7 @@ public class QueryableIndexStorageAdapter extends BaseStorageAdapter return new ImmutableRTree(); } - return column.getSpatialIndex().getRTree(dimension); + return column.getSpatialIndex().getRTree(); } @Override diff --git a/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java b/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java new file mode 100644 index 00000000000..7255ce00405 --- /dev/null +++ b/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java @@ -0,0 +1,149 @@ +package com.metamx.druid.index.brita; + +import com.google.common.base.Throwables; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import com.metamx.common.spatial.rtree.search.RadiusBound; +import com.metamx.druid.Druids; +import com.metamx.druid.QueryGranularity; +import com.metamx.druid.TestHelper; +import com.metamx.druid.aggregation.AggregatorFactory; +import com.metamx.druid.aggregation.CountAggregatorFactory; +import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; +import com.metamx.druid.aggregation.LongSumAggregatorFactory; +import com.metamx.druid.index.QueryableIndex; +import com.metamx.druid.index.QueryableIndexSegment; +import com.metamx.druid.index.v1.IncrementalIndex; +import com.metamx.druid.index.v1.IndexIO; +import com.metamx.druid.index.v1.IndexMerger; +import com.metamx.druid.input.MapBasedInputRow; +import com.metamx.druid.query.FinalizeResultsQueryRunner; +import com.metamx.druid.query.QueryRunner; +import com.metamx.druid.query.filter.SpatialDimFilter; +import com.metamx.druid.query.timeseries.TimeseriesQuery; +import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import com.metamx.druid.result.Result; +import com.metamx.druid.result.TimeseriesResultValue; +import org.joda.time.DateTime; +import org.joda.time.Interval; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +/** + */ +public class SpatialFilterTest +{ + private QueryableIndex makeIndex() throws IOException + { + final List dims = Lists.newArrayList("dim", "dim.geo"); + IncrementalIndex theIndex = new IncrementalIndex( + new DateTime("2013-01-01").getMillis(), + QueryGranularity.DAY, + new AggregatorFactory[]{ + new CountAggregatorFactory("rows"), + new LongSumAggregatorFactory("val", "val") + } + ); + theIndex.add( + new MapBasedInputRow( + new DateTime("2013-01-01").getMillis(), + dims, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-01").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(0.0f, 0.0f), + "val", 17l + ) + ) + ); + theIndex.add( + new MapBasedInputRow( + new DateTime("2013-01-02").getMillis(), + dims, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-02").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(1.0f, 3.0f), + "val", 29l + ) + ) + ); + theIndex.add( + new MapBasedInputRow( + new DateTime("2013-01-03").getMillis(), + dims, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-03").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(4.0f, 2.0f), + "val", 13l + ) + ) + ); + theIndex.add( + new MapBasedInputRow( + new DateTime("2013-01-03").getMillis(), + dims, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-03").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(7.0f, 3.0f), + "val", 91l + ) + ) + ); + + File tmpFile = File.createTempFile("billy", "yay"); + tmpFile.delete(); + tmpFile.mkdirs(); + tmpFile.deleteOnExit(); + + IndexMerger.persist(theIndex, tmpFile); + return IndexIO.loadIndex(tmpFile); + } + + @Test + public void testSpatialQuery() + { + TimeseriesQuery query = Druids.newTimeseriesQueryBuilder() + .dataSource("test") + .granularity(QueryGranularity.ALL) + .intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07"))) + .filters(new SpatialDimFilter("dim.geo", new RadiusBound(new float[]{0.0f, 0.0f}, 5))) + .aggregators( + Arrays.asList( + new CountAggregatorFactory("rows"), + new LongSumAggregatorFactory("val", "val") + ) + ) + .build(); + + List> expectedResults = Arrays.asList( + new Result( + new DateTime("2013-01-01T00:00:00.000Z"), + new TimeseriesResultValue( + ImmutableMap.builder() + .put("rows", 3L) + .put("val", 59l) + .build() + ) + ) + ); + try { + TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(); + QueryRunner runner = new FinalizeResultsQueryRunner( + factory.createRunner(new QueryableIndexSegment(null, makeIndex())), + factory.getToolchest() + ); + + TestHelper.assertExpectedResults(expectedResults, runner.run(query)); + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } +}