From e01a78515e41bdd7161211fd40de64b135691c65 Mon Sep 17 00:00:00 2001 From: fjy Date: Thu, 25 Apr 2013 19:01:45 -0700 Subject: [PATCH] many more fixes and unit tests --- .../metamx/druid/index/v1/IndexMerger.java | 1 + .../druid/index/brita/SpatialFilter.java | 12 +- .../index/brita/ValueMatcherFactory.java | 2 + .../v1/IncrementalIndexStorageAdapter.java | 43 ++++ .../druid/index/brita/SpatialFilterTest.java | 232 ++++++++++++++++-- 5 files changed, 255 insertions(+), 35 deletions(-) diff --git a/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java b/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java index 91d073bb0b2..61af64df73a 100644 --- a/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java +++ b/index-common/src/main/java/com/metamx/druid/index/v1/IndexMerger.java @@ -787,6 +787,7 @@ public class IndexMerger count++; } } + writer.write(ImmutableRTree.newImmutableFromMutable(tree)); writer.close(); diff --git a/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java b/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java index cccaeead1e7..b813b2f07a7 100644 --- a/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java +++ b/server/src/main/java/com/metamx/druid/index/brita/SpatialFilter.java @@ -21,6 +21,7 @@ package com.metamx.druid.index.brita; import com.google.common.base.Function; import com.google.common.base.Predicate; import com.google.common.collect.Iterables; +import com.metamx.common.spatial.rtree.ImmutableRTree; import com.metamx.common.spatial.rtree.search.Bound; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; @@ -43,6 +44,8 @@ public class SpatialFilter implements Filter @Override public ImmutableConciseSet goConcise(final BitmapIndexSelector selector) { + //ImmutableRTree foo = selector.getSpatialIndex(dimension); + //ImmutableRTree.print(foo); Iterable indexes = selector.getSpatialIndex(dimension).search(bound); return ImmutableConciseSet.union( Iterables.transform( @@ -64,14 +67,7 @@ public class SpatialFilter implements Filter { return factory.makeValueMatcher( dimension, - new Predicate() - { - @Override - public boolean apply(String input) - { - return true; - } - } + bound ); } } diff --git a/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java b/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java index 84cac3fb489..1bc9bb8c5c9 100644 --- a/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java +++ b/server/src/main/java/com/metamx/druid/index/brita/ValueMatcherFactory.java @@ -20,6 +20,7 @@ package com.metamx.druid.index.brita; import com.google.common.base.Predicate; +import com.metamx.common.spatial.rtree.search.Bound; /** */ @@ -27,4 +28,5 @@ public interface ValueMatcherFactory { public ValueMatcher makeValueMatcher(String dimension, String value); public ValueMatcher makeValueMatcher(String dimension, Predicate value); + public ValueMatcher makeValueMatcher(String dimension, Bound bound); } diff --git a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java index 9e5694ef4e4..51e49433fd5 100644 --- a/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java +++ b/server/src/main/java/com/metamx/druid/index/v1/IncrementalIndexStorageAdapter.java @@ -21,6 +21,7 @@ package com.metamx.druid.index.v1; import com.google.common.base.Function; import com.google.common.base.Predicate; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; @@ -28,6 +29,7 @@ import com.google.common.collect.Sets; import com.metamx.common.IAE; import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterator; +import com.metamx.common.spatial.rtree.search.Bound; import com.metamx.druid.Capabilities; import com.metamx.druid.QueryGranularity; import com.metamx.druid.StorageAdapter; @@ -62,6 +64,8 @@ import java.util.concurrent.ConcurrentNavigableMap; */ public class IncrementalIndexStorageAdapter implements StorageAdapter { + private static final Splitter SPLITTER = Splitter.on(","); + private final IncrementalIndex index; public IncrementalIndexStorageAdapter( @@ -512,6 +516,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter } } + return new FunctionalIterable(retVal).limit(query.getLimit()); } @@ -534,6 +539,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter public void set(Map.Entry currEntry) { this.currEntry = currEntry; + this.currEntry = currEntry; } public IncrementalIndex.TimeAndDims getKey() @@ -620,7 +626,44 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter return false; } }; + } + @Override + public ValueMatcher makeValueMatcher(final String dimension, final Bound bound) + { + if (!dimension.endsWith(".geo")) { + return new BooleanValueMatcher(false); + } + + Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase()); + if (dimIndexObject == null) { + return new BooleanValueMatcher(false); + } + final int dimIndex = dimIndexObject; + + return new ValueMatcher() + { + @Override + public boolean matches() + { + String[][] dims = holder.getKey().getDims(); + if (dimIndex >= dims.length || dims[dimIndex] == null) { + return false; + } + + for (String dimVal : dims[dimIndex]) { + List stringCoords = Lists.newArrayList(SPLITTER.split(dimVal)); + float[] coords = new float[stringCoords.size()]; + for (int j = 0; j < coords.length; j++) { + coords[j] = Float.valueOf(stringCoords.get(j)); + } + if (bound.contains(coords)) { + return true; + } + } + return false; + } + }; } } } diff --git a/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java b/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java index eaf800cd529..1d25d2c4c94 100644 --- a/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java +++ b/server/src/test/java/com/metamx/druid/index/brita/SpatialFilterTest.java @@ -2,25 +2,23 @@ package com.metamx.druid.index.brita; import com.google.common.base.Throwables; import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; -import com.metamx.common.spatial.rtree.ImmutableRTree; -import com.metamx.common.spatial.rtree.RTree; import com.metamx.common.spatial.rtree.search.RadiusBound; -import com.metamx.common.spatial.rtree.split.LinearGutmanSplitStrategy; +import com.metamx.common.spatial.rtree.search.RectangularBound; import com.metamx.druid.Druids; import com.metamx.druid.QueryGranularity; import com.metamx.druid.TestHelper; import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.CountAggregatorFactory; -import com.metamx.druid.aggregation.DoubleSumAggregatorFactory; import com.metamx.druid.aggregation.LongSumAggregatorFactory; +import com.metamx.druid.index.IncrementalIndexSegment; import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.QueryableIndexSegment; +import com.metamx.druid.index.Segment; import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexMerger; +import com.metamx.druid.index.v1.TestIndex; import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.QueryRunner; @@ -29,37 +27,65 @@ import com.metamx.druid.query.timeseries.TimeseriesQuery; import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory; import com.metamx.druid.result.Result; import com.metamx.druid.result.TimeseriesResultValue; -import junit.framework.Assert; import org.joda.time.DateTime; import org.joda.time.Interval; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.io.File; import java.io.IOException; import java.util.Arrays; +import java.util.Collection; import java.util.List; import java.util.Random; -import java.util.Set; /** */ +@RunWith(Parameterized.class) public class SpatialFilterTest { - private QueryableIndex makeIndex() throws IOException + private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07"); + + private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{ + new CountAggregatorFactory("rows"), + new LongSumAggregatorFactory("val", "val") + }; + + private static List DIMS = Lists.newArrayList("dim", "dim.geo"); + + @Parameterized.Parameters + public static Collection constructorFeeder() throws IOException { - final List dims = Lists.newArrayList("dim", "dim.geo"); - IncrementalIndex theIndex = new IncrementalIndex( - new DateTime("2013-01-01").getMillis(), - QueryGranularity.DAY, - new AggregatorFactory[]{ - new CountAggregatorFactory("rows"), - new LongSumAggregatorFactory("val", "val") + final IncrementalIndex rtIndex = makeIncrementalIndex(); + final QueryableIndex mMappedTestIndex = makeQueryableIndex(); + final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex(); + return Arrays.asList( + new Object[][]{ + { + new IncrementalIndexSegment(rtIndex) + }, + { + new QueryableIndexSegment(null, mMappedTestIndex) + }, + { + new QueryableIndexSegment(null, mergedRealtimeIndex) + } } ); + } + + private static IncrementalIndex makeIncrementalIndex() throws IOException + { + IncrementalIndex theIndex = new IncrementalIndex( + DATA_INTERVAL.getStartMillis(), + QueryGranularity.DAY, + METRIC_AGGS + ); theIndex.add( new MapBasedInputRow( new DateTime("2013-01-01").getMillis(), - dims, + DIMS, ImmutableMap.of( "timestamp", new DateTime("2013-01-01").toString(), "dim", "foo", @@ -71,7 +97,7 @@ public class SpatialFilterTest theIndex.add( new MapBasedInputRow( new DateTime("2013-01-02").getMillis(), - dims, + DIMS, ImmutableMap.of( "timestamp", new DateTime("2013-01-02").toString(), "dim", "foo", @@ -83,7 +109,7 @@ public class SpatialFilterTest theIndex.add( new MapBasedInputRow( new DateTime("2013-01-03").getMillis(), - dims, + DIMS, ImmutableMap.of( "timestamp", new DateTime("2013-01-03").toString(), "dim", "foo", @@ -95,9 +121,9 @@ public class SpatialFilterTest theIndex.add( new MapBasedInputRow( new DateTime("2013-01-04").getMillis(), - dims, + DIMS, ImmutableMap.of( - "timestamp", new DateTime("2013-01-03").toString(), + "timestamp", new DateTime("2013-01-04").toString(), "dim", "foo", "dim.geo", Arrays.asList(7.0f, 3.0f), "val", 91l @@ -107,9 +133,9 @@ public class SpatialFilterTest theIndex.add( new MapBasedInputRow( new DateTime("2013-01-05").getMillis(), - dims, + DIMS, ImmutableMap.of( - "timestamp", new DateTime("2013-01-03").toString(), + "timestamp", new DateTime("2013-01-05").toString(), "dim", "foo", "dim.geo", Arrays.asList(8.0f, 6.0f), "val", 47l @@ -123,7 +149,7 @@ public class SpatialFilterTest theIndex.add( new MapBasedInputRow( new DateTime("2013-01-01").getMillis(), - dims, + DIMS, ImmutableMap.of( "timestamp", new DateTime("2013-01-01").toString(), @@ -138,6 +164,12 @@ public class SpatialFilterTest ); } + return theIndex; + } + + private static QueryableIndex makeQueryableIndex() throws IOException + { + IncrementalIndex theIndex = makeIncrementalIndex(); File tmpFile = File.createTempFile("billy", "yay"); tmpFile.delete(); tmpFile.mkdirs(); @@ -147,6 +179,152 @@ public class SpatialFilterTest return IndexIO.loadIndex(tmpFile); } + private static QueryableIndex makeMergedQueryableIndex() + { + try { + IncrementalIndex first = new IncrementalIndex( + DATA_INTERVAL.getStartMillis(), + QueryGranularity.DAY, + METRIC_AGGS + ); + IncrementalIndex second = new IncrementalIndex( + DATA_INTERVAL.getStartMillis(), + QueryGranularity.DAY, + METRIC_AGGS + ); + IncrementalIndex third = new IncrementalIndex( + DATA_INTERVAL.getStartMillis(), + QueryGranularity.DAY, + METRIC_AGGS + ); + + + first.add( + new MapBasedInputRow( + new DateTime("2013-01-01").getMillis(), + DIMS, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-01").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(0.0f, 0.0f), + "val", 17l + ) + ) + ); + first.add( + new MapBasedInputRow( + new DateTime("2013-01-02").getMillis(), + DIMS, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-02").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(1.0f, 3.0f), + "val", 29l + ) + ) + ); + first.add( + new MapBasedInputRow( + new DateTime("2013-01-03").getMillis(), + DIMS, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-03").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(4.0f, 2.0f), + "val", 13l + ) + ) + ); + second.add( + new MapBasedInputRow( + new DateTime("2013-01-04").getMillis(), + DIMS, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-04").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(7.0f, 3.0f), + "val", 91l + ) + ) + ); + second.add( + new MapBasedInputRow( + new DateTime("2013-01-05").getMillis(), + DIMS, + ImmutableMap.of( + "timestamp", new DateTime("2013-01-05").toString(), + "dim", "foo", + "dim.geo", Arrays.asList(8.0f, 6.0f), + "val", 47l + ) + ) + ); + + // Add a bunch of random points + Random rand = new Random(); + for (int i = 5; i < 5000; i++) { + third.add( + new MapBasedInputRow( + new DateTime("2013-01-01").getMillis(), + DIMS, + ImmutableMap.of( + "timestamp", + new DateTime("2013-01-01").toString(), + "dim", + "boo", + "dim.geo", + Arrays.asList((float) (rand.nextFloat() * 10 + 10.0), (float) (rand.nextFloat() * 10 + 10.0)), + "val", + i + ) + ) + ); + } + + + File tmpFile = File.createTempFile("yay", "who"); + tmpFile.delete(); + + File firstFile = new File(tmpFile, "first"); + File secondFile = new File(tmpFile, "second"); + File thirdFile = new File(tmpFile, "third"); + File mergedFile = new File(tmpFile, "merged"); + + firstFile.mkdirs(); + firstFile.deleteOnExit(); + secondFile.mkdirs(); + secondFile.deleteOnExit(); + thirdFile.mkdirs(); + thirdFile.deleteOnExit(); + mergedFile.mkdirs(); + mergedFile.deleteOnExit(); + + IndexMerger.persist(first, DATA_INTERVAL, firstFile); + IndexMerger.persist(second, DATA_INTERVAL, secondFile); + IndexMerger.persist(third, DATA_INTERVAL, thirdFile); + + QueryableIndex mergedRealtime = IndexIO.loadIndex( + IndexMerger.mergeQueryableIndex( + Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)), + METRIC_AGGS, + mergedFile + ) + ); + + return mergedRealtime; + } + catch (IOException e) { + throw Throwables.propagate(e); + } + } + + private final Segment segment; + + public SpatialFilterTest(Segment segment) + { + this.segment = segment; + } + @Test public void testSpatialQuery() { @@ -182,7 +360,7 @@ public class SpatialFilterTest try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(); QueryRunner runner = new FinalizeResultsQueryRunner( - factory.createRunner(new QueryableIndexSegment(null, makeIndex())), + factory.createRunner(segment), factory.getToolchest() ); @@ -203,7 +381,7 @@ public class SpatialFilterTest .filters( new SpatialDimFilter( "dim.geo", - new RadiusBound(new float[]{0.0f, 0.0f}, 10) + new RectangularBound(new float[]{0.0f, 0.0f}, new float[]{9.0f, 9.0f}) ) ) .aggregators( @@ -264,7 +442,7 @@ public class SpatialFilterTest try { TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(); QueryRunner runner = new FinalizeResultsQueryRunner( - factory.createRunner(new QueryableIndexSegment(null, makeIndex())), + factory.createRunner(segment), factory.getToolchest() );