Merge pull request #2239 from sirpkt/filtered-aggregators-at-ingestion

fix NPE with filtered aggregator at ingestion time
This commit is contained in:
Fangjin Yang 2016-02-11 10:21:28 -08:00
commit f0871ea40b
3 changed files with 37 additions and 7 deletions

View File

@ -151,8 +151,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
selectors = Maps.newHashMap();
aggOffsetInBuffer = new int[metrics.length];
BufferAggregator[] aggregators = new BufferAggregator[metrics.length];
for (int i = 0; i < metrics.length; i++) {
AggregatorFactory agg = metrics[i];
@ -167,7 +165,6 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
new OnheapIncrementalIndex.ObjectCachingColumnSelectorFactory(columnSelectorFactory)
);
aggregators[i] = agg.factorizeBuffered(columnSelectorFactory);
if (i == 0) {
aggOffsetInBuffer[i] = 0;
} else {
@ -177,7 +174,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSize();
return aggregators;
return new BufferAggregator[metrics.length];
}
@Override
@ -203,6 +200,15 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
bufferOffset = indexAndOffset[1];
aggBuffer = aggBuffers.get(bufferIndex).get();
} else {
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
getAggs()[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
rowContainer.set(null);
bufferIndex = aggBuffers.size() - 1;
ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
int[] lastAggregatorsIndexAndOffset = indexAndOffsets.isEmpty()

View File

@ -155,12 +155,15 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
} else {
aggs = new Aggregator[metrics.length];
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
selectors.get(agg.getName())
);
}
rowContainer.set(null);
final Integer rowIndex = indexIncrement.getAndIncrement();
concurrentSet(rowIndex, aggs);

View File

@ -29,6 +29,8 @@ import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.CloserRule;
import io.druid.query.aggregation.FilteredAggregatorFactory;
import io.druid.query.filter.SelectorDimFilter;
import org.joda.time.DateTime;
import org.junit.Rule;
import org.junit.Test;
@ -73,7 +75,15 @@ public class IncrementalIndexTest
public IncrementalIndex createIndex()
{
return new OnheapIncrementalIndex(
0, QueryGranularity.MINUTE, new AggregatorFactory[]{new CountAggregatorFactory("cnt")}, 1000
0,
QueryGranularity.MINUTE,
new AggregatorFactory[]{
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
},
1000
);
}
}
@ -88,7 +98,12 @@ public class IncrementalIndexTest
return new OffheapIncrementalIndex(
0L,
QueryGranularity.NONE,
new AggregatorFactory[]{new CountAggregatorFactory("cnt")},
new AggregatorFactory[]{
new FilteredAggregatorFactory(
new CountAggregatorFactory("cnt"),
new SelectorDimFilter("billy", "A")
)
},
1000000,
new StupidPool<ByteBuffer>(
new Supplier<ByteBuffer>()
@ -104,7 +119,6 @@ public class IncrementalIndexTest
}
}
}
}
);
}
@ -153,6 +167,13 @@ public class IncrementalIndexTest
ImmutableMap.<String, Object>of("billy", "A", "joe", "B")
)
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),
Lists.newArrayList("billy", "joe"),
ImmutableMap.<String, Object>of("billy", "C", "joe", "B")
)
);
index.add(
new MapBasedInputRow(
new DateTime().minus(1).getMillis(),