Check against the real default of maxBytes(1/6 max mem) in AppenderatorImpl's add (#5758)

* The check for maxBytesInMemory should be >= 0 instead of > 0

* if the default value is 0, the actual check could be skipped
* fix the message for persistReasons

* Address PR comments

* if maxBytes set -1, make is Long.MAX_VAL, so we do not need to check if it's 0 or -1
* set the maxBytesTuningconfig in AppenderatorImpl constructor to avoid duplicate code

* fix the failing test cases

* Address PR comments
This commit is contained in:
Surekha 2018-05-09 13:41:51 -07:00 committed by Gian Merlino
parent c7a59394e0
commit 2f8904e25f
3 changed files with 18 additions and 13 deletions

View File

@ -77,7 +77,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{ {
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
this.maxRowCount = maxRowCount; this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? -1 : maxBytesInMemory; this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
: new PlainFactsHolder(sortFacts); : new PlainFactsHolder(sortFacts);
maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema); maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
@ -170,7 +170,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
concurrentSet(rowIndex, aggs); concurrentSet(rowIndex, aggs);
// Last ditch sanity checks // Last ditch sanity checks
if ((numEntries.get() >= maxRowCount || (maxBytesInMemory > 0 && sizeInBytes.get() >= maxBytesInMemory)) if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory)
&& facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX
&& !skipMaxRowsInMemoryCheck) { && !skipMaxRowsInMemoryCheck) {
throw new IndexSizeExceededException( throw new IndexSizeExceededException(

View File

@ -29,6 +29,12 @@ public class TuningConfigs
// In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting
// maxBytes to max jvm memory of the process that starts first. Instead we set the default based on // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on
// the actual task node's jvm memory. // the actual task node's jvm memory.
return maxBytesInMemory == 0 ? TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY : maxBytesInMemory; long newMaxBytesInMemory = maxBytesInMemory;
if (maxBytesInMemory == 0) {
newMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY;
} else if (maxBytesInMemory < 0) {
newMaxBytesInMemory = Long.MAX_VALUE;
}
return newMaxBytesInMemory;
} }
} }

View File

@ -126,6 +126,7 @@ public class AppenderatorImpl implements Appenderator
private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>( private final VersionedIntervalTimeline<String, Sink> sinkTimeline = new VersionedIntervalTimeline<>(
String.CASE_INSENSITIVE_ORDER String.CASE_INSENSITIVE_ORDER
); );
private final long maxBytesTuningConfig;
private final QuerySegmentWalker texasRanger; private final QuerySegmentWalker texasRanger;
// This variable updated in add(), persist(), and drop() // This variable updated in add(), persist(), and drop()
@ -182,7 +183,7 @@ public class AppenderatorImpl implements Appenderator
Preconditions.checkNotNull(cache, "cache"), Preconditions.checkNotNull(cache, "cache"),
cacheConfig cacheConfig
); );
maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());
log.info("Created Appenderator for dataSource[%s].", schema.getDataSource()); log.info("Created Appenderator for dataSource[%s].", schema.getDataSource());
} }
@ -260,7 +261,7 @@ public class AppenderatorImpl implements Appenderator
if (System.currentTimeMillis() > nextFlush) { if (System.currentTimeMillis() > nextFlush) {
persist = true; persist = true;
persistReasons.add(StringUtils.format( persistReasons.add(StringUtils.format(
" current time[%d] is greater than nextFlush[%d],", "current time[%d] is greater than nextFlush[%d]",
System.currentTimeMillis(), System.currentTimeMillis(),
nextFlush nextFlush
)); ));
@ -268,19 +269,17 @@ public class AppenderatorImpl implements Appenderator
if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
persist = true; persist = true;
persistReasons.add(StringUtils.format( persistReasons.add(StringUtils.format(
" rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d],", "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]",
rowsCurrentlyInMemory.get(), rowsCurrentlyInMemory.get(),
tuningConfig.getMaxRowsInMemory() tuningConfig.getMaxRowsInMemory()
)); ));
} }
if (tuningConfig.getMaxBytesInMemory() > 0 if (bytesCurrentlyInMemory.get() >= maxBytesTuningConfig) {
&& bytesCurrentlyInMemory.get()
>= TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) {
persist = true; persist = true;
persistReasons.add(StringUtils.format( persistReasons.add(StringUtils.format(
" bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
bytesCurrentlyInMemory.get(), bytesCurrentlyInMemory.get(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()) maxBytesTuningConfig
)); ));
} }
if (persist) { if (persist) {
@ -354,7 +353,7 @@ public class AppenderatorImpl implements Appenderator
identifier.getShardSpec(), identifier.getShardSpec(),
identifier.getVersion(), identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), maxBytesTuningConfig,
tuningConfig.isReportParseExceptions(), tuningConfig.isReportParseExceptions(),
null null
); );
@ -1026,7 +1025,7 @@ public class AppenderatorImpl implements Appenderator
identifier.getShardSpec(), identifier.getShardSpec(),
identifier.getVersion(), identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(), tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), maxBytesTuningConfig,
tuningConfig.isReportParseExceptions(), tuningConfig.isReportParseExceptions(),
null, null,
hydrants hydrants