Merge pull request #765 from metamx/fix-spatial

fix spatial indexing with multiple spatial dims
This commit is contained in:
Gian Merlino 2014-09-29 17:23:29 -07:00
commit a3a2c8b86f
2 changed files with 120 additions and 37 deletions

View File

@ -49,26 +49,17 @@ public class SpatialDimensionRowFormatter
private static final Joiner JOINER = Joiner.on(",");
private static final Splitter SPLITTER = Splitter.on(",");
private final List<SpatialDimensionSchema> spatialDimensions;
private final Set<String> spatialDimNames;
private final Map<String, SpatialDimensionSchema> spatialDimensionMap;
private final Set<String> spatialPartialDimNames;
public SpatialDimensionRowFormatter(List<SpatialDimensionSchema> spatialDimensions)
{
this.spatialDimensions = spatialDimensions;
this.spatialDimNames = Sets.newHashSet(
Lists.transform(
spatialDimensions,
new Function<SpatialDimensionSchema, String>()
{
@Override
public String apply(SpatialDimensionSchema input)
{
return input.getDimName();
}
}
)
);
this.spatialDimensionMap = Maps.newHashMap();
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
if (this.spatialDimensionMap.put(spatialDimension.getDimName(), spatialDimension) != null) {
throw new ISE("Duplicate spatial dimension names found! Check your schema yo!");
}
}
this.spatialPartialDimNames = Sets.newHashSet(
Iterables.concat(
Lists.transform(
@ -110,7 +101,7 @@ public class SpatialDimensionRowFormatter
@Override
public boolean apply(String input)
{
return !spatialDimNames.contains(input) && !spatialPartialDimNames.contains(input);
return !spatialDimensionMap.containsKey(input) && !spatialPartialDimNames.contains(input);
}
}
)
@ -173,32 +164,32 @@ public class SpatialDimensionRowFormatter
}
};
if (!spatialPartialDimNames.isEmpty()) {
for (SpatialDimensionSchema spatialDimension : spatialDimensions) {
List<String> spatialDimVals = Lists.newArrayList();
for (Map.Entry<String, SpatialDimensionSchema> entry : spatialDimensionMap.entrySet()) {
final String spatialDimName = entry.getKey();
final SpatialDimensionSchema spatialDim = entry.getValue();
for (String partialSpatialDim : spatialDimension.getDims()) {
List<String> dimVals = row.getDimension(partialSpatialDim);
if (isSpatialDimValsValid(dimVals)) {
spatialDimVals.addAll(dimVals);
}
}
if (spatialDimVals.size() == spatialPartialDimNames.size()) {
spatialLookup.put(spatialDimension.getDimName(), Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimension.getDimName());
}
}
} else {
for (String spatialDimName : spatialDimNames) {
List<String> dimVals = row.getDimension(spatialDimName);
List<String> dimVals = row.getDimension(spatialDimName);
if (dimVals != null && !dimVals.isEmpty()) {
if (dimVals.size() != 1) {
throw new ISE("Cannot have a spatial dimension value with size[%d]", dimVals.size());
throw new ISE("Spatial dimension value must be in an array!");
}
if (isJoinedSpatialDimValValid(dimVals.get(0))) {
spatialLookup.put(spatialDimName, dimVals);
finalDims.add(spatialDimName);
}
} else {
List<String> spatialDimVals = Lists.newArrayList();
for (String dim : spatialDim.getDims()) {
List<String> partialDimVals = row.getDimension(dim);
if (isSpatialDimValsValid(partialDimVals)) {
spatialDimVals.addAll(partialDimVals);
}
}
if (spatialDimVals.size() == spatialDim.getDims().size()) {
spatialLookup.put(spatialDimName, Arrays.asList(JOINER.join(spatialDimVals)));
finalDims.add(spatialDimName);
}
}
}

View File

@ -76,7 +76,7 @@ public class SpatialFilterTest
new LongSumAggregatorFactory("val", "val")
};
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long");
private static List<String> DIMS = Lists.newArrayList("dim", "lat", "long", "lat2", "long2");
@Parameterized.Parameters
public static Collection<?> constructorFeeder() throws IOException
@ -110,6 +110,10 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
).build()
@ -204,6 +208,18 @@ public class SpatialFilterTest
)
)
);
theIndex.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"lat2", 0.0f,
"long2", 0.0f,
"val", 13l
)
)
);
// Add a bunch of random points
Random rand = new Random();
@ -250,6 +266,10 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
).build()
@ -263,6 +283,10 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
).build()
@ -276,6 +300,10 @@ public class SpatialFilterTest
new SpatialDimensionSchema(
"dim.geo",
Arrays.asList("lat", "long")
),
new SpatialDimensionSchema(
"spatialIsRad",
Arrays.asList("lat2", "long2")
)
)
).build()
@ -372,6 +400,18 @@ public class SpatialFilterTest
)
)
);
second.add(
new MapBasedInputRow(
new DateTime("2013-01-05").getMillis(),
DIMS,
ImmutableMap.<String, Object>of(
"timestamp", new DateTime("2013-01-05").toString(),
"lat2", 0.0f,
"long2", 0.0f,
"val", 13l
)
)
);
// Add a bunch of random points
Random rand = new Random();
@ -486,6 +526,58 @@ public class SpatialFilterTest
}
}
@Test
public void testSpatialQueryWithOtherSpatialDim()
{
TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
.dataSource("test")
.granularity(QueryGranularity.ALL)
.intervals(Arrays.asList(new Interval("2013-01-01/2013-01-07")))
.filters(
new SpatialDimFilter(
"spatialIsRad",
new RadiusBound(new float[]{0.0f, 0.0f}, 5)
)
)
.aggregators(
Arrays.<AggregatorFactory>asList(
new CountAggregatorFactory("rows"),
new LongSumAggregatorFactory("val", "val")
)
)
.build();
List<Result<TimeseriesResultValue>> expectedResults = Arrays.asList(
new Result<TimeseriesResultValue>(
new DateTime("2013-01-01T00:00:00.000Z"),
new TimeseriesResultValue(
ImmutableMap.<String, Object>builder()
.put("rows", 1L)
.put("val", 13l)
.build()
)
)
);
try {
TimeseriesQueryRunnerFactory factory = new TimeseriesQueryRunnerFactory(
new TimeseriesQueryQueryToolChest(new QueryConfig()),
new TimeseriesQueryEngine(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER
);
QueryRunner runner = new FinalizeResultsQueryRunner(
factory.createRunner(segment),
factory.getToolchest()
);
TestHelper.assertExpectedResults(expectedResults, runner.run(query));
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
@Test
public void testSpatialQueryMorePoints()
{