Replace Incremental Index Global Flags with Getters (#7043)

* Eliminated reportParseExceptions and deserializeComplexMetrics

* Removed more global flags

* Cleanup

* Addressed Surekha's recommendations
Justin Borromeo 2019-02-15 13:36:46 -08:00 committed by Surekha
parent 0fa9000849
commit c7eeeabf45
5 changed files with 58 additions and 48 deletions
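
The five diffs below all apply one pattern: reportParseExceptions, deserializeComplexMetrics, the numEntries counter, the bytesInMemory counter, and the metrics array are no longer threaded through addToFacts(...) as parameters; each subclass now reads them from the base IncrementalIndex through new package-private getters. A minimal standalone sketch of the resulting shape, using hypothetical stand-in class names rather than the real Druid types:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-ins for the real classes: the flags and shared
// counters stay private in the base class and are exposed through getters, so
// subclasses no longer receive them as addToFacts(...) parameters.
abstract class BaseIndexSketch
{
  private final boolean reportParseExceptions;
  private final boolean deserializeComplexMetrics;
  private final AtomicInteger numEntries = new AtomicInteger();
  private final AtomicLong bytesInMemory = new AtomicLong();

  BaseIndexSketch(boolean reportParseExceptions, boolean deserializeComplexMetrics)
  {
    this.reportParseExceptions = reportParseExceptions;
    this.deserializeComplexMetrics = deserializeComplexMetrics;
  }

  boolean getReportParseExceptions()
  {
    return reportParseExceptions;
  }

  boolean getDeserializeComplexMetrics()
  {
    return deserializeComplexMetrics;
  }

  AtomicInteger getNumEntries()
  {
    return numEntries;
  }

  AtomicLong getBytesInMemory()
  {
    return bytesInMemory;
  }

  // Before this commit, the flags and counters above were parameters of this method.
  protected abstract int addToFacts(Object row);
}

class OnheapIndexSketch extends BaseIndexSketch
{
  OnheapIndexSketch(boolean reportParseExceptions, boolean deserializeComplexMetrics)
  {
    super(reportParseExceptions, deserializeComplexMetrics);
  }

  @Override
  protected int addToFacts(Object row)
  {
    final AtomicInteger numEntries = getNumEntries();  // formerly a parameter
    final AtomicLong sizeInBytes = getBytesInMemory(); // formerly a parameter
    if (getReportParseExceptions()) {
      // strict mode: the real implementations rethrow ParseExceptions here
    }
    sizeInBytes.addAndGet(64);                         // illustrative bookkeeping only
    return numEntries.incrementAndGet();
  }
}

With this shape, adding another shared flag or counter touches only the base class instead of the already long addToFacts signatures of OffheapIncrementalIndex, OnheapIncrementalIndex, and the benchmark.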

IncrementalIndex.java

@@ -97,8 +97,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
/**
*/
public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex implements Iterable<Row>, Closeable
{
private volatile DateTime maxIngestedEventTime;
@@ -250,7 +248,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
/**
* Setting deserializeComplexMetrics to false is necessary for intermediate aggregation such as groupBy that
* should not deserialize input columns using ComplexMetricSerde for aggregators that return complex metrics.
*
* <p>
* Set concurrentEventAdd to true to indicate that adding of input row should be thread-safe (for example, groupBy
* where the multiple threads can add concurrently to the IncrementalIndex).
*
@@ -482,12 +480,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
// Note: This method needs to be thread safe.
protected abstract AddToFactsResult addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -608,12 +601,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
{
IncrementalIndexRowResult incrementalIndexRowResult = toIncrementalIndexRow(row);
final AddToFactsResult addToFactsResult = addToFacts(
metrics,
deserializeComplexMetrics,
reportParseExceptions,
row,
numEntries,
bytesInMemory,
incrementalIndexRowResult.getIncrementalIndexRow(),
in,
rowSupplier,
@@ -625,7 +613,11 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
incrementalIndexRowResult.getParseExceptionMessages(),
addToFactsResult.getParseExceptionMessages()
);
return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), addToFactsResult.getBytesInMemory(), parseException);
return new IncrementalIndexAddResult(
addToFactsResult.getRowCount(),
addToFactsResult.getBytesInMemory(),
parseException
);
}
@VisibleForTesting
@@ -785,9 +777,29 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return numEntries.get();
}
public long getBytesInMemory()
boolean getDeserializeComplexMetrics()
{
return bytesInMemory.get();
return deserializeComplexMetrics;
}
boolean getReportParseExceptions()
{
return reportParseExceptions;
}
AtomicInteger getNumEntries()
{
return numEntries;
}
AggregatorFactory[] getMetrics()
{
return metrics;
}
public AtomicLong getBytesInMemory()
{
return bytesInMemory;
}
private long getMinTimeMillis()
@@ -908,7 +920,10 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
* Index dimension ordering could be changed to initialize from DimensionsSpec after resolution of
* https://github.com/apache/incubator-druid/issues/2011
*/
public void loadDimensionIterable(Iterable<String> oldDimensionOrder, Map<String, ColumnCapabilitiesImpl> oldColumnCapabilities)
public void loadDimensionIterable(
Iterable<String> oldDimensionOrder,
Map<String, ColumnCapabilitiesImpl> oldColumnCapabilities
)
{
synchronized (dimensionDescs) {
if (!dimensionDescs.isEmpty()) {
@@ -1289,7 +1304,9 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
public Iterator<IncrementalIndexRow> iterator(boolean descending)
{
if (descending && sortFacts) {
return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap().keySet().iterator();
return ((ConcurrentNavigableMap<IncrementalIndexRow, IncrementalIndexRow>) facts).descendingMap()
.keySet()
.iterator();
}
return keySet().iterator();
}
@@ -1387,7 +1404,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
{
if (descending && sortFacts) {
return timeOrderedConcat(((ConcurrentNavigableMap<Long, Deque<IncrementalIndexRow>>) facts)
.descendingMap().values(), true).iterator();
.descendingMap().values(), true).iterator();
}
return timeOrderedConcat(facts.values(), false).iterator();
}
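
The deserializeComplexMetrics and concurrentEventAdd semantics described in this file's Javadoc (the -250,7 hunk above) are easiest to picture from the call site that sets them. A hedged sketch, assuming the IncrementalIndex.Builder setters of this code base (setIndexSchema, setDeserializeComplexMetrics, setConcurrentEventAdd, setReportParseExceptions, setMaxRowCount, buildOnheap) and a schema and row limit supplied by the caller:

import org.apache.druid.segment.incremental.IncrementalIndex;
import org.apache.druid.segment.incremental.IncrementalIndexSchema;

class IntermediateIndexFactory
{
  // Builder method names are assumptions about this code base (as used by the
  // groupBy intermediate-aggregation path), not something this commit adds.
  static IncrementalIndex<?> create(IncrementalIndexSchema indexSchema, int maxRowCount)
  {
    return new IncrementalIndex.Builder()
        .setIndexSchema(indexSchema)
        .setDeserializeComplexMetrics(false) // inputs already hold complex values; skip ComplexMetricSerde
        .setConcurrentEventAdd(true)         // multiple threads add rows to this index concurrently
        .setReportParseExceptions(false)     // log and skip malformed rows instead of throwing
        .setMaxRowCount(maxRowCount)
        .buildOnheap();
  }
}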

OffheapIncrementalIndex.java

@@ -40,9 +40,9 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*/
public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
{
@@ -133,19 +133,15 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length - 1].getMaxIntermediateSizeWithNulls();
aggsTotalSize = aggOffsetInBuffer[metrics.length - 1] + metrics[metrics.length
- 1].getMaxIntermediateSizeWithNulls();
return new BufferAggregator[metrics.length];
}
@Override
protected AddToFactsResult addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes, // ignored, added to make abstract class method impl happy
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -157,6 +153,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
int bufferOffset;
synchronized (this) {
final AggregatorFactory[] metrics = getMetrics();
final int priorIndex = facts.getPriorIndex(key);
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
final int[] indexAndOffset = indexAndOffsets.get(priorIndex);
@@ -202,7 +199,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
if (getNumEntries().get() >= maxRowCount && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
}
@@ -213,7 +210,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
indexAndOffsets.add(new int[]{bufferIndex, bufferOffset});
final int prev = facts.putIfAbsent(key, rowIndex);
if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet();
getNumEntries().incrementAndGet();
} else {
throw new ISE("WTF! we are in sychronized block.");
}
@@ -222,7 +219,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
rowContainer.set(row);
for (int i = 0; i < metrics.length; i++) {
for (int i = 0; i < getMetrics().length; i++) {
final BufferAggregator agg = getAggs()[i];
synchronized (agg) {
@@ -231,7 +228,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
if (reportParseExceptions) {
if (getReportParseExceptions()) {
throw new ParseException(e, "Encountered parse error for aggregator[%s]", getMetricAggs()[i].getName());
} else {
log.debug(e, "Encountered parse error, skipping aggregator[%s].", getMetricAggs()[i].getName());
@@ -240,7 +237,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
rowContainer.set(null);
return new AddToFactsResult(numEntries.get(), 0, new ArrayList<>());
return new AddToFactsResult(getNumEntries().get(), 0, new ArrayList<>());
}
@Override

OnheapIncrementalIndex.java

@@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*
*/
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
@@ -105,7 +106,8 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
long maxAggregatorIntermediateSize = Integer.BYTES * incrementalIndexSchema.getMetrics().length;
maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics())
.mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls() + Long.BYTES * 2)
.mapToLong(aggregator -> aggregator.getMaxIntermediateSizeWithNulls()
+ Long.BYTES * 2)
.sum();
return maxAggregatorIntermediateSize;
}
@@ -140,12 +142,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
protected AddToFactsResult addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -156,7 +153,9 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
final int priorIndex = facts.getPriorIndex(key);
Aggregator[] aggs;
final AggregatorFactory[] metrics = getMetrics();
final AtomicInteger numEntries = getNumEntries();
final AtomicLong sizeInBytes = getBytesInMemory();
if (IncrementalIndexRow.EMPTY_ROW_INDEX != priorIndex) {
aggs = concurrentGet(priorIndex);
parseExceptionMessages = doAggregate(metrics, aggs, rowContainer, row);
@@ -301,7 +300,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
final boolean countCheck = size() < maxRowCount;
// if maxBytesInMemory = -1, then ignore sizeCheck
final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory;
final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory().get() < maxBytesInMemory;
final boolean canAdd = countCheck && sizeCheck;
if (!countCheck && !sizeCheck) {
outOfRowsReason = StringUtils.format(

OnheapIncrementalIndexBenchmark.java

@@ -171,12 +171,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
@Override
protected AddToFactsResult addToFacts(
AggregatorFactory[] metrics,
boolean deserializeComplexMetrics,
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@@ -187,7 +182,9 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
final Integer priorIdex = getFacts().getPriorIndex(key);
Aggregator[] aggs;
final AggregatorFactory[] metrics = getMetrics();
final AtomicInteger numEntries = getNumEntries();
final AtomicLong sizeInBytes = getBytesInMemory();
if (null != priorIdex) {
aggs = indexedMap.get(priorIdex);
} else {
@@ -196,7 +193,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
for (int i = 0; i < metrics.length; i++) {
final AggregatorFactory agg = metrics[i];
aggs[i] = agg.factorize(
makeColumnSelectorFactory(agg, rowSupplier, deserializeComplexMetrics)
makeColumnSelectorFactory(agg, rowSupplier, getDeserializeComplexMetrics())
);
}
Integer rowIndex;
@@ -233,7 +230,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
}
catch (ParseException e) {
// "aggregate" can throw ParseExceptions if a selector expects something but gets something else.
if (reportParseExceptions) {
if (getReportParseExceptions()) {
throw e;
}
}

Sink.java

@@ -279,7 +279,7 @@ public class Sink implements Iterable<FireHydrant>
return 0;
}
return currHydrant.getIndex().getBytesInMemory();
return currHydrant.getIndex().getBytesInMemory().get();
}
}
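
One side effect worth noting: getBytesInMemory() now returns the AtomicLong itself rather than its long value, so read-only call sites, such as the Sink change above and the size check in OnheapIncrementalIndex, append .get(). A minimal sketch of that caller-side adjustment, with hypothetical stand-in names rather than the real types:

import java.util.concurrent.atomic.AtomicLong;

class MemoryTrackedIndexSketch
{
  private final AtomicLong bytesInMemory = new AtomicLong();

  // The getter now hands out the counter itself, so subclasses can update it in place...
  AtomicLong getBytesInMemory()
  {
    return bytesInMemory;
  }
}

class SinkSketch
{
  // ...and read-only callers dereference it with get().
  long getBytesInMemory(MemoryTrackedIndexSketch index)
  {
    return index.getBytesInMemory().get();
  }
}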