diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 56e4c0fef4a..6c7fb11461a 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -40,7 +40,7 @@ public class PooledTopNAlgorithm private final Capabilities capabilities; private final TopNQuery query; private final StupidPool bufferPool; - private static final int UNROLL_COUNT = 8; // Must be able to fit loop below + private static final int AGG_UNROLL_COUNT = 8; // Must be able to fit loop below public PooledTopNAlgorithm( Capabilities capabilities, @@ -165,59 +165,80 @@ public class PooledTopNAlgorithm offset += aggregatorSizes[j]; } - final int nAggregators = theAggregators.length; - final int extra = nAggregators % UNROLL_COUNT; + final int aggSize = theAggregators.length; + final int aggExtra = aggSize % AGG_UNROLL_COUNT; while (!cursor.isDone()) { final IndexedInts dimValues = dimSelector.getRow(); - final int size = dimValues.size(); - for (int i = 0; i < size; ++i) { - final int dimIndex = dimValues.get(i); - int position = positions[dimIndex]; - if (SKIP_POSITION_VALUE == position) { - continue; - } - if (INIT_POSITION_VALUE == position) { - positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; - position = positions[dimIndex]; - for (int j = 0; j < nAggregators; ++j) { - theAggregators[j].init(resultsBuf, position + aggregatorOffsets[j]); - } - position = positions[dimIndex]; - } - - switch(extra) { - case 7: - theAggregators[6].aggregate(resultsBuf, position + aggregatorOffsets[6]); - case 6: - theAggregators[5].aggregate(resultsBuf, position + aggregatorOffsets[5]); - case 5: - theAggregators[4].aggregate(resultsBuf, position + aggregatorOffsets[4]); - case 4: - theAggregators[3].aggregate(resultsBuf, position + aggregatorOffsets[3]); - case 3: - theAggregators[2].aggregate(resultsBuf, position + aggregatorOffsets[2]); - case 2: - theAggregators[1].aggregate(resultsBuf, position + aggregatorOffsets[1]); - case 1: - theAggregators[0].aggregate(resultsBuf, position + aggregatorOffsets[0]); - } - for (int j = extra; j < nAggregators; j += UNROLL_COUNT) { - theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); - theAggregators[j+1].aggregate(resultsBuf, position + aggregatorOffsets[j+1]); - theAggregators[j+2].aggregate(resultsBuf, position + aggregatorOffsets[j+2]); - theAggregators[j+3].aggregate(resultsBuf, position + aggregatorOffsets[j+3]); - theAggregators[j+4].aggregate(resultsBuf, position + aggregatorOffsets[j+4]); - theAggregators[j+5].aggregate(resultsBuf, position + aggregatorOffsets[j+5]); - theAggregators[j+6].aggregate(resultsBuf, position + aggregatorOffsets[j+6]); - theAggregators[j+7].aggregate(resultsBuf, position + aggregatorOffsets[j+7]); - } + final int dimSize = dimValues.size(); + final int dimExtra = dimSize - dimSize % 4; + final int dimUpper = (dimSize / 4) * 4 - 1; + for (int i = 0; i < dimUpper; i += 4) { + aggregateDimValues(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i)); + aggregateDimValues(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+1)); + aggregateDimValues(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+2)); + aggregateDimValues(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+3)); + } + for (int i = dimExtra; i < dimSize; ++i) { + aggregateDimValues(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i)); } cursor.advance(); } } + private static void aggregateDimValues( + int[] positions, + BufferAggregator[] theAggregators, + int numProcessed, + ByteBuffer resultsBuf, + int numBytesPerRecord, + int[] aggregatorOffsets, + int aggSize, + int aggExtra, + int dimIndex + ) + { + if (SKIP_POSITION_VALUE == positions[dimIndex]) { + return; + } + if (INIT_POSITION_VALUE == positions[dimIndex]) { + positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; + final int pos = positions[dimIndex]; + for (int j = 0; j < aggSize; ++j) { + theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]); + } + } + final int position = positions[dimIndex]; + + switch(aggExtra) { + case 7: + theAggregators[6].aggregate(resultsBuf, position + aggregatorOffsets[6]); + case 6: + theAggregators[5].aggregate(resultsBuf, position + aggregatorOffsets[5]); + case 5: + theAggregators[4].aggregate(resultsBuf, position + aggregatorOffsets[4]); + case 4: + theAggregators[3].aggregate(resultsBuf, position + aggregatorOffsets[3]); + case 3: + theAggregators[2].aggregate(resultsBuf, position + aggregatorOffsets[2]); + case 2: + theAggregators[1].aggregate(resultsBuf, position + aggregatorOffsets[1]); + case 1: + theAggregators[0].aggregate(resultsBuf, position + aggregatorOffsets[0]); + } + for (int j = aggExtra; j < aggSize; j += AGG_UNROLL_COUNT) { + theAggregators[j].aggregate(resultsBuf, position + aggregatorOffsets[j]); + theAggregators[j+1].aggregate(resultsBuf, position + aggregatorOffsets[j+1]); + theAggregators[j+2].aggregate(resultsBuf, position + aggregatorOffsets[j+2]); + theAggregators[j+3].aggregate(resultsBuf, position + aggregatorOffsets[j+3]); + theAggregators[j+4].aggregate(resultsBuf, position + aggregatorOffsets[j+4]); + theAggregators[j+5].aggregate(resultsBuf, position + aggregatorOffsets[j+5]); + theAggregators[j+6].aggregate(resultsBuf, position + aggregatorOffsets[j+6]); + theAggregators[j+7].aggregate(resultsBuf, position + aggregatorOffsets[j+7]); + } + } + @Override protected void updateResults( PooledTopNParams params,