diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index cbe9ba48bc9..5404aa07147 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -77,7 +77,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex { super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); this.maxRowCount = maxRowCount; - this.maxBytesInMemory = maxBytesInMemory == 0 ? -1 : maxBytesInMemory; + this.maxBytesInMemory = maxBytesInMemory == 0 ? Long.MAX_VALUE : maxBytesInMemory; this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts); maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema); @@ -170,7 +170,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex concurrentSet(rowIndex, aggs); // Last ditch sanity checks - if ((numEntries.get() >= maxRowCount || (maxBytesInMemory > 0 && sizeInBytes.get() >= maxBytesInMemory)) + if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory) && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX && !skipMaxRowsInMemoryCheck) { throw new IndexSizeExceededException( diff --git a/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java b/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java index 571005f4370..bc4df16d1c1 100644 --- a/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java +++ b/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java @@ -29,6 +29,12 @@ public class TuningConfigs // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on // the actual task node's jvm memory. - return maxBytesInMemory == 0 ? TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY : maxBytesInMemory; + long newMaxBytesInMemory = maxBytesInMemory; + if (maxBytesInMemory == 0) { + newMaxBytesInMemory = TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY; + } else if (maxBytesInMemory < 0) { + newMaxBytesInMemory = Long.MAX_VALUE; + } + return newMaxBytesInMemory; } } diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index e5c2954f2d6..a9eb40611d0 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -126,6 +126,7 @@ public class AppenderatorImpl implements Appenderator private final VersionedIntervalTimeline sinkTimeline = new VersionedIntervalTimeline<>( String.CASE_INSENSITIVE_ORDER ); + private final long maxBytesTuningConfig; private final QuerySegmentWalker texasRanger; // This variable updated in add(), persist(), and drop() @@ -182,7 +183,7 @@ public class AppenderatorImpl implements Appenderator Preconditions.checkNotNull(cache, "cache"), cacheConfig ); - + maxBytesTuningConfig = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()); log.info("Created Appenderator for dataSource[%s].", schema.getDataSource()); } @@ -260,7 +261,7 @@ public class AppenderatorImpl implements Appenderator if (System.currentTimeMillis() > nextFlush) { persist = true; persistReasons.add(StringUtils.format( - " current time[%d] is greater than nextFlush[%d],", + "current time[%d] is greater than nextFlush[%d]", System.currentTimeMillis(), nextFlush )); @@ -268,19 +269,17 @@ public class AppenderatorImpl implements Appenderator if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { persist = true; persistReasons.add(StringUtils.format( - " rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d],", + "rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d]", rowsCurrentlyInMemory.get(), tuningConfig.getMaxRowsInMemory() )); } - if (tuningConfig.getMaxBytesInMemory() > 0 - && bytesCurrentlyInMemory.get() - >= TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) { + if (bytesCurrentlyInMemory.get() >= maxBytesTuningConfig) { persist = true; persistReasons.add(StringUtils.format( - " bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", + "bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", bytesCurrentlyInMemory.get(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()) + maxBytesTuningConfig )); } if (persist) { @@ -354,7 +353,7 @@ public class AppenderatorImpl implements Appenderator identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + maxBytesTuningConfig, tuningConfig.isReportParseExceptions(), null ); @@ -1026,7 +1025,7 @@ public class AppenderatorImpl implements Appenderator identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), - TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), + maxBytesTuningConfig, tuningConfig.isReportParseExceptions(), null, hydrants