diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 355cf19417b..f5d4d8248cd 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -151,8 +151,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex selectors = Maps.newHashMap(); aggOffsetInBuffer = new int[metrics.length]; - BufferAggregator[] aggregators = new BufferAggregator[metrics.length]; - for (int i = 0; i < metrics.length; i++) { AggregatorFactory agg = metrics[i]; @@ -167,7 +165,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory) ); - aggregators[i] = agg.factorizeBuffered(columnSelectorFactory); if (i == 0) { aggOffsetInBuffer[i] = 0; } else { @@ -177,7 +174,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize(); - return aggregators; + return new BufferAggregator[metrics.length]; } @Override @@ -203,6 +200,15 @@ public class OffheapIncrementalIndex extends IncrementalIndex bufferOffset = indexAndOffset[1]; aggBuffer = aggBuffers.get(bufferIndex).get(); } else { + rowContainer.set(row); + for (int i = 0; i < metrics.length; i++) { + final AggregatorFactory agg = metrics[i]; + getAggs()[i] = agg.factorizeBuffered( + makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics) + ); + } + rowContainer.set(null); + bufferIndex = aggBuffers.size() - 1; ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get(); int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty() diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 9676ee67b32..dcf6c0d2946 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -155,12 +155,15 @@ public class OnheapIncrementalIndex extends IncrementalIndex } else { aggs = new Aggregator[metrics.length]; + rowContainer.set(row); for (int i = 0; i < metrics.length; i++) { final AggregatorFactory agg = metrics[i]; aggs[i] = agg.factorize( selectors.get(agg.getName()) ); } + rowContainer.set(null); + final Integer rowIndex = indexIncrement.getAndIncrement(); concurrentSet(rowIndex, aggs); diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java index 86777202051..0fd115b45c6 100644 --- a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexTest.java @@ -29,6 +29,8 @@ import io.druid.granularity.QueryGranularity; import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.CloserRule; +import io.druid.query.aggregation.FilteredAggregatorFactory; +import io.druid.query.filter.SelectorDimFilter; import org.joda.time.DateTime; import org.junit.Rule; import org.junit.Test; @@ -73,7 +75,15 @@ public class IncrementalIndexTest public IncrementalIndex createIndex() { return new OnheapIncrementalIndex( - 0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000 + 0, + QueryGranularity.MINUTE, + new AggregatorFactory[]{ + new FilteredAggregatorFactory( + new CountAggregatorFactory("cnt"), + new SelectorDimFilter("billy", "A") + ) + }, + 1000 ); } } @@ -88,7 +98,12 @@ public class IncrementalIndexTest return new OffheapIncrementalIndex( 0L, QueryGranularity.NONE, - new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, + new AggregatorFactory[]{ + new FilteredAggregatorFactory( + new CountAggregatorFactory("cnt"), + new SelectorDimFilter("billy", "A") + ) + }, 1000000, new StupidPool( new Supplier() @@ -104,7 +119,6 @@ public class IncrementalIndexTest } } } - } ); } @@ -153,6 +167,13 @@ public class IncrementalIndexTest ImmutableMap.of("billy", "A", "joe", "B") ) ); + index.add( + new MapBasedInputRow( + new DateTime().minus(1).getMillis(), + Lists.newArrayList("billy", "joe"), + ImmutableMap.of("billy", "C", "joe", "B") + ) + ); index.add( new MapBasedInputRow( new DateTime().minus(1).getMillis(),