Port fix for issue where IncrementalIndex doesn't properly filter for "null" values from 0.5.x to master

This commit is contained in:
cheddar 2013-09-18 18:18:53 -05:00
parent a74b6de9c5
commit 46631bf409
2 changed files with 80 additions and 1 deletions

View File

@ -456,6 +456,22 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
}
String idObject = index.getDimension(dimension.toLowerCase()).get(value);
if (idObject == null) {
if (value == null || "".equals(value)) {
final int dimIndex = dimIndexObject;
return new ValueMatcher()
{
@Override
public boolean matches()
{
String[][] dims = holder.getKey().getDims();
if (dimIndex >= dims.length || dims[dimIndex] == null) {
return true;
}
return false;
}
};
}
return new BooleanValueMatcher(false);
}

View File

@ -33,6 +33,7 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.query.filter.DimFilters;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
@ -73,7 +74,6 @@ public class IncrementalIndexStorageAdapterTest
GroupByQueryEngine engine = new GroupByQueryEngine(
Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
{
@Override
public int getMaxIntermediateRows()
{
@ -115,4 +115,67 @@ public class IncrementalIndexStorageAdapterTest
Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent());
}
@Test
public void testFilterByNull() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("billy"),
ImmutableMap.<String, Object>of("billy", "hi")
)
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("sally"),
ImmutableMap.<String, Object>of("sally", "bo")
)
);
GroupByQueryEngine engine = new GroupByQueryEngine(
Suppliers.<GroupByQueryConfig>ofInstance(new GroupByQueryConfig()
{
@Override
public int getMaxIntermediateRows()
{
return 5;
}
}),
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
{
@Override
public ByteBuffer get()
{
return ByteBuffer.allocate(50000);
}
}
)
);
final Sequence<Row> rows = engine.process(
GroupByQuery.builder()
.setDataSource("test")
.setGranularity(QueryGranularity.ALL)
.setInterval(new Interval(0, new DateTime().getMillis()))
.addDimension("billy")
.addDimension("sally")
.addAggregator(new LongSumAggregatorFactory("cnt", "cnt"))
.setDimFilter(DimFilters.dimEquals("sally", null))
.build(),
new IncrementalIndexStorageAdapter(index)
);
final ArrayList<Row> results = Sequences.toList(rows, Lists.<Row>newArrayList());
Assert.assertEquals(1, results.size());
MapBasedRow row = (MapBasedRow) results.get(0);
Assert.assertEquals(ImmutableMap.of("billy", "hi", "cnt", 1l), row.getEvent());
}
}