before facts get it , indexAndOffsets should already know about it

This commit is contained in:
Himanshu Gupta 2016-02-12 13:04:46 -06:00
parent d63eec65a1
commit da5fcd0124
1 changed files with 15 additions and 8 deletions

View File

@ -200,14 +200,18 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
bufferOffset = indexAndOffset[1];
aggBuffer = aggBuffers.get(bufferIndex).get();
} else {
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
getAggs()[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
if (metrics.length > 0 && getAggs()[0] == null) {
// note: creation of Aggregators is done lazily when at least one row from input is available
// so that FilteredAggregators could be initialized correctly.
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
getAggs()[i] = agg.factorizeBuffered(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
);
}
rowContainer.set(null);
}
rowContainer.set(null);
bufferIndex = aggBuffers.size() - 1;
ByteBuffer lastBuffer = aggBuffers.isEmpty() ? null : aggBuffers.get(aggBuffers.size() - 1).get();
@ -241,10 +245,13 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
final Integer rowIndex = indexIncrement.getAndIncrement();
// note that indexAndOffsets must be updated before facts, because as soon as we update facts
// concurrent readers get hold of it and might ask for newly added row
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
final Integer prev = facts.putIfAbsent(key, rowIndex);
if (null == prev) {
numEntries.incrementAndGet();
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
} else {
throw new ISE("WTF! we are in sychronized block.");
}