mirror of https://github.com/apache/druid.git
optimize topn on particular situation
This commit is contained in:
parent
2307500b45
commit
a5ef30ff84
|
@ -77,12 +77,16 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
|
|||
final int cardinality = params.getCardinality();
|
||||
int numProcessed = 0;
|
||||
while (numProcessed < cardinality) {
|
||||
final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
|
||||
final int numToProcess;
|
||||
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
|
||||
|
||||
DimValSelector theDimValSelector;
|
||||
if (!hasDimValSelector) {
|
||||
numToProcess = maxNumToProcess;
|
||||
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
|
||||
} else {
|
||||
//skip invalid, calculate length to have enough valid value to process or hit the end.
|
||||
numToProcess = computeNewLength(dimValSelector, numProcessed, maxNumToProcess);
|
||||
theDimValSelector = updateDimValSelector(dimValSelector, numProcessed, numToProcess);
|
||||
}
|
||||
|
||||
|
@ -101,6 +105,20 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
|
|||
|
||||
protected abstract DimValSelector makeDimValSelector(Parameters params, int numProcessed, int numToProcess);
|
||||
|
||||
/**
|
||||
* Skip invalid value, calculate length to have enough valid value to process or hit the end.
|
||||
*
|
||||
* @param dimValSelector the dim value selector which record value is valid or invalid.
|
||||
* @param numProcessed the start position to process
|
||||
* @param numToProcess the number of valid value to process
|
||||
*
|
||||
* @return the length between which have enough valid value to process or hit the end.
|
||||
*/
|
||||
protected int computeNewLength(DimValSelector dimValSelector, int numProcessed, int numToProcess)
|
||||
{
|
||||
return numToProcess;
|
||||
}
|
||||
|
||||
protected abstract DimValSelector updateDimValSelector(
|
||||
DimValSelector dimValSelector,
|
||||
int numProcessed,
|
||||
|
|
|
@ -31,6 +31,7 @@ import io.druid.segment.data.IndexedInts;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
*/
|
||||
|
@ -126,6 +127,21 @@ public class PooledTopNAlgorithm
|
|||
return query.getTopNMetricSpec().configureOptimizer(arrayProvider).build();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int computeNewLength(int[] dimValSelector, int numProcessed, int numToProcess)
|
||||
{
|
||||
int valid = 0;
|
||||
int length = 0;
|
||||
for (int i = numProcessed; i < dimValSelector.length && valid < numToProcess; i++) {
|
||||
length++;
|
||||
if (SKIP_POSITION_VALUE != dimValSelector[i]) {
|
||||
valid++;
|
||||
}
|
||||
}
|
||||
return length;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int numToProcess)
|
||||
{
|
||||
final int[] retVal = Arrays.copyOf(dimValSelector, dimValSelector.length);
|
||||
|
@ -184,6 +200,7 @@ public class PooledTopNAlgorithm
|
|||
|
||||
final int aggSize = theAggregators.length;
|
||||
final int aggExtra = aggSize % AGG_UNROLL_COUNT;
|
||||
final AtomicInteger currentPosition = new AtomicInteger(0);
|
||||
|
||||
while (!cursor.isDone()) {
|
||||
final IndexedInts dimValues = dimSelector.getRow();
|
||||
|
@ -192,29 +209,194 @@ public class PooledTopNAlgorithm
|
|||
final int dimExtra = dimSize % AGG_UNROLL_COUNT;
|
||||
switch(dimExtra){
|
||||
case 7:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(6));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(6),
|
||||
currentPosition
|
||||
);
|
||||
case 6:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(5));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(5),
|
||||
currentPosition
|
||||
);
|
||||
case 5:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(4));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(4),
|
||||
currentPosition
|
||||
);
|
||||
case 4:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(3));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(3),
|
||||
currentPosition
|
||||
);
|
||||
case 3:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(2));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(2),
|
||||
currentPosition
|
||||
);
|
||||
case 2:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(1));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(1),
|
||||
currentPosition
|
||||
);
|
||||
case 1:
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(0));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(0),
|
||||
currentPosition
|
||||
);
|
||||
}
|
||||
for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+1));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+2));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+3));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+4));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+5));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+6));
|
||||
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+7));
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 1),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 2),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 3),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 4),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 5),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 6),
|
||||
currentPosition
|
||||
);
|
||||
aggregateDimValue(
|
||||
positions,
|
||||
theAggregators,
|
||||
numProcessed,
|
||||
resultsBuf,
|
||||
numBytesPerRecord,
|
||||
aggregatorOffsets,
|
||||
aggSize,
|
||||
aggExtra,
|
||||
dimValues.get(i + 7),
|
||||
currentPosition
|
||||
);
|
||||
}
|
||||
cursor.advance();
|
||||
}
|
||||
|
@ -229,14 +411,15 @@ public class PooledTopNAlgorithm
|
|||
final int[] aggregatorOffsets,
|
||||
final int aggSize,
|
||||
final int aggExtra,
|
||||
final int dimIndex
|
||||
final int dimIndex,
|
||||
final AtomicInteger currentPosition
|
||||
)
|
||||
{
|
||||
if (SKIP_POSITION_VALUE == positions[dimIndex]) {
|
||||
return;
|
||||
}
|
||||
if (INIT_POSITION_VALUE == positions[dimIndex]) {
|
||||
positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord;
|
||||
positions[dimIndex] = currentPosition.getAndIncrement() * numBytesPerRecord;
|
||||
final int pos = positions[dimIndex];
|
||||
for (int j = 0; j < aggSize; ++j) {
|
||||
theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]);
|
||||
|
|
Loading…
Reference in New Issue