From 9611358f0a7cb324e58c8b8fb5d4ed3782981bd5 Mon Sep 17 00:00:00 2001 From: Roman Leventov Date: Mon, 17 Oct 2016 20:36:19 +0300 Subject: [PATCH] Small topn scan improvements (#3526) * Remove unused numProcessed param from PooledTopNAlgorithm.aggregateDimValue() * Replace AtomicInteger with simple int in PooledTopNAlgorithm.scanAndAggregate() and aggregateDimValue() * Remove unused import --- .../druid/query/topn/PooledTopNAlgorithm.java | 63 ++++++++----------- 1 file changed, 26 insertions(+), 37 deletions(-) diff --git a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java index 404f3962ca4..24453bf5974 100644 --- a/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java +++ b/processing/src/main/java/io/druid/query/topn/PooledTopNAlgorithm.java @@ -31,7 +31,6 @@ import io.druid.segment.data.IndexedInts; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.concurrent.atomic.AtomicInteger; /** */ @@ -203,7 +202,7 @@ public class PooledTopNAlgorithm final int aggSize = theAggregators.length; final int aggExtra = aggSize % AGG_UNROLL_COUNT; - final AtomicInteger currentPosition = new AtomicInteger(0); + int currentPosition = 0; while (!cursor.isDone()) { final IndexedInts dimValues = dimSelector.getRow(); @@ -212,10 +211,9 @@ public class PooledTopNAlgorithm final int dimExtra = dimSize % AGG_UNROLL_COUNT; switch(dimExtra){ case 7: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -225,10 +223,9 @@ public class PooledTopNAlgorithm currentPosition ); case 6: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -238,10 +235,9 @@ public class PooledTopNAlgorithm currentPosition ); case 5: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -251,10 +247,9 @@ public class PooledTopNAlgorithm currentPosition ); case 4: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -264,10 +259,9 @@ public class PooledTopNAlgorithm currentPosition ); case 3: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -277,10 +271,9 @@ public class PooledTopNAlgorithm currentPosition ); case 2: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -290,10 +283,9 @@ public class PooledTopNAlgorithm currentPosition ); case 1: - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -304,10 +296,9 @@ public class PooledTopNAlgorithm ); } for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) { - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -316,10 +307,9 @@ public class PooledTopNAlgorithm dimValues.get(i), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -328,10 +318,9 @@ public class PooledTopNAlgorithm dimValues.get(i + 1), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -340,10 +329,9 @@ public class PooledTopNAlgorithm dimValues.get(i + 2), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -352,10 +340,9 @@ public class PooledTopNAlgorithm dimValues.get(i + 3), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -364,10 +351,9 @@ public class PooledTopNAlgorithm dimValues.get(i + 4), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -376,10 +362,9 @@ public class PooledTopNAlgorithm dimValues.get(i + 5), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -388,10 +373,9 @@ public class PooledTopNAlgorithm dimValues.get(i + 6), currentPosition ); - aggregateDimValue( + currentPosition = aggregateDimValue( positions, theAggregators, - numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, @@ -405,24 +389,28 @@ public class PooledTopNAlgorithm } } - private static void aggregateDimValue( + /** + * Returns a new currentPosition, incremented if a new position was initialized, otherwise the same position as passed + * in the last argument. + */ + private static int aggregateDimValue( final int[] positions, final BufferAggregator[] theAggregators, - final int numProcessed, final ByteBuffer resultsBuf, final int numBytesPerRecord, final int[] aggregatorOffsets, final int aggSize, final int aggExtra, final int dimIndex, - final AtomicInteger currentPosition + int currentPosition ) { if (SKIP_POSITION_VALUE == positions[dimIndex]) { - return; + return currentPosition; } if (INIT_POSITION_VALUE == positions[dimIndex]) { - positions[dimIndex] = currentPosition.getAndIncrement() * numBytesPerRecord; + positions[dimIndex] = currentPosition * numBytesPerRecord; + currentPosition++; final int pos = positions[dimIndex]; for (int j = 0; j < aggSize; ++j) { theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]); @@ -456,6 +444,7 @@ public class PooledTopNAlgorithm theAggregators[j+6].aggregate(resultsBuf, position + aggregatorOffsets[j+6]); theAggregators[j+7].aggregate(resultsBuf, position + aggregatorOffsets[j+7]); } + return currentPosition; } @Override