Small topn scan improvements (#3526)

* Remove unused numProcessed param from PooledTopNAlgorithm.aggregateDimValue()

* Replace AtomicInteger with simple int in PooledTopNAlgorithm.scanAndAggregate() and aggregateDimValue()

* Remove unused import
This commit is contained in:
Roman Leventov 2016-10-17 20:36:19 +03:00 committed by Charles Allen
parent 285516bede
commit 9611358f0a
1 changed files with 26 additions and 37 deletions

View File

@ -31,7 +31,6 @@ import io.druid.segment.data.IndexedInts;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
/**
*/
@ -203,7 +202,7 @@ public class PooledTopNAlgorithm
final int aggSize = theAggregators.length;
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
final AtomicInteger currentPosition = new AtomicInteger(0);
int currentPosition = 0;
while (!cursor.isDone()) {
final IndexedInts dimValues = dimSelector.getRow();
@ -212,10 +211,9 @@ public class PooledTopNAlgorithm
final int dimExtra = dimSize % AGG_UNROLL_COUNT;
switch(dimExtra){
case 7:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -225,10 +223,9 @@ public class PooledTopNAlgorithm
currentPosition
);
case 6:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -238,10 +235,9 @@ public class PooledTopNAlgorithm
currentPosition
);
case 5:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -251,10 +247,9 @@ public class PooledTopNAlgorithm
currentPosition
);
case 4:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -264,10 +259,9 @@ public class PooledTopNAlgorithm
currentPosition
);
case 3:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -277,10 +271,9 @@ public class PooledTopNAlgorithm
currentPosition
);
case 2:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -290,10 +283,9 @@ public class PooledTopNAlgorithm
currentPosition
);
case 1:
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -304,10 +296,9 @@ public class PooledTopNAlgorithm
);
}
for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -316,10 +307,9 @@ public class PooledTopNAlgorithm
dimValues.get(i),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -328,10 +318,9 @@ public class PooledTopNAlgorithm
dimValues.get(i + 1),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -340,10 +329,9 @@ public class PooledTopNAlgorithm
dimValues.get(i + 2),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -352,10 +340,9 @@ public class PooledTopNAlgorithm
dimValues.get(i + 3),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -364,10 +351,9 @@ public class PooledTopNAlgorithm
dimValues.get(i + 4),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -376,10 +362,9 @@ public class PooledTopNAlgorithm
dimValues.get(i + 5),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -388,10 +373,9 @@ public class PooledTopNAlgorithm
dimValues.get(i + 6),
currentPosition
);
aggregateDimValue(
currentPosition = aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
@ -405,24 +389,28 @@ public class PooledTopNAlgorithm
}
}
private static void aggregateDimValue(
/**
* Returns a new currentPosition, incremented if a new position was initialized, otherwise the same position as passed
* in the last argument.
*/
private static int aggregateDimValue(
final int[] positions,
final BufferAggregator[] theAggregators,
final int numProcessed,
final ByteBuffer resultsBuf,
final int numBytesPerRecord,
final int[] aggregatorOffsets,
final int aggSize,
final int aggExtra,
final int dimIndex,
final AtomicInteger currentPosition
int currentPosition
)
{
if (SKIP_POSITION_VALUE == positions[dimIndex]) {
return;
return currentPosition;
}
if (INIT_POSITION_VALUE == positions[dimIndex]) {
positions[dimIndex] = currentPosition.getAndIncrement() * numBytesPerRecord;
positions[dimIndex] = currentPosition * numBytesPerRecord;
currentPosition++;
final int pos = positions[dimIndex];
for (int j = 0; j < aggSize; ++j) {
theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]);
@ -456,6 +444,7 @@ public class PooledTopNAlgorithm
theAggregators[j+6].aggregate(resultsBuf, position + aggregatorOffsets[j+6]);
theAggregators[j+7].aggregate(resultsBuf, position + aggregatorOffsets[j+7]);
}
return currentPosition;
}
@Override