Merge pull request #425 from metamx/fix-iisa

Fix TopNs with real-time data sometimes returning more than the filter set requires
This commit is contained in:
fjy 2014-03-10 14:23:20 -06:00
commit e4a91a979d
3 changed files with 126 additions and 18 deletions

View File

@ -78,8 +78,6 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
while (numProcessed < cardinality) {
final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
params.getCursor().reset();
DimValSelector theDimValSelector;
if (!hasDimValSelector) {
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
@ -96,6 +94,7 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
closeAggregators(aggregatesStore);
numProcessed += numToProcess;
params.getCursor().reset();
}
}

View File

@ -241,23 +241,22 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter
if (numAdvanced == -1) {
numAdvanced = 0;
while (baseIter.hasNext()) {
currEntry.set(baseIter.next());
if (filterMatcher.matches()) {
return;
}
numAdvanced++;
}
} else {
Iterators.advance(baseIter, numAdvanced);
if (baseIter.hasNext()) {
currEntry.set(baseIter.next());
}
}
done = cursorMap.size() == 0 || !baseIter.hasNext();
boolean foundMatched = false;
while (baseIter.hasNext()) {
currEntry.set(baseIter.next());
if (filterMatcher.matches()) {
foundMatched = true;
break;
}
numAdvanced++;
}
done = !foundMatched && (cursorMap.size() == 0 || !baseIter.hasNext());
}
@Override

View File

@ -22,6 +22,7 @@ package io.druid.segment.incremental;
import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
@ -30,6 +31,7 @@ import io.druid.data.input.MapBasedInputRow;
import io.druid.data.input.MapBasedRow;
import io.druid.data.input.Row;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
@ -37,9 +39,15 @@ import io.druid.query.filter.DimFilters;
import io.druid.query.groupby.GroupByQuery;
import io.druid.query.groupby.GroupByQueryConfig;
import io.druid.query.groupby.GroupByQueryEngine;
import junit.framework.Assert;
import io.druid.query.topn.TopNQueryBuilder;
import io.druid.query.topn.TopNQueryEngine;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.Cursor;
import io.druid.segment.DimensionSelector;
import io.druid.segment.filter.SelectorFilter;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Test;
import java.nio.ByteBuffer;
@ -116,9 +124,111 @@ public class IncrementalIndexStorageAdapterTest
}
@Test
public void testFilterByNull() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
public void testResetSanity() {
  // Index with a count aggregator; minute granularity matches the other tests in this class.
  final IncrementalIndex idx = new IncrementalIndex(
      0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
  );

  final DateTime now = DateTime.now();
  final Interval queryInterval = new Interval(now.minusMinutes(1), now.plusMinutes(1));

  // Two rows with different dimensions; only the "sally"="bo" row should match the filter below.
  idx.add(
      new MapBasedInputRow(
          now.minus(1).getMillis(),
          Lists.newArrayList("billy"),
          ImmutableMap.<String, Object>of("billy", "hi")
      )
  );
  idx.add(
      new MapBasedInputRow(
          now.minus(1).getMillis(),
          Lists.newArrayList("sally"),
          ImmutableMap.<String, Object>of("sally", "bo")
      )
  );

  final IncrementalIndexStorageAdapter storageAdapter = new IncrementalIndexStorageAdapter(idx);
  final Iterable<Cursor> cursors = storageAdapter.makeCursors(
      new SelectorFilter("sally", "bo"),
      queryInterval,
      QueryGranularity.NONE
  );
  final Cursor cursor = cursors.iterator().next();

  // The filtered cursor must be positioned on the matching row.
  DimensionSelector selector = cursor.makeDimensionSelector("sally");
  Assert.assertEquals("bo", selector.lookupName(selector.getRow().get(0)));

  // Add an out-of-order value to the same dimension after the cursor was created.
  idx.add(
      new MapBasedInputRow(
          now.minus(1).getMillis(),
          Lists.newArrayList("sally"),
          ImmutableMap.<String, Object>of("sally", "ah")
      )
  );

  // Cursor reset should not be affected by out of order values
  cursor.reset();
  selector = cursor.makeDimensionSelector("sally");
  Assert.assertEquals("bo", selector.lookupName(selector.getRow().get(0)));
}
@Test
public void testSingleValueTopN()
{
  // One-row index: the TopN over "sally" must return exactly one dimension value,
  // not duplicates (regression coverage for the real-time TopN over-return bug).
  final IncrementalIndex idx = new IncrementalIndex(
      0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
  );

  final DateTime now = DateTime.now();
  idx.add(
      new MapBasedInputRow(
          now.minus(1).getMillis(),
          Lists.newArrayList("sally"),
          ImmutableMap.<String, Object>of("sally", "bo")
      )
  );

  // Engine with a fixed-size scratch-buffer pool, as the TopN algorithm requires.
  final Supplier<ByteBuffer> bufferSupplier = new Supplier<ByteBuffer>()
  {
    @Override
    public ByteBuffer get()
    {
      return ByteBuffer.allocate(50000);
    }
  };
  final TopNQueryEngine queryEngine = new TopNQueryEngine(new StupidPool<ByteBuffer>(bufferSupplier));

  final Iterable<Result<TopNResultValue>> topNResults = queryEngine.query(
      new TopNQueryBuilder()
          .dataSource("test")
          .granularity(QueryGranularity.ALL)
          .intervals(Lists.newArrayList(new Interval(0, new DateTime().getMillis())))
          .dimension("sally")
          .metric("cnt")
          .threshold(10)
          .aggregators(
              Lists.<AggregatorFactory>newArrayList(
                  new LongSumAggregatorFactory(
                      "cnt",
                      "cnt"
                  )
              )
          )
          .build(),
      new IncrementalIndexStorageAdapter(idx)
  );

  // Exactly one granularity bucket, and exactly one value for the single-row dimension.
  Assert.assertEquals(1, Iterables.size(topNResults));
  Assert.assertEquals(1, topNResults.iterator().next().getValue().getValue().size());
}
@Test
public void testFilterByNull() throws Exception
{
IncrementalIndex index = new IncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}
);