many more fixes and unit tests

This commit is contained in:
fjy 2013-04-25 19:01:45 -07:00
parent d29b3e0960
commit e01a78515e
5 changed files with 255 additions and 35 deletions

View File

@ -787,6 +787,7 @@ public class IndexMerger
count++; count++;
} }
} }
writer.write(ImmutableRTree.newImmutableFromMutable(tree)); writer.write(ImmutableRTree.newImmutableFromMutable(tree));
writer.close(); writer.close();

View File

@ -21,6 +21,7 @@ package com.metamx.druid.index.brita;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.common.spatial.rtree.search.Bound; import com.metamx.common.spatial.rtree.search.Bound;
import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet; import it.uniroma3.mat.extendedset.intset.ImmutableConciseSet;
@ -43,6 +44,8 @@ public class SpatialFilter implements Filter
@Override @Override
public ImmutableConciseSet goConcise(final BitmapIndexSelector selector) public ImmutableConciseSet goConcise(final BitmapIndexSelector selector)
{ {
//ImmutableRTree foo = selector.getSpatialIndex(dimension);
//ImmutableRTree.print(foo);
Iterable<Integer> indexes = selector.getSpatialIndex(dimension).search(bound); Iterable<Integer> indexes = selector.getSpatialIndex(dimension).search(bound);
return ImmutableConciseSet.union( return ImmutableConciseSet.union(
Iterables.transform( Iterables.transform(
@ -64,14 +67,7 @@ public class SpatialFilter implements Filter
{ {
return factory.makeValueMatcher( return factory.makeValueMatcher(
dimension, dimension,
new Predicate<String>() bound
{
@Override
public boolean apply(String input)
{
return true;
}
}
); );
} }
} }

View File

@ -20,6 +20,7 @@
package com.metamx.druid.index.brita; package com.metamx.druid.index.brita;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.metamx.common.spatial.rtree.search.Bound;
/** /**
*/ */
@ -27,4 +28,5 @@ public interface ValueMatcherFactory
{ {
public ValueMatcher makeValueMatcher(String dimension, String value); public ValueMatcher makeValueMatcher(String dimension, String value);
public ValueMatcher makeValueMatcher(String dimension, Predicate<String> value); public ValueMatcher makeValueMatcher(String dimension, Predicate<String> value);
public ValueMatcher makeValueMatcher(String dimension, Bound bound);
} }

View File

@ -21,6 +21,7 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Function; import com.google.common.base.Function;
import com.google.common.base.Predicate; import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators; import com.google.common.collect.Iterators;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -28,6 +29,7 @@ import com.google.common.collect.Sets;
import com.metamx.common.IAE; import com.metamx.common.IAE;
import com.metamx.common.guava.FunctionalIterable; import com.metamx.common.guava.FunctionalIterable;
import com.metamx.common.guava.FunctionalIterator; import com.metamx.common.guava.FunctionalIterator;
import com.metamx.common.spatial.rtree.search.Bound;
import com.metamx.druid.Capabilities; import com.metamx.druid.Capabilities;
import com.metamx.druid.QueryGranularity; import com.metamx.druid.QueryGranularity;
import com.metamx.druid.StorageAdapter; import com.metamx.druid.StorageAdapter;
@ -62,6 +64,8 @@ import java.util.concurrent.ConcurrentNavigableMap;
*/ */
public class IncrementalIndexStorageAdapter implements StorageAdapter public class IncrementalIndexStorageAdapter implements StorageAdapter
{ {
private static final Splitter SPLITTER = Splitter.on(",");
private final IncrementalIndex index; private final IncrementalIndex index;
public IncrementalIndexStorageAdapter( public IncrementalIndexStorageAdapter(
@ -512,6 +516,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
} }
} }
return new FunctionalIterable<SearchHit>(retVal).limit(query.getLimit()); return new FunctionalIterable<SearchHit>(retVal).limit(query.getLimit());
} }
@ -534,6 +539,7 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
public void set(Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> currEntry) public void set(Map.Entry<IncrementalIndex.TimeAndDims, Aggregator[]> currEntry)
{ {
this.currEntry = currEntry; this.currEntry = currEntry;
this.currEntry = currEntry;
} }
public IncrementalIndex.TimeAndDims getKey() public IncrementalIndex.TimeAndDims getKey()
@ -620,7 +626,44 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
return false; return false;
} }
}; };
}
@Override
// Builds a ValueMatcher that matches rows whose spatial dimension values fall
// inside the given Bound. Spatial dimensions are identified purely by the
// ".geo" name suffix — assumes that naming convention holds; TODO confirm.
public ValueMatcher makeValueMatcher(final String dimension, final Bound bound)
{
// Non-spatial dimensions can never match a spatial bound.
if (!dimension.endsWith(".geo")) {
return new BooleanValueMatcher(false);
}
// NOTE(review): toLowerCase() without an explicit Locale — presumably dimension
// names are ASCII; verify this is safe for the deployment locale.
Integer dimIndexObject = index.getDimensionIndex(dimension.toLowerCase());
if (dimIndexObject == null) {
return new BooleanValueMatcher(false);
}
final int dimIndex = dimIndexObject;
return new ValueMatcher()
{
@Override
public boolean matches()
{
// Dims for the row currently pointed at by the enclosing cursor's holder.
String[][] dims = holder.getKey().getDims();
// Row may predate this dimension, or have no value for it.
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return false;
}
for (String dimVal : dims[dimIndex]) {
// Each value is a comma-separated coordinate string, e.g. "1.0,3.0";
// parse it into a float[] for the bound check.
// NOTE(review): Float.valueOf will throw NumberFormatException on a
// malformed coordinate — presumably ingestion guarantees the format.
List<String> stringCoords = Lists.newArrayList(SPLITTER.split(dimVal));
float[] coords = new float[stringCoords.size()];
for (int j = 0; j < coords.length; j++) {
coords[j] = Float.valueOf(stringCoords.get(j));
}
// A row matches if ANY of its coordinate values lies inside the bound.
if (bound.contains(coords)) {
return true;
}
}
return false;
}
};
} }
} }
} }

View File

@ -2,25 +2,23 @@ package com.metamx.druid.index.brita;
import com.google.common.base.Throwables; import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.metamx.common.spatial.rtree.ImmutableRTree;
import com.metamx.common.spatial.rtree.RTree;
import com.metamx.common.spatial.rtree.search.RadiusBound; import com.metamx.common.spatial.rtree.search.RadiusBound;
import com.metamx.common.spatial.rtree.split.LinearGutmanSplitStrategy; import com.metamx.common.spatial.rtree.search.RectangularBound;
import com.metamx.druid.Druids; import com.metamx.druid.Druids;
import com.metamx.druid.QueryGranularity; import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper; import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory; import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory; import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.DoubleSumAggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory; import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.index.IncrementalIndexSegment;
import com.metamx.druid.index.QueryableIndex; import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment; import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex; import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IndexIO; import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger; import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.TestIndex;
import com.metamx.druid.input.MapBasedInputRow; import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner; import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner; import com.metamx.druid.query.QueryRunner;
@ -29,37 +27,65 @@ import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory; import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.result.Result; import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue; import com.metamx.druid.result.TimeseriesResultValue;
import junit.framework.Assert;
import org.joda.time.DateTime; import org.joda.time.DateTime;
import org.joda.time.Interval; import org.joda.time.Interval;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.Set;
/** /**
*/ */
@RunWith(Parameterized.class)
public class SpatialFilterTest public class SpatialFilterTest
{ {
private QueryableIndex makeIndex() throws IOException private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
{
final List<String> dims = Lists.newArrayList("dim", "dim.geo"); private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
IncrementalIndex theIndex = new IncrementalIndex(
new DateTime("2013-01-01").getMillis(),
QueryGranularity.DAY,
new AggregatorFactory[]{
new CountAggregatorFactory("rows"), new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val") new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "dim.geo");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final IncrementalIndex rtIndex = makeIncrementalIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex();
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex();
return Arrays.asList(
new Object[][]{
{
new IncrementalIndexSegment(rtIndex)
},
{
new QueryableIndexSegment(null, mMappedTestIndex)
},
{
new QueryableIndexSegment(null, mergedRealtimeIndex)
} }
}
);
}
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
); );
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(), new DateTime("2013-01-01").getMillis(),
dims, DIMS,
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(), "timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo", "dim", "foo",
@ -71,7 +97,7 @@ public class SpatialFilterTest
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(), new DateTime("2013-01-02").getMillis(),
dims, DIMS,
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(), "timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo", "dim", "foo",
@ -83,7 +109,7 @@ public class SpatialFilterTest
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(), new DateTime("2013-01-03").getMillis(),
dims, DIMS,
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(), "timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo", "dim", "foo",
@ -95,9 +121,9 @@ public class SpatialFilterTest
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(), new DateTime("2013-01-04").getMillis(),
dims, DIMS,
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(), "timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo", "dim", "foo",
"dim.geo", Arrays.asList(7.0f, 3.0f), "dim.geo", Arrays.asList(7.0f, 3.0f),
"val", 91l "val", 91l
@ -107,9 +133,9 @@ public class SpatialFilterTest
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(), new DateTime("2013-01-05").getMillis(),
dims, DIMS,
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(), "timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo", "dim", "foo",
"dim.geo", Arrays.asList(8.0f, 6.0f), "dim.geo", Arrays.asList(8.0f, 6.0f),
"val", 47l "val", 47l
@ -123,7 +149,7 @@ public class SpatialFilterTest
theIndex.add( theIndex.add(
new MapBasedInputRow( new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(), new DateTime("2013-01-01").getMillis(),
dims, DIMS,
ImmutableMap.<String, Object>of( ImmutableMap.<String, Object>of(
"timestamp", "timestamp",
new DateTime("2013-01-01").toString(), new DateTime("2013-01-01").toString(),
@ -138,6 +164,12 @@ public class SpatialFilterTest
); );
} }
return theIndex;
}
private static QueryableIndex makeQueryableIndex() throws IOException
{
IncrementalIndex theIndex = makeIncrementalIndex();
File tmpFile = File.createTempFile("billy", "yay"); File tmpFile = File.createTempFile("billy", "yay");
tmpFile.delete(); tmpFile.delete();
tmpFile.mkdirs(); tmpFile.mkdirs();
@ -147,6 +179,152 @@ public class SpatialFilterTest
return IndexIO.loadIndex(tmpFile); return IndexIO.loadIndex(tmpFile);
} }
// Test fixture: builds three separate IncrementalIndexes (the same five
// labeled "foo" rows as the single-index fixture, split across the first two,
// plus ~5000 random "boo" rows in the third), persists each to disk, merges
// them, and loads the merged segment. Exercises the spatial-index merge path.
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
);
IncrementalIndex second = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
);
IncrementalIndex third = new IncrementalIndex(
DATA_INTERVAL.getStartMillis(),
QueryGranularity.DAY,
METRIC_AGGS
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(0.0f, 0.0f),
"val", 17l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(1.0f, 3.0f),
"val", 29l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(4.0f, 2.0f),
"val", 13l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(7.0f, 3.0f),
"val", 91l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", Arrays.asList(8.0f, 6.0f),
"val", 47l
)
)
);
// Add a bunch of random points
// NOTE(review): Random is unseeded, so this fixture is non-deterministic
// across runs; the points land in [10,20)x[10,20), outside the bounds the
// tests query, so results should be unaffected — confirm, or seed it.
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp",
new DateTime("2013-01-01").toString(),
"dim",
"boo",
"dim.geo",
Arrays.asList((float) (rand.nextFloat() * 10 + 10.0), (float) (rand.nextFloat() * 10 + 10.0)),
"val",
i
)
)
);
}
// Temp-file-then-delete idiom to claim a unique path, reused as a parent
// directory for the per-index persist dirs.
// NOTE(review): tmpFile itself has no deleteOnExit and mkdirs() results are
// unchecked — fine for a test, but leaks temp dirs on abnormal exit.
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File firstFile = new File(tmpFile, "first");
File secondFile = new File(tmpFile, "second");
File thirdFile = new File(tmpFile, "third");
File mergedFile = new File(tmpFile, "merged");
firstFile.mkdirs();
firstFile.deleteOnExit();
secondFile.mkdirs();
secondFile.deleteOnExit();
thirdFile.mkdirs();
thirdFile.deleteOnExit();
mergedFile.mkdirs();
mergedFile.deleteOnExit();
// Persist each in-memory index, then merge the persisted segments and load
// the merged result as a queryable (mmapped) index.
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
)
);
return mergedRealtime;
}
catch (IOException e) {
// Test fixture construction is static; rethrow unchecked to fail fast.
throw Throwables.propagate(e);
}
}
// Segment under test, injected per Parameterized run (incremental, mmapped,
// or merged — see constructorFeeder), so each test exercises all three paths.
private final Segment segment;
public SpatialFilterTest(Segment segment)
{
this.segment = segment;
}
@Test @Test
public void testSpatialQuery() public void testSpatialQuery()
{ {
@ -182,7 +360,7 @@ public class SpatialFilterTest
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(); TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(new QueryableIndexSegment(null, makeIndex())), factory.createRunner(segment),
factory.getToolchest() factory.getToolchest()
); );
@ -203,7 +381,7 @@ public class SpatialFilterTest
.filters( .filters(
new SpatialDimFilter( new SpatialDimFilter(
"dim.geo", "dim.geo",
new RadiusBound(new float[]{0.0f, 0.0f}, 10) new RectangularBound(new float[]{0.0f, 0.0f}, new float[]{9.0f, 9.0f})
) )
) )
.aggregators( .aggregators(
@ -264,7 +442,7 @@ public class SpatialFilterTest
try { try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(); TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner( QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(new QueryableIndexSegment(null, makeIndex())), factory.createRunner(segment),
factory.getToolchest() factory.getToolchest()
); );