diff --git a/docs/content/development/extensions-core/kafka-ingestion.md b/docs/content/development/extensions-core/kafka-ingestion.md
index 2c09b64e15f..ed2e594d425 100644
--- a/docs/content/development/extensions-core/kafka-ingestion.md
+++ b/docs/content/development/extensions-core/kafka-ingestion.md
@@ -115,7 +115,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |`type`|String|The indexing task type, this should always be `kafka`.|yes|
-|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)|
+|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes, you may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
+|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage rather than actual usage. Normally this is computed internally and does not need to be set. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
 |`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
 |`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
 |`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
diff --git a/docs/content/ingestion/batch-ingestion.md b/docs/content/ingestion/batch-ingestion.md
index 743ead5b071..f0f3040cb98 100644
--- a/docs/content/ingestion/batch-ingestion.md
+++ b/docs/content/ingestion/batch-ingestion.md
@@ -154,7 +154,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |workingPath|String|The working path to use for intermediate results (results between Hadoop jobs).|no (default == '/tmp/druid-indexing')|
 |version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)|
 |partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hashed')|
-|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size.|no (default == 75000)|
+|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes, you may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
+|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and does not need to be set. This is based on a rough estimate of memory usage rather than actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
 |leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)|
 |cleanupOnFailure|Boolean|Clean up intermediate files when a job fails (unless leaveIntermediate is on).|no (default == true)|
 |overwriteFiles|Boolean|Override existing files found during indexing.|no (default == false)|
diff --git a/docs/content/ingestion/stream-pull.md b/docs/content/ingestion/stream-pull.md
index e5797527687..1bd33caad82 100644
--- a/docs/content/ingestion/stream-pull.md
+++ b/docs/content/ingestion/stream-pull.md
@@ -92,7 +92,7 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
     },
     "tuningConfig": {
       "type" : "realtime",
-      "maxRowsInMemory": 75000,
+      "maxRowsInMemory": 1000000,
       "intermediatePersistPeriod": "PT10m",
       "windowPeriod": "PT10m",
       "basePersistDirectory": "\/tmp\/realtime\/basePersist",
@@ -141,7 +141,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |Field|Type|Description|Required|
 |-----|----|-----------|--------|
 |type|String|This should always be 'realtime'.|no|
-|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)|
+|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)|
+|maxBytesInMemory|Long|The maximum number of bytes to keep in memory for aggregation before persisting. This is used to manage the required JVM heap size.|no (default == One-sixth of max JVM memory)|
 |windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10m)|
 |intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10m)|
 |basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)|
@@ -287,7 +288,8 @@ The following table summarizes constraints between settings in the spec file for
 |segmentGranularity| Time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than queryGranularity|
 |queryGranularity| Time granularity (minute, hour, day, week, month) for rollup | less than segmentGranularity| minute, hour, day, week, month |
 |intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of ingested rows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
-|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
+|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes, you may not want to store a million rows in memory and this value should be set | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
+|maxBytesInMemory| The number of bytes to keep in memory before a flush to disk. Normally this is computed internally and does not need to be set. This is based on a rough estimate of memory usage rather than actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists) | number of un-persisted post-aggregation bytes in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
 
 The normal, expected use cases have the following overall constraints: `intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity`
diff --git a/docs/content/ingestion/tasks.md b/docs/content/ingestion/tasks.md
index 4d424efcadb..22e4cfddae2 100644
--- a/docs/content/ingestion/tasks.md
+++ b/docs/content/ingestion/tasks.md
@@ -77,7 +77,7 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
     "tuningConfig" : {
       "type" : "index",
       "targetPartitionSize" : 5000000,
-      "maxRowsInMemory" : 75000
+      "maxRowsInMemory" : 1000000
     }
   }
 }
@@ -137,7 +137,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
 |--------|-----------|-------|---------|
 |type|The task type, this should always be "index".|none|yes|
 |targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
-|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no|
+|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes, you may not want to store a million rows in memory and this value should be set.|1000000|no|
+|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and does not need to be set. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage rather than actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|1/6 of max JVM memory|no|
 |maxTotalRows|Total number of rows in segments waiting for being published. Used in determining when intermediate publish should occur.|150000|no|
 |numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
 |indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|
diff --git a/examples/conf-quickstart/tranquility/kafka.json b/examples/conf-quickstart/tranquility/kafka.json
index ffe537870d5..38858d25784 100644
--- a/examples/conf-quickstart/tranquility/kafka.json
+++ b/examples/conf-quickstart/tranquility/kafka.json
@@ -53,7 +53,6 @@
          },
          "tuningConfig" : {
             "type" : "realtime",
-            "maxRowsInMemory" : "100000",
             "intermediatePersistPeriod" : "PT10M",
             "windowPeriod" : "PT10M"
          }
diff --git a/examples/conf-quickstart/tranquility/server.json b/examples/conf-quickstart/tranquility/server.json
index cbc1d14e772..a17f7166550 100644
--- a/examples/conf-quickstart/tranquility/server.json
+++ b/examples/conf-quickstart/tranquility/server.json
@@ -53,7 +53,6 @@
          },
          "tuningConfig" : {
             "type" : "realtime",
-            "maxRowsInMemory" : "100000",
             "intermediatePersistPeriod" : "PT10M",
             "windowPeriod" : "PT10M"
          }
diff --git a/examples/conf/tranquility/kafka.json b/examples/conf/tranquility/kafka.json
index b0adb324893..fb7c9aeb4ca 100644
--- a/examples/conf/tranquility/kafka.json
+++ b/examples/conf/tranquility/kafka.json
@@ -53,7 +53,6 @@
          },
          "tuningConfig" : {
             "type" : "realtime",
-            "maxRowsInMemory" : "100000",
             "intermediatePersistPeriod" : "PT10M",
             "windowPeriod" : "PT10M"
          }
diff --git a/examples/conf/tranquility/server.json b/examples/conf/tranquility/server.json
index dabeed18228..ff810f72fbb 100644
--- a/examples/conf/tranquility/server.json
+++ b/examples/conf/tranquility/server.json
@@ -53,7 +53,6 @@
          },
          "tuningConfig" : {
             "type" : "realtime",
-            "maxRowsInMemory" : "100000",
             "intermediatePersistPeriod" : "PT10M",
             "windowPeriod" : "PT10M"
          }
diff --git a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java
index 46b5e4d01e7..816cfd4f2de 100644
--- a/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java
+++ b/extensions-contrib/orc-extensions/src/test/java/io/druid/data/input/orc/OrcIndexGeneratorJobTest.java
@@ -222,6 +222,7 @@ public class OrcIndexGeneratorJobTest
         null,
null, null, + null, false, false, false, diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java index 4c027764636..a31fb1a92b3 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/KafkaTuningConfig.java @@ -38,6 +38,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false; private final int maxRowsInMemory; + private final long maxBytesInMemory; private final int maxRowsPerSegment; private final Period intermediatePersistPeriod; private final File basePersistDirectory; @@ -58,6 +59,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig @JsonCreator public KafkaTuningConfig( @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, @@ -80,6 +82,9 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory; this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? 
defaults.getIntermediatePersistPeriod() : intermediatePersistPeriod; @@ -116,6 +121,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig { return new KafkaTuningConfig( config.maxRowsInMemory, + config.maxBytesInMemory, config.maxRowsPerSegment, config.intermediatePersistPeriod, config.basePersistDirectory, @@ -140,6 +146,13 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig return maxRowsInMemory; } + @Override + @JsonProperty + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public int getMaxRowsPerSegment() { @@ -240,6 +253,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig { return new KafkaTuningConfig( maxRowsInMemory, + maxBytesInMemory, maxRowsPerSegment, intermediatePersistPeriod, dir, @@ -269,6 +283,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig KafkaTuningConfig that = (KafkaTuningConfig) o; return maxRowsInMemory == that.maxRowsInMemory && maxRowsPerSegment == that.maxRowsPerSegment && + maxBytesInMemory == that.maxBytesInMemory && maxPendingPersists == that.maxPendingPersists && reportParseExceptions == that.reportParseExceptions && handoffConditionTimeout == that.handoffConditionTimeout && @@ -289,6 +304,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig return Objects.hash( maxRowsInMemory, maxRowsPerSegment, + maxBytesInMemory, intermediatePersistPeriod, basePersistDirectory, maxPendingPersists, @@ -310,6 +326,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig return "KafkaTuningConfig{" + "maxRowsInMemory=" + maxRowsInMemory + ", maxRowsPerSegment=" + maxRowsPerSegment + + ", maxBytesInMemory=" + maxBytesInMemory + ", intermediatePersistPeriod=" + intermediatePersistPeriod + ", basePersistDirectory=" + basePersistDirectory + ", maxPendingPersists=" + maxPendingPersists + diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java index c796b3eb9b6..53678810cdc 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java @@ -93,6 +93,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec null, null, null, + null, null ); this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig"); diff --git a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java index 4467a65d4e1..4fb2b2409b0 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java @@ -21,6 +21,7 @@ package io.druid.indexing.kafka.supervisor; import com.fasterxml.jackson.annotation.JsonProperty; import io.druid.indexing.kafka.KafkaTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.writeout.SegmentWriteOutMediumFactory; import io.druid.segment.IndexSpec; import org.joda.time.Duration; @@ -40,6 +41,7 @@ public class 
KafkaSupervisorTuningConfig extends KafkaTuningConfig public KafkaSupervisorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -65,6 +67,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig { super( maxRowsInMemory, + maxBytesInMemory, maxRowsPerSegment, intermediatePersistPeriod, basePersistDirectory, @@ -131,6 +134,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig return "KafkaSupervisorTuningConfig{" + "maxRowsInMemory=" + getMaxRowsInMemory() + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + ", basePersistDirectory=" + getBasePersistDirectory() + ", maxPendingPersists=" + getMaxPendingPersists() + diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java index 511fb4da20f..e3b15b0c195 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaIndexTaskTest.java @@ -1965,6 +1965,7 @@ public class KafkaIndexTaskTest { final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( 1000, + null, maxRowsPerSegment, new Period("P1Y"), null, @@ -2007,6 +2008,7 @@ public class KafkaIndexTaskTest { final KafkaTuningConfig tuningConfig = new KafkaTuningConfig( 1000, + null, maxRowsPerSegment, new Period("P1Y"), null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java index 78916e7d27b..04a07b81ed7 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/KafkaTuningConfigTest.java @@ -56,7 +56,7 @@ public class KafkaTuningConfigTest ); Assert.assertNotNull(config.getBasePersistDirectory()); - Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); @@ -103,6 +103,7 @@ public class KafkaTuningConfigTest { KafkaTuningConfig original = new KafkaTuningConfig( 1, + null, 2, new Period("PT3S"), new File("/tmp/xxx"), diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index c44425c17e9..cbea2344b22 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -185,6 +185,7 @@ public class 
KafkaSupervisorTest extends EasyMockSupport tuningConfig = new KafkaSupervisorTuningConfig( 1000, + null, 50000, new Period("P1Y"), new File("/test"), diff --git a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java index 34477c14372..36fe290d2a2 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/io/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java @@ -58,7 +58,7 @@ public class KafkaSupervisorTuningConfigTest ); Assert.assertNotNull(config.getBasePersistDirectory()); - Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment()); Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(0, config.getMaxPendingPersists()); diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java index a997e40d299..babc1bc515c 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/HadoopTuningConfig.java @@ -43,7 +43,7 @@ public class HadoopTuningConfig implements TuningConfig private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec(); private static final Map> DEFAULT_SHARD_SPECS = ImmutableMap.of(); private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); - private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000; + private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final boolean DEFAULT_USE_COMBINER = false; private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0; @@ -56,6 +56,7 @@ public class HadoopTuningConfig implements TuningConfig DEFAULT_SHARD_SPECS, DEFAULT_INDEX_SPEC, DEFAULT_ROW_FLUSH_BOUNDARY, + 0L, false, true, false, @@ -80,6 +81,7 @@ public class HadoopTuningConfig implements TuningConfig private final Map> shardSpecs; private final IndexSpec indexSpec; private final int rowFlushBoundary; + private final long maxBytesInMemory; private final boolean leaveIntermediate; private final Boolean cleanupOnFailure; private final boolean overwriteFiles; @@ -102,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig final @JsonProperty("shardSpecs") Map> shardSpecs, final @JsonProperty("indexSpec") IndexSpec indexSpec, final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, final @JsonProperty("leaveIntermediate") boolean leaveIntermediate, final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure, final @JsonProperty("overwriteFiles") boolean overwriteFiles, @@ -129,6 +132,9 @@ public class HadoopTuningConfig implements TuningConfig this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null ? 
DEFAULT_ROW_FLUSH_BOUNDARY : maxRowsInMemoryCOMPAT : maxRowsInMemory; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.leaveIntermediate = leaveIntermediate; this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure; this.overwriteFiles = overwriteFiles; @@ -190,6 +196,12 @@ public class HadoopTuningConfig implements TuningConfig return rowFlushBoundary; } + @JsonProperty + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public boolean isLeaveIntermediate() { @@ -288,6 +300,7 @@ public class HadoopTuningConfig implements TuningConfig shardSpecs, indexSpec, rowFlushBoundary, + maxBytesInMemory, leaveIntermediate, cleanupOnFailure, overwriteFiles, @@ -315,6 +328,7 @@ public class HadoopTuningConfig implements TuningConfig shardSpecs, indexSpec, rowFlushBoundary, + maxBytesInMemory, leaveIntermediate, cleanupOnFailure, overwriteFiles, @@ -342,6 +356,7 @@ public class HadoopTuningConfig implements TuningConfig specs, indexSpec, rowFlushBoundary, + maxBytesInMemory, leaveIntermediate, cleanupOnFailure, overwriteFiles, diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java index a848714c8eb..077a642d44d 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/IndexGeneratorJob.java @@ -51,6 +51,7 @@ import io.druid.segment.QueryableIndex; import io.druid.segment.column.ColumnCapabilitiesImpl; import io.druid.segment.incremental.IncrementalIndex; import io.druid.segment.incremental.IncrementalIndexSchema; +import io.druid.segment.indexing.TuningConfigs; import io.druid.timeline.DataSegment; import io.druid.timeline.partition.NumberedShardSpec; import io.druid.timeline.partition.ShardSpec; @@ -288,6 +289,7 @@ public class IndexGeneratorJob implements Jobby .setIndexSchema(indexSchema) .setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows()) .setMaxRowCount(tuningConfig.getRowFlushBoundary()) + .setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) .buildOnheap(); if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) { diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java index 3970e23fc1b..88a797cc480 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/BatchDeltaIngestionTest.java @@ -480,6 +480,7 @@ public class BatchDeltaIngestionTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java index 8d656f20d4b..7f1e76f547e 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DetermineHashedPartitionsJobTest.java @@ -198,6 +198,7 @@ public class DetermineHashedPartitionsJobTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java 
b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java index 90842519878..f96c5957db0 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/DeterminePartitionsJobTest.java @@ -259,6 +259,7 @@ public class DeterminePartitionsJobTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java index 2ed052eafec..6678b13d075 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopDruidIndexerConfigTest.java @@ -84,6 +84,7 @@ public class HadoopDruidIndexerConfigTest ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), specs), null, null, + null, false, false, false, @@ -160,6 +161,7 @@ public class HadoopDruidIndexerConfigTest ), null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java index 9e4a26a22b4..b389d8ecc91 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/HadoopTuningConfigTest.java @@ -46,6 +46,7 @@ public class HadoopTuningConfigTest null, null, 100, + null, true, true, true, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java index 1b422b6c223..af1d7d944b7 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/IndexGeneratorJobTest.java @@ -91,7 +91,7 @@ public class IndexGeneratorJobTest @Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " + "data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " + - "aggs={8}, datasourceName={9}, forceExtendableShardSpecs={10}") + "maxBytesInMemory={8}, aggs={9}, datasourceName={10}, forceExtendableShardSpecs={11}") public static Collection constructFeed() { final List baseConstructors = Arrays.asList( @@ -151,6 +151,7 @@ public class IndexGeneratorJobTest null ), null, + null, aggs1, "website" }, @@ -198,6 +199,7 @@ public class IndexGeneratorJobTest ) ), null, + null, aggs1, "website" }, @@ -246,6 +248,7 @@ public class IndexGeneratorJobTest null ), null, + null, aggs1, "website" }, @@ -303,6 +306,7 @@ public class IndexGeneratorJobTest ) ), null, + null, aggs1, "website" }, @@ -335,6 +339,7 @@ public class IndexGeneratorJobTest null ), 1, // force 1 row max per index for easier testing + null, aggs2, "inherit_dims" }, @@ -367,6 +372,7 @@ public class IndexGeneratorJobTest null ), 1, // force 1 row max per index for easier testing + null, aggs2, "inherit_dims2" } @@ -398,6 +404,7 @@ public class IndexGeneratorJobTest private final String inputFormatName; private final InputRowParser inputRowParser; private final Integer maxRowsInMemory; + private final Long maxBytesInMemory; private final AggregatorFactory[] aggs; private final String datasourceName; private final boolean forceExtendableShardSpecs; @@ -416,6 +423,7 @@ public class IndexGeneratorJobTest String inputFormatName, InputRowParser inputRowParser, Integer maxRowsInMemory, + Long 
maxBytesInMemory, AggregatorFactory[] aggs, String datasourceName, boolean forceExtendableShardSpecs @@ -429,6 +437,7 @@ public class IndexGeneratorJobTest this.inputFormatName = inputFormatName; this.inputRowParser = inputRowParser; this.maxRowsInMemory = maxRowsInMemory; + this.maxBytesInMemory = maxBytesInMemory; this.aggs = aggs; this.datasourceName = datasourceName; this.forceExtendableShardSpecs = forceExtendableShardSpecs; @@ -511,6 +520,7 @@ public class IndexGeneratorJobTest null, null, maxRowsInMemory, + maxBytesInMemory, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java index c768e2c8e10..a7613249d05 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/JobHelperTest.java @@ -108,6 +108,7 @@ public class JobHelperTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index b4caeed21f4..a24d2615dbc 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -61,6 +61,7 @@ public class GranularityPathSpecTest null, null, null, + null, false, false, false, diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java index c8d76354434..e673c3e661e 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/updater/HadoopConverterJobTest.java @@ -200,6 +200,7 @@ public class HadoopConverterJobTest null, null, null, + null, false, false, false, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java index 06c6069ae00..800c0300d43 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java @@ -37,7 +37,7 @@ import java.io.File; @JsonTypeName("realtime_appenderator") public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int defaultMaxRowsInMemory = 75000; + private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final int defaultMaxRowsPerSegment = 5_000_000; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final int defaultMaxPendingPersists = 0; @@ -52,8 +52,11 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return Files.createTempDir(); } + + private final int maxRowsInMemory; private final int maxRowsPerSegment; + private final long maxBytesInMemory; private final Period intermediatePersistPeriod; private final File basePersistDirectory; private final int maxPendingPersists; @@ -73,6 +76,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera public RealtimeAppenderatorTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, 
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @JsonProperty("maxPendingPersists") Integer maxPendingPersists, @@ -89,6 +93,9 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; this.maxRowsPerSegment = maxRowsPerSegment == null ? defaultMaxRowsPerSegment : maxRowsPerSegment; + // initializing this to 0, it will be lazily intialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -127,6 +134,12 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return maxRowsInMemory; } + @Override + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public int getMaxRowsPerSegment() { @@ -217,6 +230,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera return new RealtimeAppenderatorTuningConfig( maxRowsInMemory, maxRowsPerSegment, + maxBytesInMemory, intermediatePersistPeriod, dir, maxPendingPersists, diff --git a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java index 6b3cfa97866..91d8e6b133b 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/index/YeOldePlumberSchool.java @@ -43,6 +43,7 @@ import io.druid.segment.SegmentUtils; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; @@ -105,6 +106,7 @@ public class YeOldePlumberSchool implements PlumberSchool config.getShardSpec(), version, config.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), config.isReportParseExceptions() ); diff --git a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java index 47bae5290e0..554969ca993 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/task/IndexTask.java @@ -76,9 +76,9 @@ import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter; import io.druid.segment.realtime.RealtimeMetricsMonitor; import io.druid.segment.realtime.appenderator.Appenderator; import io.druid.segment.realtime.appenderator.AppenderatorConfig; -import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult; import io.druid.segment.realtime.appenderator.Appenderators; +import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver; import 
io.druid.segment.realtime.appenderator.BatchAppenderatorDriver; import io.druid.segment.realtime.appenderator.SegmentAllocator; import io.druid.segment.realtime.appenderator.SegmentIdentifier; @@ -1231,7 +1231,6 @@ public class IndexTask extends AbstractTask implements ChatHandler @JsonTypeName("index") public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000; private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000; private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec(); private static final int DEFAULT_MAX_PENDING_PERSISTS = 0; @@ -1244,6 +1243,7 @@ public class IndexTask extends AbstractTask implements ChatHandler private final Integer targetPartitionSize; private final int maxRowsInMemory; + private final long maxBytesInMemory; private final Long maxTotalRows; private final Integer numShards; private final IndexSpec indexSpec; @@ -1276,6 +1276,7 @@ public class IndexTask extends AbstractTask implements ChatHandler public IndexTuningConfig( @JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize, @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, @JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED @JsonProperty("numShards") @Nullable Integer numShards, @@ -1297,6 +1298,7 @@ public class IndexTask extends AbstractTask implements ChatHandler this( targetPartitionSize, maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility, + maxBytesInMemory != null ? maxBytesInMemory : 0, maxTotalRows, numShards, indexSpec, @@ -1315,12 +1317,13 @@ public class IndexTask extends AbstractTask implements ChatHandler private IndexTuningConfig() { - this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null); } private IndexTuningConfig( @Nullable Integer targetPartitionSize, @Nullable Integer maxRowsInMemory, + @Nullable Long maxBytesInMemory, @Nullable Long maxTotalRows, @Nullable Integer numShards, @Nullable IndexSpec indexSpec, @@ -1342,7 +1345,10 @@ public class IndexTask extends AbstractTask implements ChatHandler ); this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize); - this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; + this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows); this.numShards = numShards == null || numShards.equals(-1) ? null : numShards; this.indexSpec = indexSpec == null ? 
DEFAULT_INDEX_SPEC : indexSpec; @@ -1396,6 +1402,7 @@ public class IndexTask extends AbstractTask implements ChatHandler return new IndexTuningConfig( targetPartitionSize, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, numShards, indexSpec, @@ -1425,6 +1432,13 @@ public class IndexTask extends AbstractTask implements ChatHandler return maxRowsInMemory; } + @JsonProperty + @Override + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @JsonProperty public Long getMaxTotalRows() { diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java index 5e09e978fb3..6da9807536e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/AppenderatorDriverRealtimeIndexTaskTest.java @@ -711,6 +711,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest ) ); + // Wait for the task to finish. final TaskStatus taskStatus = statusFuture.get(); Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode()); @@ -1245,6 +1246,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest null, null, null, + null, reportParseExceptions, handoffTimeout, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java index cefe5015529..ebac68e8b88 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/CompactionTaskTest.java @@ -247,6 +247,7 @@ public class CompactionTaskTest 1000000L, null, null, + null, new IndexSpec( new RoaringBitmapSerdeFactory(true), CompressionStrategy.LZ4, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java index 6090e9012d5..b6d72494c59 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/IndexTaskTest.java @@ -631,7 +631,7 @@ public class IndexTaskTest Granularities.MINUTE, null ), - createTuningConfig(2, 2, 2L, null, false, false, true), + createTuningConfig(2, 2, null, 2L, null, false, false, true), false ), null, @@ -675,7 +675,7 @@ public class IndexTaskTest true, null ), - createTuningConfig(3, 2, 2L, null, false, true, true), + createTuningConfig(3, 2, null, 2L, null, false, true, true), false ), null, @@ -718,7 +718,7 @@ public class IndexTaskTest true, null ), - createTuningConfig(3, 2, 2L, null, false, false, true), + createTuningConfig(3, 2, null, 2L, null, false, false, true), false ), null, @@ -790,7 +790,7 @@ public class IndexTaskTest 0 ), null, - createTuningConfig(2, null, null, null, false, false, false), // ignore parse exception, + createTuningConfig(2, null, null, null, null, false, false, false), // ignore parse exception, false ); @@ -842,7 +842,7 @@ public class IndexTaskTest 0 ), null, - createTuningConfig(2, null, null, null, false, false, true), // report parse exception + createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception false ); @@ -894,6 +894,7 @@ public class IndexTaskTest null, null, null, + null, indexSpec, null, true, @@ -1012,6 +1013,7 @@ 
public class IndexTaskTest null, null, null, + null, indexSpec, null, true, @@ -1117,6 +1119,7 @@ public class IndexTaskTest null, null, null, + null, indexSpec, null, true, @@ -1245,7 +1248,7 @@ public class IndexTaskTest 0 ), null, - createTuningConfig(2, 1, null, null, false, true, true), // report parse exception + createTuningConfig(2, 1, null, null, null, false, true, true), // report parse exception false ); @@ -1314,7 +1317,7 @@ public class IndexTaskTest 0 ), null, - createTuningConfig(2, null, null, null, false, false, true), // report parse exception + createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception false ); @@ -1553,6 +1556,7 @@ public class IndexTaskTest targetPartitionSize, 1, null, + null, numShards, forceExtendableShardSpecs, forceGuaranteedRollup, @@ -1563,6 +1567,7 @@ public class IndexTaskTest private static IndexTuningConfig createTuningConfig( Integer targetPartitionSize, Integer maxRowsInMemory, + Long maxBytesInMemory, Long maxTotalRows, Integer numShards, boolean forceExtendableShardSpecs, @@ -1573,6 +1578,7 @@ public class IndexTaskTest return new IndexTask.IndexTuningConfig( targetPartitionSize, maxRowsInMemory, + maxBytesInMemory, maxTotalRows, null, numShards, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java index e7e44e4d841..9d02ed1c63e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/RealtimeIndexTaskTest.java @@ -904,6 +904,7 @@ public class RealtimeIndexTaskTest ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), new Period("PT10M"), null, diff --git a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java index 589dcf3af91..545bb9ec08e 100644 --- a/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/common/task/TaskSerdeTest.java @@ -108,7 +108,7 @@ public class TaskSerdeTest Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec()); Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod()); Assert.assertEquals(0, tuningConfig.getMaxPendingPersists()); - Assert.assertEquals(75000, tuningConfig.getMaxRowsInMemory()); + Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory()); Assert.assertEquals(null, tuningConfig.getNumShards()); Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize()); } @@ -195,6 +195,7 @@ public class TaskSerdeTest 10000, 10, null, + null, 9999, null, indexSpec, @@ -280,6 +281,7 @@ public class TaskSerdeTest null, null, null, + null, indexSpec, 3, true, @@ -537,6 +539,7 @@ public class TaskSerdeTest new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null, diff --git a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java index bd23760153c..091dff01dba 100644 --- a/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java +++ b/indexing-service/src/test/java/io/druid/indexing/overlord/TaskLifecycleTest.java @@ -674,6 +674,7 @@ public class TaskLifecycleTest null, null, null, + null, 
indexSpec, 3, true, @@ -752,6 +753,7 @@ public class TaskLifecycleTest null, null, null, + null, indexSpec, 3, true, @@ -1137,6 +1139,7 @@ public class TaskLifecycleTest null, null, null, + null, indexSpec, null, false, @@ -1260,6 +1263,7 @@ public class TaskLifecycleTest ); RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig( 1000, + null, new Period("P1Y"), null, //default window period of 10 minutes null, // base persist dir ignored by Realtime Index task diff --git a/processing/src/main/java/io/druid/segment/DimensionIndexer.java b/processing/src/main/java/io/druid/segment/DimensionIndexer.java index 444703abf65..4b762e5ba8f 100644 --- a/processing/src/main/java/io/druid/segment/DimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DimensionIndexer.java @@ -127,6 +127,14 @@ public interface DimensionIndexer */ EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions); + /** + * Gives the estimated size in bytes for the given key + * + * @param key dimension value array from a TimeAndDims key + * + * @return the estimated size in bytes of the key + */ + long estimateEncodedKeyComponentSize(EncodedKeyComponentType key); /** * Given an encoded value that was ordered by associated actual value, return the equivalent diff --git a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java index 7449e75df7a..423f3a6424c 100644 --- a/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/DoubleDimensionIndexer.java @@ -49,6 +49,12 @@ public class DoubleDimensionIndexer implements DimensionIndexer return ret == null ? DimensionHandlerUtils.ZERO_LONG : ret; } + @Override + public long estimateEncodedKeyComponentSize(Long key) + { + return Long.BYTES; + } + @Override public Long getUnsortedEncodedValueFromSorted(Long sortedIntermediateValue) { diff --git a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java index ee892010212..df28aa43f2e 100644 --- a/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java +++ b/processing/src/main/java/io/druid/segment/StringDimensionIndexer.java @@ -270,6 +270,20 @@ public class StringDimensionIndexer implements DimensionIndexer dimLookup.getValue(element) != null) + .mapToLong(element -> dimLookup.getValue(element).length() * Character.BYTES) + .sum(); + return estimatedSize; + } + public Integer getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue) { return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java index 315c9c58ad8..f496d53a7de 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndex.java @@ -93,6 +93,7 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -237,6 +238,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp private final List dimensionDescsList; private final Map 
columnCapabilities; private final AtomicInteger numEntries = new AtomicInteger(); + private final AtomicLong bytesInMemory = new AtomicLong(); // This is modified on add() in a critical section. private final ThreadLocal in = new ThreadLocal<>(); @@ -333,6 +335,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp private boolean concurrentEventAdd; private boolean sortFacts; private int maxRowCount; + private long maxBytesInMemory; public Builder() { @@ -342,6 +345,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp concurrentEventAdd = false; sortFacts = true; maxRowCount = 0; + maxBytesInMemory = 0; } public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema) @@ -398,6 +402,13 @@ public abstract class IncrementalIndex extends AbstractIndex imp return this; } + //maxBytesInMemory only applies to OnHeapIncrementalIndex + public Builder setMaxBytesInMemory(final long maxBytesInMemory) + { + this.maxBytesInMemory = maxBytesInMemory; + return this; + } + public IncrementalIndex buildOnheap() { if (maxRowCount <= 0) { @@ -410,7 +421,8 @@ public abstract class IncrementalIndex extends AbstractIndex imp reportParseExceptions, concurrentEventAdd, sortFacts, - maxRowCount + maxRowCount, + maxBytesInMemory ); } @@ -457,6 +469,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -504,11 +517,17 @@ public abstract class IncrementalIndex extends AbstractIndex imp static class AddToFactsResult { private int rowCount; + private final long bytesInMemory; private List parseExceptionMessages; - AddToFactsResult(int rowCount, List parseExceptionMessages) + public AddToFactsResult( + int rowCount, + long bytesInMemory, + List parseExceptionMessages + ) { this.rowCount = rowCount; + this.bytesInMemory = bytesInMemory; this.parseExceptionMessages = parseExceptionMessages; } @@ -517,7 +536,12 @@ public abstract class IncrementalIndex extends AbstractIndex imp return rowCount; } - List getParseExceptionMessages() + public long getBytesInMemory() + { + return bytesInMemory; + } + + public List getParseExceptionMessages() { return parseExceptionMessages; } @@ -571,6 +595,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp reportParseExceptions, row, numEntries, + bytesInMemory, incrementalIndexRowResult.getIncrementalIndexRow(), in, rowSupplier, @@ -582,7 +607,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp incrementalIndexRowResult.getParseExceptionMessages(), addToFactsResult.getParseExceptionMessages() ); - return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), parseException); + return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), addToFactsResult.getBytesInMemory(), parseException); } @VisibleForTesting @@ -597,6 +622,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp Object[] dims; List overflow = null; + long dimsKeySize = 0; List parseExceptionMessages = new ArrayList<>(); synchronized (dimensionDescs) { dims = new Object[dimensionDescs.size()]; @@ -635,7 +661,7 @@ public abstract class IncrementalIndex extends AbstractIndex imp catch (ParseException pe) { parseExceptionMessages.add(pe.getMessage()); } - + dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey); // Set column capabilities as data is coming in if (!capabilities.hasMultipleValues() && dimsKey != null 
&& @@ -679,10 +705,11 @@ public abstract class IncrementalIndex extends AbstractIndex imp if (row.getTimestamp() != null) { truncated = gran.bucketStart(row.getTimestamp()).getMillis(); } - IncrementalIndexRow incrementalIndexRow = new IncrementalIndexRow( + IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize( Math.max(truncated, minTimestamp), dims, - dimensionDescsList + dimensionDescsList, + dimsKeySize ); return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages); } @@ -740,6 +767,11 @@ public abstract class IncrementalIndex extends AbstractIndex imp return numEntries.get(); } + public long getBytesInMemory() + { + return bytesInMemory.get(); + } + private long getMinTimeMillis() { return getFacts().getMinTimeMillis(); diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java index 06c537a0aa3..e76d3c15a1e 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexAddResult.java @@ -26,16 +26,19 @@ import javax.annotation.Nullable; public class IncrementalIndexAddResult { private final int rowCount; + private final long bytesInMemory; @Nullable private final ParseException parseException; public IncrementalIndexAddResult( int rowCount, + long bytesInMemory, @Nullable ParseException parseException ) { this.rowCount = rowCount; + this.bytesInMemory = bytesInMemory; this.parseException = parseException; } @@ -44,6 +47,11 @@ public class IncrementalIndexAddResult return rowCount; } + public long getBytesInMemory() + { + return bytesInMemory; + } + @Nullable public ParseException getParseException() { diff --git a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java index 3c0a5fdf881..dd671267c4a 100644 --- a/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java +++ b/processing/src/main/java/io/druid/segment/incremental/IncrementalIndexRow.java @@ -45,6 +45,7 @@ public final class IncrementalIndexRow * {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil. */ private int rowIndex; + private long dimsKeySize; IncrementalIndexRow( long timestamp, @@ -68,6 +69,29 @@ public final class IncrementalIndexRow this.rowIndex = rowIndex; } + private IncrementalIndexRow( + long timestamp, + Object[] dims, + List dimensionDescsList, + long dimsKeySize + ) + { + this.timestamp = timestamp; + this.dims = dims; + this.dimensionDescsList = dimensionDescsList; + this.dimsKeySize = dimsKeySize; + } + + static IncrementalIndexRow createTimeAndDimswithDimsKeySize( + long timestamp, + Object[] dims, + List dimensionDescsList, + long dimsKeySize + ) + { + return new IncrementalIndexRow(timestamp, dims, dimensionDescsList, dimsKeySize); + } + public long getTimestamp() { return timestamp; @@ -88,6 +112,25 @@ public final class IncrementalIndexRow this.rowIndex = rowIndex; } + /** + * bytesInMemory estimates the size of IncrementalIndexRow key, it takes into account the timestamp(long), + * dims(Object Array) and dimensionDescsList(List). Each of these are calculated as follows: + *
+ * <ul>
+ * <li> timestamp : Long.BYTES
+ * <li> dims array : Integer.BYTES * array length + Long.BYTES (dims object) + dimsKeySize (passed via constructor)
+ * <li> dimensionDescList : Long.BYTES (shared pointer)
+ * <li> dimsKeySize : this value is passed in based on the key type (int, long, double, String etc.)
+ * </ul>
+ * + * @return long estimated bytesInMemory + */ + public long estimateBytesInMemory() + { + long sizeInBytes = Long.BYTES + Integer.BYTES * dims.length + Long.BYTES + Long.BYTES; + sizeInBytes += dimsKeySize; + return sizeInBytes; + } + @Override public String toString() { diff --git a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java index 216a4cd706b..ae2c25f5f61 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OffheapIncrementalIndex.java @@ -40,6 +40,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ @@ -144,6 +145,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, // ignored, added to make abstract class method impl happy IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -238,7 +240,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex } } rowContainer.set(null); - return new AddToFactsResult(numEntries.get(), new ArrayList<>()); + return new AddToFactsResult(numEntries.get(), 0, new ArrayList<>()); } @Override diff --git a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java index 0445d77b529..cbe9ba48bc9 100644 --- a/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java +++ b/processing/src/main/java/io/druid/segment/incremental/OnheapIncrementalIndex.java @@ -38,22 +38,29 @@ import io.druid.segment.column.ColumnCapabilities; import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** */ public class OnheapIncrementalIndex extends IncrementalIndex { private static final Logger log = new Logger(OnheapIncrementalIndex.class); - + /** + * overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object + */ + private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES; private final ConcurrentHashMap aggregators = new ConcurrentHashMap<>(); private final FactsHolder facts; private final AtomicInteger indexIncrement = new AtomicInteger(0); + private final long maxBytesPerRowForAggregators; protected final int maxRowCount; + protected final long maxBytesInMemory; private volatile Map selectors; private String outOfRowsReason = null; @@ -64,14 +71,44 @@ public class OnheapIncrementalIndex extends IncrementalIndex boolean reportParseExceptions, boolean concurrentEventAdd, boolean sortFacts, - int maxRowCount + int maxRowCount, + long maxBytesInMemory ) { super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd); this.maxRowCount = maxRowCount; - + this.maxBytesInMemory = maxBytesInMemory == 0 ? -1 : maxBytesInMemory; this.facts = incrementalIndexSchema.isRollup() ? 
new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions()) : new PlainFactsHolder(sortFacts); + maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema); + } + + /** + * Gives estimated max size per aggregator. It is assumed that every aggregator will have enough overhead for its own + * object header and for a pointer to a selector. We are adding a overhead-factor for each object as additional 16 + * bytes. + * These 16 bytes or 128 bits is the object metadata for 64-bit JVM process and consists of: + *
+ * <ul>
+ * <li> Class pointer which describes the object type: 64 bits
+ * <li> Flags which describe state of the object including hashcode: 64 bits
+ * </ul>
+ * total size estimation consists of:
+ * <ul>
+ * <li> metrics length : Integer.BYTES * len
+ * <li> maxAggregatorIntermediateSize : getMaxIntermediateSize per aggregator + overhead-factor (16 bytes)
+ * </ul>
      + * + * @param incrementalIndexSchema + * + * @return long max aggregator size in bytes + */ + private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema incrementalIndexSchema) + { + long maxAggregatorIntermediateSize = Integer.BYTES * incrementalIndexSchema.getMetrics().length; + maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics()) + .mapToLong(aggregator -> aggregator.getMaxIntermediateSize() + Long.BYTES * 2) + .sum(); + return maxAggregatorIntermediateSize; } @Override @@ -109,6 +146,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -132,14 +170,20 @@ public class OnheapIncrementalIndex extends IncrementalIndex concurrentSet(rowIndex, aggs); // Last ditch sanity checks - if (numEntries.get() >= maxRowCount + if ((numEntries.get() >= maxRowCount || (maxBytesInMemory > 0 && sizeInBytes.get() >= maxBytesInMemory)) && facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX && !skipMaxRowsInMemoryCheck) { - throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount); + throw new IndexSizeExceededException( + "Maximum number of rows [%d] or max size in bytes [%d] reached", + maxRowCount, + maxBytesInMemory + ); } final int prev = facts.putIfAbsent(key, rowIndex); if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); + long estimatedRowSize = estimateRowSizeInBytes(key, maxBytesPerRowForAggregators); + sizeInBytes.addAndGet(estimatedRowSize); } else { // We lost a race aggs = concurrentGet(prev); @@ -150,7 +194,25 @@ public class OnheapIncrementalIndex extends IncrementalIndex } } - return new AddToFactsResult(numEntries.get(), parseExceptionMessages); + return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), parseExceptionMessages); + } + + /** + * Gives an estimated size of row in bytes, it accounts for: + *
+ * <ul>
+ * <li> overhead per Map Entry
+ * <li> TimeAndDims key size
+ * <li> aggregator size
+ * </ul>
      + * + * @param key TimeAndDims key + * @param maxBytesPerRowForAggregators max size per aggregator + * + * @return estimated size of row + */ + private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators) + { + return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators; } @Override @@ -238,10 +300,24 @@ public class OnheapIncrementalIndex extends IncrementalIndex @Override public boolean canAppendRow() { - final boolean canAdd = size() < maxRowCount; - if (!canAdd) { - outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); + final boolean countCheck = size() < maxRowCount; + // if maxBytesInMemory = -1, then ignore sizeCheck + final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory; + final boolean canAdd = countCheck && sizeCheck; + if (!countCheck && !sizeCheck) { + outOfRowsReason = StringUtils.format( + "Maximum number of rows [%d] and maximum size in bytes [%d] reached", + maxRowCount, + maxBytesInMemory + ); + } else { + if (!countCheck) { + outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount); + } else if (!sizeCheck) { + outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory); + } } + return canAdd; } diff --git a/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexRowSizeTest.java b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexRowSizeTest.java new file mode 100644 index 00000000000..9d77bbb7693 --- /dev/null +++ b/processing/src/test/java/io/druid/segment/incremental/IncrementalIndexRowSizeTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package io.druid.segment.incremental; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import io.druid.data.input.MapBasedInputRow; +import io.druid.query.aggregation.CountAggregatorFactory; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Map; + +/** + */ +public class IncrementalIndexRowSizeTest +{ + @Test + public void testIncrementalIndexRowSizeBasic() + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(10000) + .setMaxBytesInMemory(1000) + .buildOnheap(); + long time = System.currentTimeMillis(); + IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(time, "billy", "A", "joe", "B")); + IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow(); + Assert.assertEquals(44, td1.estimateBytesInMemory()); + } + + @Test + public void testIncrementalIndexRowSizeArr() + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(10000) + .setMaxBytesInMemory(1000) + .buildOnheap(); + long time = System.currentTimeMillis(); + IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow( + time + 1, + "billy", + "A", + "joe", + Arrays.asList("A", "B") + )); + IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow(); + Assert.assertEquals(50, td1.estimateBytesInMemory()); + } + + @Test + public void testIncrementalIndexRowSizeComplex() + { + IncrementalIndex index = new IncrementalIndex.Builder() + .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt")) + .setMaxRowCount(10000) + .setMaxBytesInMemory(1000) + .buildOnheap(); + long time = System.currentTimeMillis(); + IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow( + time + 1, + "billy", + "nelson", + "joe", + Arrays.asList("123", "abcdef") + )); + IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow(); + Assert.assertEquals(74, td1.estimateBytesInMemory()); + } + + private MapBasedInputRow toMapRow(long time, Object... 
dimAndVal) + { + Map data = Maps.newHashMap(); + for (int i = 0; i < dimAndVal.length; i += 2) { + data.put((String) dimAndVal[i], dimAndVal[i + 1]); + } + return new MapBasedInputRow(time, Lists.newArrayList(data.keySet()), data); + } +} diff --git a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java index 94734aafad5..43264b95924 100644 --- a/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java +++ b/processing/src/test/java/io/druid/segment/incremental/OnheapIncrementalIndexBenchmark.java @@ -76,6 +76,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; /** * Extending AbstractBenchmark means only runs if explicitly called @@ -118,7 +119,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark boolean reportParseExceptions, boolean concurrentEventAdd, boolean sortFacts, - int maxRowCount + int maxRowCount, + long maxBytesInMemory ) { super( @@ -127,7 +129,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark reportParseExceptions, concurrentEventAdd, sortFacts, - maxRowCount + maxRowCount, + maxBytesInMemory ); } @@ -135,20 +138,22 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark long minTimestamp, Granularity gran, AggregatorFactory[] metrics, - int maxRowCount + int maxRowCount, + long maxBytesInMemory ) { super( new IncrementalIndexSchema.Builder() - .withMinTimestamp(minTimestamp) - .withQueryGranularity(gran) - .withMetrics(metrics) - .build(), - true, - true, - false, - true, - maxRowCount + .withMinTimestamp(minTimestamp) + .withQueryGranularity(gran) + .withMetrics(metrics) + .build(), + true, + true, + false, + true, + maxRowCount, + maxBytesInMemory ); } @@ -172,6 +177,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark boolean reportParseExceptions, InputRow row, AtomicInteger numEntries, + AtomicLong sizeInBytes, IncrementalIndexRow key, ThreadLocal rowContainer, Supplier rowSupplier, @@ -202,12 +208,14 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark // Last ditch sanity checks - if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { - throw new IndexSizeExceededException("Maximum number of rows reached"); + if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory) + && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) { + throw new IndexSizeExceededException("Maximum number of rows or max bytes reached"); } final int prev = getFacts().putIfAbsent(key, rowIndex); if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) { numEntries.incrementAndGet(); + sizeInBytes.incrementAndGet(); } else { // We lost a race aggs = indexedMap.get(prev); @@ -235,7 +243,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark rowContainer.set(null); - return new AddToFactsResult(numEntries.get(), new ArrayList<>()); + return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList<>()); } @Override diff --git a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java index 179f3c7fc3f..f3f3ae6a1d9 100644 --- 
a/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/RealtimeTuningConfig.java @@ -42,7 +42,7 @@ import java.io.File; */ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { - private static final int defaultMaxRowsInMemory = 75000; + private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY; private static final Period defaultIntermediatePersistPeriod = new Period("PT10M"); private static final Period defaultWindowPeriod = new Period("PT10M"); private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy(); @@ -72,6 +72,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { return new RealtimeTuningConfig( defaultMaxRowsInMemory, + 0L, defaultIntermediatePersistPeriod, defaultWindowPeriod, basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory, @@ -91,6 +92,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig } private final int maxRowsInMemory; + private final long maxBytesInMemory; private final Period intermediatePersistPeriod; private final Period windowPeriod; private final File basePersistDirectory; @@ -110,6 +112,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig @JsonCreator public RealtimeTuningConfig( @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") Long maxBytesInMemory, @JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod, @JsonProperty("windowPeriod") Period windowPeriod, @JsonProperty("basePersistDirectory") File basePersistDirectory, @@ -129,6 +132,9 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig ) { this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory; + // initializing this to 0, it will be lazily initialized to a value + // @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long) + this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory; this.intermediatePersistPeriod = intermediatePersistPeriod == null ? 
defaultIntermediatePersistPeriod : intermediatePersistPeriod; @@ -163,6 +169,12 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig return maxRowsInMemory; } + @Override + public long getMaxBytesInMemory() + { + return maxBytesInMemory; + } + @Override @JsonProperty public Period getIntermediatePersistPeriod() @@ -268,6 +280,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { return new RealtimeTuningConfig( maxRowsInMemory, + maxBytesInMemory, intermediatePersistPeriod, windowPeriod, basePersistDirectory, @@ -290,6 +303,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig { return new RealtimeTuningConfig( maxRowsInMemory, + maxBytesInMemory, intermediatePersistPeriod, windowPeriod, dir, diff --git a/server/src/main/java/io/druid/segment/indexing/TuningConfig.java b/server/src/main/java/io/druid/segment/indexing/TuningConfig.java index 7fd246d2573..e8cc02fc450 100644 --- a/server/src/main/java/io/druid/segment/indexing/TuningConfig.java +++ b/server/src/main/java/io/druid/segment/indexing/TuningConfig.java @@ -33,4 +33,9 @@ public interface TuningConfig boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false; int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE; int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0; + int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000; + // We initially estimated this to be 1/3(max jvm memory), but bytesCurrentlyInMemory only + // tracks active index and not the index being flushed to disk, to account for that + // we halved default to 1/6(max jvm memory) + long DEFAULT_MAX_BYTES_IN_MEMORY = Runtime.getRuntime().maxMemory() / 6; } diff --git a/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java b/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java new file mode 100644 index 00000000000..571005f4370 --- /dev/null +++ b/server/src/main/java/io/druid/segment/indexing/TuningConfigs.java @@ -0,0 +1,34 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.druid.segment.indexing; + +public class TuningConfigs +{ + private TuningConfigs() + { + } + + public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory) + { + // In the main tuningConfig class constructor, we set the maxBytes to 0 if null to avoid setting + // maxBytes to max jvm memory of the process that starts first. Instead we set the default based on + // the actual task node's jvm memory. + return maxBytesInMemory == 0 ? 
TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY : maxBytesInMemory; + } +} diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java index 286470cae76..05758b1645a 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorConfig.java @@ -32,6 +32,8 @@ public interface AppenderatorConfig int getMaxRowsInMemory(); + long getMaxBytesInMemory(); + int getMaxPendingPersists(); Period getIntermediatePersistPeriod(); diff --git a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java index 8f63392de8a..cd2ecd9ac69 100644 --- a/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java +++ b/server/src/main/java/io/druid/segment/realtime/appenderator/AppenderatorImpl.java @@ -66,6 +66,7 @@ import io.druid.segment.Segment; import io.druid.segment.incremental.IncrementalIndexAddResult; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; @@ -84,6 +85,7 @@ import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Comparator; @@ -97,6 +99,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -128,6 +131,7 @@ public class AppenderatorImpl implements Appenderator // This variable updated in add(), persist(), and drop() private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger(); private final AtomicInteger totalRows = new AtomicInteger(); + private final AtomicLong bytesCurrentlyInMemory = new AtomicLong(); // Synchronize persisting commitMetadata so that multiple persist threads (if present) // and abandon threads do not step over each other private final Lock commitLock = new ReentrantLock(); @@ -143,7 +147,7 @@ public class AppenderatorImpl implements Appenderator private volatile FileChannel basePersistDirLockChannel = null; private AtomicBoolean closed = new AtomicBoolean(false); - public AppenderatorImpl( + AppenderatorImpl( DataSchema schema, AppenderatorConfig tuningConfig, FireDepartmentMetrics metrics, @@ -219,11 +223,14 @@ public class AppenderatorImpl implements Appenderator metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch()); final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory(); final int sinkRowsInMemoryAfterAdd; + final long bytesInMemoryBeforeAdd = sink.getBytesInMemory(); + final long bytesInMemoryAfterAdd; final IncrementalIndexAddResult addResult; try { addResult = sink.add(row, !allowIncrementalPersists); sinkRowsInMemoryAfterAdd = addResult.getRowCount(); + bytesInMemoryAfterAdd = addResult.getBytesInMemory(); } catch (IndexSizeExceededException e) { 
// Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we @@ -240,19 +247,51 @@ public class AppenderatorImpl implements Appenderator final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd; rowsCurrentlyInMemory.addAndGet(numAddedRows); totalRows.addAndGet(numAddedRows); + bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd); boolean isPersistRequired = false; - if (!sink.canAppendRow() - || System.currentTimeMillis() > nextFlush - || rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { + boolean persist = false; + List persistReasons = new ArrayList(); + + if (!sink.canAppendRow()) { + persist = true; + persistReasons.add("No more rows can be appended to sink"); + } + if (System.currentTimeMillis() > nextFlush) { + persist = true; + persistReasons.add(StringUtils.format( + " current time[%d] is greater than nextFlush[%d],", + System.currentTimeMillis(), + nextFlush + )); + } + if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) { + persist = true; + persistReasons.add(StringUtils.format( + " rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d],", + rowsCurrentlyInMemory.get(), + tuningConfig.getMaxRowsInMemory() + )); + } + if (tuningConfig.getMaxBytesInMemory() > 0 + && bytesCurrentlyInMemory.get() + >= TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) { + persist = true; + persistReasons.add(StringUtils.format( + " bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]", + bytesCurrentlyInMemory.get(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()) + )); + } + if (persist) { if (allowIncrementalPersists) { // persistAll clears rowsCurrentlyInMemory, no need to update it. + log.info("Persisting rows in memory due to: [%s]", String.join(",", persistReasons)); persistAll(committerSupplier == null ? 
null : committerSupplier.get()); } else { isPersistRequired = true; } } - return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired, addResult.getParseException()); } @@ -286,6 +325,24 @@ public class AppenderatorImpl implements Appenderator return rowsCurrentlyInMemory.get(); } + @VisibleForTesting + long getBytesCurrentlyInMemory() + { + return bytesCurrentlyInMemory.get(); + } + + @VisibleForTesting + long getBytesInMemory(SegmentIdentifier identifier) + { + final Sink sink = sinks.get(identifier); + + if (sink == null) { + throw new ISE("No such sink: %s", identifier); + } else { + return sink.getBytesInMemory(); + } + } + private Sink getOrCreateSink(final SegmentIdentifier identifier) { Sink retVal = sinks.get(identifier); @@ -297,6 +354,7 @@ public class AppenderatorImpl implements Appenderator identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); @@ -397,6 +455,7 @@ public class AppenderatorImpl implements Appenderator final Map currentHydrants = Maps.newHashMap(); final List> indexesToPersist = Lists.newArrayList(); int numPersistedRows = 0; + long bytesPersisted = 0L; for (SegmentIdentifier identifier : sinks.keySet()) { final Sink sink = sinks.get(identifier); if (sink == null) { @@ -405,6 +464,7 @@ public class AppenderatorImpl implements Appenderator final List hydrants = Lists.newArrayList(sink); currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size()); numPersistedRows += sink.getNumRowsInMemory(); + bytesPersisted += sink.getBytesInMemory(); final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size(); @@ -495,7 +555,7 @@ public class AppenderatorImpl implements Appenderator // NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes. rowsCurrentlyInMemory.addAndGet(-numPersistedRows); - + bytesCurrentlyInMemory.addAndGet(-bytesPersisted); return future; } @@ -965,6 +1025,7 @@ public class AppenderatorImpl implements Appenderator identifier.getShardSpec(), identifier.getVersion(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions(), hydrants ); @@ -1021,6 +1082,7 @@ public class AppenderatorImpl implements Appenderator // Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks). rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory()); totalRows.addAndGet(-sink.getNumRows()); + bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory()); // Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread. 
return Futures.transform( diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java index a99a5dfd256..0521a502f50 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/RealtimePlumber.java @@ -65,6 +65,7 @@ import io.druid.segment.incremental.IncrementalIndexAddResult; import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; import io.druid.segment.realtime.FireHydrant; @@ -255,6 +256,7 @@ public class RealtimePlumber implements Plumber config.getShardSpec(), versioningPolicy.getVersion(sinkInterval), config.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), config.isReportParseExceptions() ); addSink(retVal); @@ -729,6 +731,7 @@ public class RealtimePlumber implements Plumber config.getShardSpec(), versioningPolicy.getVersion(sinkInterval), config.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()), config.isReportParseExceptions(), hydrants ); diff --git a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java index 92790d2f20a..50d0cf2a128 100644 --- a/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java +++ b/server/src/main/java/io/druid/segment/realtime/plumber/Sink.java @@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; public class Sink implements Iterable { - private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, null); + private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, -1, null); private final Object hydrantLock = new Object(); private final Interval interval; @@ -62,6 +62,7 @@ public class Sink implements Iterable private final ShardSpec shardSpec; private final String version; private final int maxRowsInMemory; + private final long maxBytesInMemory; private final boolean reportParseExceptions; private final CopyOnWriteArrayList hydrants = new CopyOnWriteArrayList(); private final LinkedHashSet dimOrder = Sets.newLinkedHashSet(); @@ -75,6 +76,7 @@ public class Sink implements Iterable ShardSpec shardSpec, String version, int maxRowsInMemory, + long maxBytesInMemory, boolean reportParseExceptions ) { @@ -83,6 +85,7 @@ public class Sink implements Iterable this.interval = interval; this.version = version; this.maxRowsInMemory = maxRowsInMemory; + this.maxBytesInMemory = maxBytesInMemory; this.reportParseExceptions = reportParseExceptions; makeNewCurrIndex(interval.getStartMillis(), schema); @@ -94,6 +97,7 @@ public class Sink implements Iterable ShardSpec shardSpec, String version, int maxRowsInMemory, + long maxBytesInMemory, boolean reportParseExceptions, List hydrants ) @@ -103,6 +107,7 @@ public class Sink implements Iterable this.interval = interval; this.version = version; this.maxRowsInMemory = maxRowsInMemory; + this.maxBytesInMemory = maxBytesInMemory; this.reportParseExceptions = reportParseExceptions; int maxCount = -1; @@ -250,6 +255,18 @@ public class Sink implements Iterable } } + public long getBytesInMemory() + { + synchronized 
(hydrantLock) { + IncrementalIndex index = currHydrant.getIndex(); + if (index == null) { + return 0; + } + + return currHydrant.getIndex().getBytesInMemory(); + } + } + private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema) { final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder() @@ -264,6 +281,7 @@ public class Sink implements Iterable .setIndexSchema(indexSchema) .setReportParseExceptions(reportParseExceptions) .setMaxRowCount(maxRowsInMemory) + .setMaxBytesInMemory(maxBytesInMemory) .buildOnheap(); final FireHydrant old; diff --git a/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java index 5f1820b1b6d..f3aa521c5b0 100644 --- a/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java +++ b/server/src/test/java/io/druid/segment/indexing/RealtimeTuningConfigTest.java @@ -90,7 +90,7 @@ public class RealtimeTuningConfigTest Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod()); Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec()); Assert.assertEquals(0, config.getMaxPendingPersists()); - Assert.assertEquals(75000, config.getMaxRowsInMemory()); + Assert.assertEquals(1000000, config.getMaxRowsInMemory()); Assert.assertEquals(0, config.getMergeThreadPriority()); Assert.assertEquals(0, config.getPersistThreadPriority()); Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod()); diff --git a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java index 0b5c420badc..665ce3311fa 100644 --- a/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java +++ b/server/src/test/java/io/druid/segment/realtime/RealtimeManagerTest.java @@ -69,6 +69,7 @@ import io.druid.segment.incremental.IndexSizeExceededException; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeIOConfig; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.plumber.Plumber; import io.druid.segment.realtime.plumber.PlumberSchool; @@ -199,6 +200,7 @@ public class RealtimeManagerTest ); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -221,6 +223,7 @@ public class RealtimeManagerTest tuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() )); @@ -241,6 +244,7 @@ public class RealtimeManagerTest tuningConfig.getShardSpec(), DateTimes.nowUtc().toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() )); @@ -258,6 +262,7 @@ public class RealtimeManagerTest tuningConfig_0 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, @@ -277,6 +282,7 @@ public class RealtimeManagerTest tuningConfig_1 = new RealtimeTuningConfig( 1, + null, new Period("P1Y"), null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java index d03bab533e2..32dcb76b0c8 100644 --- 
a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java @@ -68,6 +68,7 @@ public class AppenderatorPlumberTest null, null, null, + null, new IntervalStartVersioningPolicy(), new NoopRejectionPolicyFactory(), null, diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java index cfd9106f926..a052583ed88 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTest.java @@ -145,6 +145,118 @@ public class AppenderatorTest } } + @Test + public void testMaxBytesInMemory() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = () -> { + final Object metadata = ImmutableMap.of(eventCount, eventCount.get()); + + return new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + //Do nothing + } + }; + }; + + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138 + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + + @Test + public void testMaxBytesInMemoryInMultipleSinks() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = () -> { + final Object metadata = ImmutableMap.of(eventCount, eventCount.get()); + + return new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + //Do nothing + } + }; + }; + + appenderator.startJob(); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + //expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138 + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(276, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + + @Test + public void testIgnoreMaxBytesInMemory() throws Exception + { + try (final AppenderatorTester tester = new AppenderatorTester(100, -1, true)) { + final Appenderator appenderator = tester.getAppenderator(); + final AtomicInteger eventCount = new AtomicInteger(0); + final Supplier committerSupplier = () -> { + final Object metadata = ImmutableMap.of(eventCount, eventCount.get()); + + return 
new Committer() + { + @Override + public Object getMetadata() + { + return metadata; + } + + @Override + public void run() + { + //Do nothing + } + }; + }; + + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.startJob(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier); + //we still calculate the size even when ignoring it to make persist decision + Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))); + Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier); + Assert.assertEquals(276, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()); + Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory()); + appenderator.close(); + Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory()); + } + } + @Test public void testMaxRowsInMemory() throws Exception { @@ -288,7 +400,12 @@ public class AppenderatorTest appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 5), committerSupplier); appenderator.close(); - try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory(), true)) { + try (final AppenderatorTester tester2 = new AppenderatorTester( + 2, + -1, + tuningConfig.getBasePersistDirectory(), + true + )) { final Appenderator appenderator2 = tester2.getAppenderator(); Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob()); Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments()); diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java index 42462818fb8..e562e18f191 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/AppenderatorTester.java @@ -89,7 +89,7 @@ public class AppenderatorTester implements AutoCloseable final int maxRowsInMemory ) { - this(maxRowsInMemory, null, false); + this(maxRowsInMemory, -1, null, false); } public AppenderatorTester( @@ -97,11 +97,21 @@ public class AppenderatorTester implements AutoCloseable final boolean enablePushFailure ) { - this(maxRowsInMemory, null, enablePushFailure); + this(maxRowsInMemory, -1, null, enablePushFailure); } public AppenderatorTester( final int maxRowsInMemory, + final long maxSizeInBytes, + final boolean enablePushFailure + ) + { + this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure); + } + + public AppenderatorTester( + final int maxRowsInMemory, + long maxSizeInBytes, final File basePersistDirectory, final boolean enablePushFailure ) @@ -131,9 +141,10 @@ public class AppenderatorTester implements AutoCloseable null, objectMapper ); - + maxSizeInBytes = maxSizeInBytes == 0L ? 
getDefaultMaxBytesInMemory() : maxSizeInBytes; tuningConfig = new RealtimeTuningConfig( maxRowsInMemory, + maxSizeInBytes, null, null, basePersistDirectory, @@ -267,6 +278,11 @@ public class AppenderatorTester implements AutoCloseable ); } + private long getDefaultMaxBytesInMemory() + { + return (Runtime.getRuntime().totalMemory()) / 3; + } + public DataSchema getSchema() { return schema; diff --git a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java index 08157eaf7f6..b837bf5ee45 100644 --- a/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java +++ b/server/src/test/java/io/druid/segment/realtime/appenderator/DefaultOfflineAppenderatorFactoryTest.java @@ -136,6 +136,7 @@ public class DefaultOfflineAppenderatorFactoryTest 75000, null, null, + null, temporaryFolder.newFolder(), null, null, diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java index 32900d8900f..10e5bd1085a 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java @@ -50,6 +50,7 @@ import io.druid.segment.ReferenceCountingSegment; import io.druid.segment.TestHelper; import io.druid.segment.indexing.DataSchema; import io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.loading.DataSegmentPusher; import io.druid.segment.realtime.FireDepartmentMetrics; @@ -199,6 +200,7 @@ public class RealtimePlumberSchoolTest null, null, null, + null, new IntervalStartVersioningPolicy(), rejectionPolicy, null, @@ -269,6 +271,7 @@ public class RealtimePlumberSchoolTest tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); plumber.getSinks().put(0L, sink); @@ -313,6 +316,7 @@ public class RealtimePlumberSchoolTest tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); plumber.getSinks().put(0L, sink); @@ -367,6 +371,7 @@ public class RealtimePlumberSchoolTest tuningConfig.getShardSpec(), DateTimes.of("2014-12-01T12:34:56.789").toString(), tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); plumber2.getSinks().put(0L, sink); diff --git a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java index dce3e7a8978..a8bf1be421e 100644 --- a/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java +++ b/server/src/test/java/io/druid/segment/realtime/plumber/SinkTest.java @@ -31,6 +31,7 @@ import io.druid.query.aggregation.AggregatorFactory; import io.druid.query.aggregation.CountAggregatorFactory; import io.druid.segment.indexing.DataSchema; import 
io.druid.segment.indexing.RealtimeTuningConfig; +import io.druid.segment.indexing.TuningConfigs; import io.druid.segment.indexing.granularity.UniformGranularitySpec; import io.druid.segment.realtime.FireHydrant; import org.joda.time.DateTime; @@ -61,6 +62,7 @@ public class SinkTest final String version = DateTimes.nowUtc().toString(); RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig( 100, + null, new Period("P1Y"), null, null, @@ -83,6 +85,7 @@ public class SinkTest tuningConfig.getShardSpec(), version, tuningConfig.getMaxRowsInMemory(), + TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()), tuningConfig.isReportParseExceptions() ); diff --git a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java index 49518d6483d..9417e941290 100644 --- a/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java +++ b/services/src/test/java/io/druid/cli/validate/DruidJsonValidatorTest.java @@ -169,6 +169,7 @@ public class DruidJsonValidatorTest new RealtimeTuningConfig( 1, + null, new Period("PT10M"), null, null,