Merge pull request #2222 from binlijin/topN_optimize

Optimize topN for a particular situation
This commit is contained in:
Fangjin Yang 2016-02-01 22:55:34 -08:00
commit 377e3583f8
2 changed files with 219 additions and 18 deletions

View File

@ -77,12 +77,16 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
final int cardinality = params.getCardinality(); final int cardinality = params.getCardinality();
int numProcessed = 0; int numProcessed = 0;
while (numProcessed < cardinality) { while (numProcessed < cardinality) {
final int numToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed); final int numToProcess;
int maxNumToProcess = Math.min(params.getNumValuesPerPass(), cardinality - numProcessed);
DimValSelector theDimValSelector; DimValSelector theDimValSelector;
if (!hasDimValSelector) { if (!hasDimValSelector) {
numToProcess = maxNumToProcess;
theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess); theDimValSelector = makeDimValSelector(params, numProcessed, numToProcess);
} else { } else {
// Skip invalid values: compute the length needed to cover enough valid values to process, or to reach the end.
numToProcess = computeNewLength(dimValSelector, numProcessed, maxNumToProcess);
theDimValSelector = updateDimValSelector(dimValSelector, numProcessed, numToProcess); theDimValSelector = updateDimValSelector(dimValSelector, numProcessed, numToProcess);
} }
@ -101,6 +105,20 @@ public abstract class BaseTopNAlgorithm<DimValSelector, DimValAggregateStore, Pa
protected abstract DimValSelector makeDimValSelector(Parameters params, int numProcessed, int numToProcess); protected abstract DimValSelector makeDimValSelector(Parameters params, int numProcessed, int numToProcess);
/**
 * Computes the length of the span, beginning at {@code numProcessed}, that should be handled in the
 * current pass so that it covers the requested number of valid values (or reaches the end).
 *
 * <p>This default implementation does no skipping at all: it hands back {@code numToProcess}
 * untouched. Subclasses whose selector can mark entries as invalid may override it.
 *
 * @param dimValSelector the dimension value selector, which records whether each value is valid
 * @param numProcessed   the position at which processing starts
 * @param numToProcess   the number of valid values wanted for this pass
 *
 * @return the span length to process; here always equal to {@code numToProcess}
 */
protected int computeNewLength(DimValSelector dimValSelector, int numProcessed, int numToProcess)
{
  // Nothing is skipped at this level, so the span is exactly the requested count.
  final int spanLength = numToProcess;
  return spanLength;
}
protected abstract DimValSelector updateDimValSelector( protected abstract DimValSelector updateDimValSelector(
DimValSelector dimValSelector, DimValSelector dimValSelector,
int numProcessed, int numProcessed,

View File

@ -31,6 +31,7 @@ import io.druid.segment.data.IndexedInts;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
/** /**
*/ */
@ -126,6 +127,21 @@ public class PooledTopNAlgorithm
return query.getTopNMetricSpec().configureOptimizer(arrayProvider).build(); return query.getTopNMetricSpec().configureOptimizer(arrayProvider).build();
} }
/**
 * Walks {@code dimValSelector} forward from {@code numProcessed} and reports how many entries have
 * to be spanned to collect {@code numToProcess} entries that are not {@code SKIP_POSITION_VALUE}.
 * The walk also stops when the end of the array is reached, in which case the span simply runs to
 * the end.
 *
 * @param dimValSelector per-position markers; entries equal to {@code SKIP_POSITION_VALUE} do not
 *                       count towards the requested total
 * @param numProcessed   index of the first entry to examine
 * @param numToProcess   number of non-skipped entries wanted
 *
 * @return the number of entries examined, skipped ones included
 */
@Override
protected int computeNewLength(int[] dimValSelector, int numProcessed, int numToProcess)
{
  int stillNeeded = numToProcess;
  int cursor = numProcessed;
  while (cursor < dimValSelector.length && stillNeeded > 0) {
    if (dimValSelector[cursor] != SKIP_POSITION_VALUE) {
      // Only non-skipped entries count towards the quota.
      stillNeeded--;
    }
    cursor++;
  }
  // Every examined entry, skipped or not, contributes to the span length.
  return cursor - numProcessed;
}
@Override
protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int numToProcess) protected int[] updateDimValSelector(int[] dimValSelector, int numProcessed, int numToProcess)
{ {
final int[] retVal = Arrays.copyOf(dimValSelector, dimValSelector.length); final int[] retVal = Arrays.copyOf(dimValSelector, dimValSelector.length);
@ -184,6 +200,7 @@ public class PooledTopNAlgorithm
final int aggSize = theAggregators.length; final int aggSize = theAggregators.length;
final int aggExtra = aggSize % AGG_UNROLL_COUNT; final int aggExtra = aggSize % AGG_UNROLL_COUNT;
final AtomicInteger currentPosition = new AtomicInteger(0);
while (!cursor.isDone()) { while (!cursor.isDone()) {
final IndexedInts dimValues = dimSelector.getRow(); final IndexedInts dimValues = dimSelector.getRow();
@ -192,29 +209,194 @@ public class PooledTopNAlgorithm
final int dimExtra = dimSize % AGG_UNROLL_COUNT; final int dimExtra = dimSize % AGG_UNROLL_COUNT;
switch(dimExtra){ switch(dimExtra){
case 7: case 7:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(6)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(6),
currentPosition
);
case 6: case 6:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(5)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(5),
currentPosition
);
case 5: case 5:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(4)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(4),
currentPosition
);
case 4: case 4:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(3)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(3),
currentPosition
);
case 3: case 3:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(2)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(2),
currentPosition
);
case 2: case 2:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(1)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(1),
currentPosition
);
case 1: case 1:
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(0)); aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(0),
currentPosition
);
} }
for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) { for (int i = dimExtra; i < dimSize; i += AGG_UNROLL_COUNT) {
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i)); aggregateDimValue(
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+1)); positions,
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+2)); theAggregators,
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+3)); numProcessed,
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+4)); resultsBuf,
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+5)); numBytesPerRecord,
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+6)); aggregatorOffsets,
aggregateDimValue(positions, theAggregators, numProcessed, resultsBuf, numBytesPerRecord, aggregatorOffsets, aggSize, aggExtra, dimValues.get(i+7)); aggSize,
aggExtra,
dimValues.get(i),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 1),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 2),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 3),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 4),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 5),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 6),
currentPosition
);
aggregateDimValue(
positions,
theAggregators,
numProcessed,
resultsBuf,
numBytesPerRecord,
aggregatorOffsets,
aggSize,
aggExtra,
dimValues.get(i + 7),
currentPosition
);
} }
cursor.advance(); cursor.advance();
} }
@ -229,14 +411,15 @@ public class PooledTopNAlgorithm
final int[] aggregatorOffsets, final int[] aggregatorOffsets,
final int aggSize, final int aggSize,
final int aggExtra, final int aggExtra,
final int dimIndex final int dimIndex,
final AtomicInteger currentPosition
) )
{ {
if (SKIP_POSITION_VALUE == positions[dimIndex]) { if (SKIP_POSITION_VALUE == positions[dimIndex]) {
return; return;
} }
if (INIT_POSITION_VALUE == positions[dimIndex]) { if (INIT_POSITION_VALUE == positions[dimIndex]) {
positions[dimIndex] = (dimIndex - numProcessed) * numBytesPerRecord; positions[dimIndex] = currentPosition.getAndIncrement() * numBytesPerRecord;
final int pos = positions[dimIndex]; final int pos = positions[dimIndex];
for (int j = 0; j < aggSize; ++j) { for (int j = 0; j < aggSize; ++j) {
theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]); theAggregators[j].init(resultsBuf, pos + aggregatorOffsets[j]);