From adf60d15489195755202f6b94dbd857e9525ec19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 10 Mar 2014 11:53:04 -0700 Subject: [PATCH 1/5] test reset sanity on out of order values --- .../IncrementalIndexStorageAdapterTest.java | 55 ++++++++++++++++++- 1 file changed, 54 insertions(+), 1 deletion(-) diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index e3b603af9ae..fc0fa5fa177 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -37,9 +37,12 @@ import io.druid.query.filter.DimFilters; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; -import junit.framework.Assert; +import io.druid.segment.Cursor; +import io.druid.segment.DimensionSelector; +import io.druid.segment.filter.SelectorFilter; import org.joda.time.DateTime; import org.joda.time.Interval; +import org.junit.Assert; import org.junit.Test; import java.nio.ByteBuffer; @@ -115,6 +118,56 @@ public class IncrementalIndexStorageAdapterTest Assert.assertEquals(ImmutableMap.of("sally", "bo", "cnt", 1l), row.getEvent()); } + @Test + public void testResetSanity() { + IncrementalIndex index = new IncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + ); + + + DateTime t = DateTime.now(); + Interval interval = new Interval(t.minusMinutes(1), t.plusMinutes(1)); + + index.add( + new MapBasedInputRow( + t.minus(1).getMillis(), + Lists.newArrayList("billy"), + ImmutableMap.of("billy", "hi") + ) + ); + index.add( + new MapBasedInputRow( + t.minus(1).getMillis(), + Lists.newArrayList("sally"), + ImmutableMap.of("sally", "bo") + ) + ); + + IncrementalIndexStorageAdapter adapter = new IncrementalIndexStorageAdapter(index); + Iterable cursorIterable = adapter.makeCursors(new SelectorFilter("sally", "bo"), + interval, + QueryGranularity.NONE); + Cursor cursor = cursorIterable.iterator().next(); + DimensionSelector dimSelector; + + dimSelector = cursor.makeDimensionSelector("sally"); + Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); + + index.add( + new MapBasedInputRow( + t.minus(1).getMillis(), + Lists.newArrayList("sally"), + ImmutableMap.of("sally", "ah") + ) + ); + + // Cursor reset should not be affected by out of order values + cursor.reset(); + + dimSelector = cursor.makeDimensionSelector("sally"); + Assert.assertEquals("bo", dimSelector.lookupName(dimSelector.getRow().get(0))); + } + @Test public void testFilterByNull() throws Exception { From 2db73998b903d877364966c805f910138ceb0e3d Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 10 Mar 2014 12:01:49 -0700 Subject: [PATCH 2/5] add fix for II --- .../IncrementalIndexStorageAdapter.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 8843879f91b..878b79682e9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -241,19 +241,17 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter if (numAdvanced == -1) { numAdvanced = 0; - while (baseIter.hasNext()) { - currEntry.set(baseIter.next()); - if (filterMatcher.matches()) { - return; - } - - numAdvanced++; - } } else { Iterators.advance(baseIter, numAdvanced); - if (baseIter.hasNext()) { - currEntry.set(baseIter.next()); + } + + while (baseIter.hasNext()) { + currEntry.set(baseIter.next()); + if (filterMatcher.matches()) { + return; } + + numAdvanced++; } done = cursorMap.size() == 0 || !baseIter.hasNext(); From 6ce5266aca65de28b2ac462fb02cf2c3b57dd937 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 10 Mar 2014 12:19:58 -0700 Subject: [PATCH 3/5] make stuff work --- .../IncrementalIndexStorageAdapter.java | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java index 878b79682e9..b66373c7b49 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java @@ -241,21 +241,27 @@ public class IncrementalIndexStorageAdapter implements StorageAdapter if (numAdvanced == -1) { numAdvanced = 0; + while (baseIter.hasNext()) { + currEntry.set(baseIter.next()); + if (filterMatcher.matches()) { + return; + } + + numAdvanced++; + } } else { Iterators.advance(baseIter, numAdvanced); - } + while (baseIter.hasNext()) { + currEntry.set(baseIter.next()); + if (filterMatcher.matches()) { + break; + } - while (baseIter.hasNext()) { - currEntry.set(baseIter.next()); - if (filterMatcher.matches()) { - return; + numAdvanced++; } - - numAdvanced++; } done = cursorMap.size() == 0 || !baseIter.hasNext(); - } @Override From b81bd6c959bec08f45e4014c4d38f3983f227077 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Xavier=20L=C3=A9aut=C3=A9?= Date: Mon, 10 Mar 2014 13:10:45 -0700 Subject: [PATCH 4/5] add test for single value index top-n --- .../IncrementalIndexStorageAdapterTest.java | 63 ++++++++++++++++++- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java index fc0fa5fa177..3c52b2b322f 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexStorageAdapterTest.java @@ -22,6 +22,7 @@ package io.druid.segment.incremental; import com.google.common.base.Supplier; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.metamx.common.guava.Sequence; import com.metamx.common.guava.Sequences; @@ -30,6 +31,7 @@ import io.druid.data.input.MapBasedInputRow; import io.druid.data.input.MapBasedRow; import io.druid.data.input.Row; import io.druid.granularity.QueryGranularity; +import io.druid.query.Result; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.query.aggregation.LongSumAggregatorFactory; @@ -37,6 +39,9 @@ import io.druid.query.filter.DimFilters; import io.druid.query.groupby.GroupByQuery; import io.druid.query.groupby.GroupByQueryConfig; import io.druid.query.groupby.GroupByQueryEngine; +import io.druid.query.topn.TopNQueryBuilder; +import io.druid.query.topn.TopNQueryEngine; +import io.druid.query.topn.TopNResultValue; import io.druid.segment.Cursor; import io.druid.segment.DimensionSelector; import io.druid.segment.filter.SelectorFilter; @@ -169,9 +174,61 @@ public class IncrementalIndexStorageAdapterTest } @Test - public void testFilterByNull() throws Exception - { - IncrementalIndex index = new IncrementalIndex( + public void testSingleValueTopN() + { + IncrementalIndex index = new IncrementalIndex( + 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} + ); + + DateTime t = DateTime.now(); + index.add( + new MapBasedInputRow( + t.minus(1).getMillis(), + Lists.newArrayList("sally"), + ImmutableMap.of("sally", "bo") + ) + ); + + TopNQueryEngine engine = new TopNQueryEngine( + new StupidPool( + new Supplier() + { + @Override + public ByteBuffer get() + { + return ByteBuffer.allocate(50000); + } + } + ) + ); + + final Iterable> results = engine.query( + new TopNQueryBuilder().dataSource("test") + .granularity(QueryGranularity.ALL) + .intervals(Lists.newArrayList(new Interval(0, new DateTime().getMillis()))) + .dimension("sally") + .metric("cnt") + .threshold(10) + .aggregators( + Lists.newArrayList( + new LongSumAggregatorFactory( + "cnt", + "cnt" + ) + ) + ) + .build(), + new IncrementalIndexStorageAdapter(index) + ); + + Assert.assertEquals(1, Iterables.size(results)); + Assert.assertEquals(1, results.iterator().next().getValue().getValue().size()); + } + + @Test + public void testFilterByNull() throws Exception + { + IncrementalIndex index = new IncrementalIndex( 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")} ); From 36b54f7e8fcd8c1222456123121822e5bd985904 Mon Sep 17 00:00:00 2001 From: fjy Date: Mon, 10 Mar 2014 13:22:47 -0700 Subject: [PATCH 5/5] fix all problems with topNs --- .../druid/query/topn/BaseTopNAlgorithm.java | 3 +- .../IncrementalIndexStorageAdapter.java | 29 ++++++++----------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java index 47093cea8a5..0c32d8db676 100644 --- a/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/BaseTopNAlgorithm.java @@ -78,8 +78,6 @@ public abstract class BaseTopNAlgorithm