fix spatial bugs and more tests

This commit is contained in:
fjy 2013-05-22 13:35:12 -07:00
parent 47f2d3b0aa
commit 1fb8c9db7f
3 changed files with 611 additions and 14 deletions

View File

@ -22,31 +22,50 @@ package com.metamx.druid.index.v1;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.primitives.Floats;
import com.metamx.common.ISE;
import com.metamx.druid.input.InputRow;
import javax.annotation.Nullable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* We throw away all invalid spatial dimensions
*/
public class SpatialDimensionRowFormatter
{
private static final Joiner JOINER = Joiner.on(",");
private static final Splitter SPLITTER = Splitter.on(",");
private final List<SpatialDimensionSchema> spatialDimensions;
private final Set<String> spatialDimNames;
private final Set<String> spatialPartialDimNames;
public SpatialDimensionRowFormatter(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
Lists.transform(
spatialDimensions,
new Function<SpatialDimensionSchema, String>()
{
@Override
public String apply(SpatialDimensionSchema input)
{
return input.getDimName();
}
}
)
);
this.spatialPartialDimNames = Sets.newHashSet(
Iterables.concat(
Lists.transform(
spatialDimensions,
@ -87,7 +106,7 @@ public class SpatialDimensionRowFormatter
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input);
return !spatialDimNames.contains(input) && !spatialPartialDimNames.contains(input);
}
}
)
@ -121,23 +140,32 @@ public class SpatialDimensionRowFormatter
}
};
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
if (!row.getDimension(spatialDimension.getDimName()).isEmpty()) {
continue;
}
if (!spatialPartialDimNames.isEmpty()) {
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
List<String> spatialDimVals = Lists.newArrayList();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (isSpatialDimValsValid(dimVals)) {
spatialDimVals.addAll(dimVals);
}
}
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (isSpatialDimValsValid(dimVals)) {
spatialDimVals.addAll(dimVals);
if (spatialDimVals.size() == spatialPartialDimNames.size()) {
spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}
}
if (spatialDimVals.size() == spatialDimNames.size()) {
spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
} else {
for (String spatialDimName : spatialDimNames) {
List<String> dimVals = row.getDimension(spatialDimName);
if (dimVals.size() != 1) {
throw new ISE("Cannot have a spatial dimension value with size[%d]", dimVals.size());
}
if (isJoinedSpatialDimValValid(dimVals.get(0))) {
spatialLookup.put(spatialDimName, dimVals);
finalDims.add(spatialDimName);
}
}
}
@ -156,4 +184,18 @@ public class SpatialDimensionRowFormatter
}
return true;
}
private boolean isJoinedSpatialDimValValid(String dimVal)
{
if (dimVal == null || dimVal.isEmpty()) {
return false;
}
Iterable<String> dimVals = SPLITTER.split(dimVal);
for (String val : dimVals) {
if (Floats.tryParse(val) == null) {
return false;
}
}
return true;
}
}

View File

@ -0,0 +1,531 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2012 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/
package com.metamx.druid.index.brita;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.metamx.collections.spatial.search.RadiusBound;
import com.metamx.collections.spatial.search.RectangularBound;
import com.metamx.druid.Druids;
import com.metamx.druid.QueryGranularity;
import com.metamx.druid.TestHelper;
import com.metamx.druid.aggregation.AggregatorFactory;
import com.metamx.druid.aggregation.CountAggregatorFactory;
import com.metamx.druid.aggregation.LongSumAggregatorFactory;
import com.metamx.druid.index.IncrementalIndexSegment;
import com.metamx.druid.index.QueryableIndex;
import com.metamx.druid.index.QueryableIndexSegment;
import com.metamx.druid.index.Segment;
import com.metamx.druid.index.v1.IncrementalIndex;
import com.metamx.druid.index.v1.IncrementalIndexSchema;
import com.metamx.druid.index.v1.IndexIO;
import com.metamx.druid.index.v1.IndexMerger;
import com.metamx.druid.index.v1.SpatialDimensionSchema;
import com.metamx.druid.input.MapBasedInputRow;
import com.metamx.druid.query.FinalizeResultsQueryRunner;
import com.metamx.druid.query.QueryRunner;
import com.metamx.druid.query.filter.SpatialDimFilter;
import com.metamx.druid.query.timeseries.TimeseriesQuery;
import com.metamx.druid.query.timeseries.TimeseriesQueryRunnerFactory;
import com.metamx.druid.result.Result;
import com.metamx.druid.result.TimeseriesResultValue;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Random;
/**
*/
@RunWith(Parameterized.class)
public class SpatialFilterBonusTest
{
private static Interval DATA_INTERVAL = new Interval("2013-01-01/2013-01-07");
private static AggregatorFactory[] METRIC_AGGS = new AggregatorFactory[]{
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "dim.geo");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
{
final IncrementalIndex rtIndex = makeIncrementalIndex();
final QueryableIndex mMappedTestIndex = makeQueryableIndex();
final QueryableIndex mergedRealtimeIndex = makeMergedQueryableIndex();
return Arrays.asList(
new Object[][]{
{
new IncrementalIndexSegment(rtIndex)
},
{
new QueryableIndexSegment(null, mMappedTestIndex)
},
{
new QueryableIndexSegment(null, mergedRealtimeIndex)
}
}
);
}
private static IncrementalIndex makeIncrementalIndex() throws IOException
{
IncrementalIndex theIndex = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", "0.0,0.0",
"val", 17l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", "1.0,3.0",
"val", 29l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", "4.0,2.0",
"val", 13l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", "7.0,3.0",
"val", 91l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "8.0,6.0",
"val", 47l
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"dim.geo", String.format(
"%s,%s",
(float) (rand.nextFloat() * 10 + 10.0),
(float) (rand.nextFloat() * 10 + 10.0)
),
"val", i
)
)
);
}
return theIndex;
}
private static QueryableIndex makeQueryableIndex() throws IOException
{
IncrementalIndex theIndex = makeIncrementalIndex();
File tmpFile = File.createTempFile("billy", "yay");
tmpFile.delete();
tmpFile.mkdirs();
tmpFile.deleteOnExit();
IndexMerger.persist(theIndex, tmpFile);
return IndexIO.loadIndex(tmpFile);
}
private static QueryableIndex makeMergedQueryableIndex()
{
try {
IncrementalIndex first = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
IncrementalIndex second = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
IncrementalIndex third = new IncrementalIndex(
new IncrementalIndexSchema.Builder().withMinTimestamp(DATA_INTERVAL.getStartMillis())
.withQueryGranularity(QueryGranularity.DAY)
.withMetrics(METRIC_AGGS)
.withSpatialDimensions(
Arrays.asList(
new SpatialDimensionSchema(
"dim.geo",
Lists.<String>newArrayList()
)
)
).build()
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "foo",
"dim.geo", "0.0,0.0",
"val", 17l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-02").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-02").toString(),
"dim", "foo",
"dim.geo", "1.0,3.0",
"val", 29l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-03").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-03").toString(),
"dim", "foo",
"dim.geo", "4.0,2.0",
"val", 13l
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-04").toString(),
"dim", "foo",
"dim.geo", "7.0,3.0",
"val", 91l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "8.0,6.0",
"val", 47l
)
)
);
// Add a bunch of random points
Random rand = new Random();
for (int i = 5; i < 5000; i++) {
third.add(
new MapBasedInputRow(
new DateTime("2013-01-01").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-01").toString(),
"dim", "boo",
"dim.geo", String.format(
"%s,%s",
(float) (rand.nextFloat() * 10 + 10.0),
(float) (rand.nextFloat() * 10 + 10.0)
),
"val", i
)
)
);
}
File tmpFile = File.createTempFile("yay", "who");
tmpFile.delete();
File firstFile = new File(tmpFile, "first");
File secondFile = new File(tmpFile, "second");
File thirdFile = new File(tmpFile, "third");
File mergedFile = new File(tmpFile, "merged");
firstFile.mkdirs();
firstFile.deleteOnExit();
secondFile.mkdirs();
secondFile.deleteOnExit();
thirdFile.mkdirs();
thirdFile.deleteOnExit();
mergedFile.mkdirs();
mergedFile.deleteOnExit();
IndexMerger.persist(first, DATA_INTERVAL, firstFile);
IndexMerger.persist(second, DATA_INTERVAL, secondFile);
IndexMerger.persist(third, DATA_INTERVAL, thirdFile);
QueryableIndex mergedRealtime = IndexIO.loadIndex(
IndexMerger.mergeQueryableIndex(
Arrays.asList(IndexIO.loadIndex(firstFile), IndexIO.loadIndex(secondFile), IndexIO.loadIndex(thirdFile)),
METRIC_AGGS,
mergedFile
)
);
return mergedRealtime;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}
private final Segment segment;
public SpatialFilterBonusTest(Segment segment)
{
this.segment = segment;
}
@Test
public void testSpatialQuery()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.ALL)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"dim.geo",
new RadiusBound(new float[]{0.0f, 0.0f}, 5)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 3L)
.put("val", 59l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Test
public void testSpatialQueryMorePoints()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.DAY)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"dim.geo",
new RectangularBound(new float[]{0.0f, 0.0f}, new float[]{9.0f, 9.0f})
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 17l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-02T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 29l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-03T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 13l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-04T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 91l)
.build()
)
),
new Result<TimeseriesResultValue>(
new DateTime("2013-01-05T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 47l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory();
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}

View File

@ -188,6 +188,18 @@ public class SpatialFilterTest
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
// Add a bunch of random points
Random rand = new Random();
@ -318,6 +330,18 @@ public class SpatialFilterTest
)
)
);
first.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"dim", "foo",
"dim.geo", "_mmx.unknown",
"val", 501l
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-04").getMillis(),