mirror of https://github.com/apache/druid.git
Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead (#10740)
* Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * fix checkstyle * Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * fix test * fix test * add log * Fix byte calculation for maxBytesInMemory to take into account of Sink/Hydrant Object overhead * address comments * fix checkstyle * fix checkstyle * add config to skip overhead memory calculation * add test for the skipBytesInMemoryOverheadCheck config * add docs * fix checkstyle * fix checkstyle * fix spelling * address comments * fix travis * address comments
This commit is contained in:
parent
5efaaab561
commit
a46d561bd7
|
@ -320,7 +320,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|
|||
|version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)|
|
||||
|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See [`partitionsSpec`](#partitionsspec) below.|no (default == 'hashed')|
|
||||
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
|
||||
|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
|
||||
|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decreases, and task will fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.|no (default == One-sixth of max JVM memory)|
|
||||
|leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)|
|
||||
|cleanupOnFailure|Boolean|Clean up intermediate files when a job fails (unless leaveIntermediate is on).|no (default == true)|
|
||||
|overwriteFiles|Boolean|Override existing files found during indexing.|no (default == false)|
|
||||
|
|
|
@ -720,7 +720,8 @@ is:
|
|||
|-----|-----------|-------|
|
||||
|type|Each ingestion method has its own tuning type code. You must specify the type code that matches your ingestion method. Common options are `index`, `hadoop`, `kafka`, and `kinesis`.||
|
||||
|maxRowsInMemory|The maximum number of records to store in memory before persisting to disk. Note that this is the number of rows post-rollup, and so it may not be equal to the number of input records. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first).|`1000000`|
|
||||
|maxBytesInMemory|The maximum aggregate size of records, in bytes, to store in the JVM heap before persisting. This is based on a rough estimate of memory usage. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first).<br /><br />Setting maxBytesInMemory to -1 disables this check, meaning Druid will rely entirely on maxRowsInMemory to control memory usage. Setting it to zero means the default value will be used (one-sixth of JVM heap size).<br /><br />Note that the estimate of memory usage is designed to be an overestimate, and can be especially high when using complex ingest-time aggregators, including sketches. If this causes your indexing workloads to persist to disk too often, you can set maxBytesInMemory to -1 and rely on maxRowsInMemory instead.|One-sixth of max JVM heap size|
|
||||
|maxBytesInMemory|The maximum aggregate size of records, in bytes, to store in the JVM heap before persisting. This is based on a rough estimate of memory usage. Ingested records will be persisted to disk when either `maxRowsInMemory` or `maxBytesInMemory` are reached (whichever happens first). `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decreases, and task will fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.<br /><br />Setting maxBytesInMemory to -1 disables this check, meaning Druid will rely entirely on maxRowsInMemory to control memory usage. Setting it to zero means the default value will be used (one-sixth of JVM heap size).<br /><br />Note that the estimate of memory usage is designed to be an overestimate, and can be especially high when using complex ingest-time aggregators, including sketches. If this causes your indexing workloads to persist to disk too often, you can set maxBytesInMemory to -1 and rely on maxRowsInMemory instead.|One-sixth of max JVM heap size|
|
||||
|skipBytesInMemoryOverheadCheck|The calculation of maxBytesInMemory takes into account overhead objects created during ingestion and each intermediate persist. Setting this to true can exclude the bytes of these overhead objects from maxBytesInMemory check.|false|
|
||||
|indexSpec|Tune how data is indexed. See below for more information.|See table below|
|
||||
|Other properties|Each ingestion method has its own list of additional tuning properties. See the documentation for each method for a full list: [Kafka indexing service](../development/extensions-core/kafka-ingestion.md#tuningconfig), [Kinesis indexing service](../development/extensions-core/kinesis-ingestion.md#tuningconfig), [Native batch](native-batch.md#tuningconfig), and [Hadoop-based](hadoop.md#tuningconfig).||
|
||||
|
||||
|
|
|
@ -203,7 +203,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|
|||
|type|The task type, this should always be `index_parallel`.|none|yes|
|
||||
|maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding. Determines how many rows are in each segment.|5000000|no|
|
||||
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
|
||||
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|
||||
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decreases, and task will fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.|1/6 of max JVM memory|no|
|
||||
|maxColumnsToMerge|A parameter that limits how many segments can be merged in a single phase when merging segments for publishing. This limit is imposed on the total number of columns present in a set of segments being merged. If the limit is exceeded, segment merging will occur in multiple phases. At least 2 segments will be merged in a single phase, regardless of this setting.|-1 (unlimited)|no|
|
||||
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|
||||
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create when using a `hashed` `partitionsSpec`. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|
||||
|
@ -729,7 +729,7 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|
|||
|type|The task type, this should always be "index".|none|yes|
|
||||
|maxRowsPerSegment|Deprecated. Use `partitionsSpec` instead. Used in sharding. Determines how many rows are in each segment.|5000000|no|
|
||||
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|1000000|no|
|
||||
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists)|1/6 of max JVM memory|no|
|
||||
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and user does not need to set it. This value represents number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). Note that `maxBytesInMemory` also includes heap usage of artifacts created from intermediary persists. This means that after every persist, the amount of `maxBytesInMemory` until next persist will decreases, and task will fail when the sum of bytes of all intermediary persisted artifacts exceeds `maxBytesInMemory`.|1/6 of max JVM memory|no|
|
||||
|maxTotalRows|Deprecated. Use `partitionsSpec` instead. Total number of rows in segments waiting for being pushed. Used in determining when intermediate pushing should occur.|20000000|no|
|
||||
|numShards|Deprecated. Use `partitionsSpec` instead. Directly specify the number of shards to create. If this is specified and `intervals` is specified in the `granularitySpec`, the index task can skip the determine intervals/partitions pass through the data. `numShards` cannot be specified if `maxRowsPerSegment` is set.|null|no|
|
||||
|partitionDimensions|Deprecated. Use `partitionsSpec` instead. The dimensions to partition on. Leave blank to select all dimensions. Only used with `forceGuaranteedRollup` = true, will be ignored otherwise.|null|no|
|
||||
|
|
|
@ -37,6 +37,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
|
||||
|
@ -60,6 +61,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
|
@ -87,6 +89,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
|
|||
getAppendableIndexSpec(),
|
||||
getMaxRowsInMemory(),
|
||||
getMaxBytesInMemory(),
|
||||
isSkipBytesInMemoryOverheadCheck(),
|
||||
getMaxRowsPerSegment(),
|
||||
getMaxTotalRows(),
|
||||
getIntermediatePersistPeriod(),
|
||||
|
@ -115,6 +118,7 @@ public class KafkaIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningCon
|
|||
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
|
||||
", maxTotalRows=" + getMaxTotalRows() +
|
||||
", maxBytesInMemory=" + getMaxBytesInMemory() +
|
||||
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
", basePersistDirectory=" + getBasePersistDirectory() +
|
||||
", maxPendingPersists=" + getMaxPendingPersists() +
|
||||
|
|
|
@ -68,6 +68,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
@ -76,6 +77,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
|
@ -105,6 +107,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
|
@ -197,6 +200,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
|
||||
", maxTotalRows=" + getMaxTotalRows() +
|
||||
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
|
||||
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
", basePersistDirectory=" + getBasePersistDirectory() +
|
||||
", maxPendingPersists=" + getMaxPendingPersists() +
|
||||
|
@ -225,6 +229,7 @@ public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
|
|||
getAppendableIndexSpec(),
|
||||
getMaxRowsInMemory(),
|
||||
getMaxBytesInMemory(),
|
||||
isSkipBytesInMemoryOverheadCheck(),
|
||||
getMaxRowsPerSegment(),
|
||||
getMaxTotalRows(),
|
||||
getIntermediatePersistPeriod(),
|
||||
|
|
|
@ -2674,6 +2674,7 @@ public class KafkaIndexTaskTest extends SeekableStreamIndexTaskTestBase
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
new Period("P1Y"),
|
||||
|
|
|
@ -123,6 +123,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
2,
|
||||
10L,
|
||||
new Period("PT3S"),
|
||||
|
@ -168,6 +169,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
2,
|
||||
10L,
|
||||
new Period("PT3S"),
|
||||
|
@ -218,6 +220,7 @@ public class KafkaIndexTaskTuningConfigTest
|
|||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
2,
|
||||
10L,
|
||||
new Period("PT3S"),
|
||||
|
|
|
@ -272,6 +272,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
),
|
||||
null
|
||||
|
@ -3075,6 +3076,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
@ -3115,6 +3117,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
42, // This is different
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
@ -3411,6 +3414,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
@ -3522,6 +3526,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
|
|
@ -41,6 +41,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
|
||||
|
@ -65,6 +66,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends KafkaIndexTaskTuning
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
|
|
|
@ -54,6 +54,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
|
@ -83,6 +84,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
|
@ -160,6 +162,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
getAppendableIndexSpec(),
|
||||
getMaxRowsInMemory(),
|
||||
getMaxBytesInMemory(),
|
||||
isSkipBytesInMemoryOverheadCheck(),
|
||||
getMaxRowsPerSegment(),
|
||||
getMaxTotalRows(),
|
||||
getIntermediatePersistPeriod(),
|
||||
|
@ -227,6 +230,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
|
|||
return "KinesisIndexTaskTuningConfig{" +
|
||||
"maxRowsInMemory=" + getMaxRowsInMemory() +
|
||||
", maxBytesInMemory=" + getMaxBytesInMemory() +
|
||||
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
|
||||
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
|
||||
", maxTotalRows=" + getMaxTotalRows() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
|
|
|
@ -77,6 +77,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
@ -85,6 +86,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
|
@ -121,6 +123,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
|
@ -218,6 +221,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
return "KinesisSupervisorTuningConfig{" +
|
||||
"maxRowsInMemory=" + getMaxRowsInMemory() +
|
||||
", maxBytesInMemory=" + getMaxBytesInMemory() +
|
||||
", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() +
|
||||
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
|
||||
", maxTotalRows=" + getMaxTotalRows() +
|
||||
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
|
||||
|
@ -255,6 +259,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
|
|||
getAppendableIndexSpec(),
|
||||
getMaxRowsInMemory(),
|
||||
getMaxBytesInMemory(),
|
||||
isSkipBytesInMemoryOverheadCheck(),
|
||||
getMaxRowsPerSegment(),
|
||||
getMaxTotalRows(),
|
||||
getIntermediatePersistPeriod(),
|
||||
|
|
|
@ -73,6 +73,7 @@ public class KinesisIndexTaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
private static final KinesisIndexTaskIOConfig IO_CONFIG = new KinesisIndexTaskIOConfig(
|
||||
|
|
|
@ -2758,6 +2758,7 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
|
|||
null,
|
||||
maxRowsInMemory,
|
||||
null,
|
||||
null,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
new Period("P1Y"),
|
||||
|
|
|
@ -144,6 +144,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
null,
|
||||
1,
|
||||
3L,
|
||||
null,
|
||||
2,
|
||||
100L,
|
||||
new Period("PT3S"),
|
||||
|
@ -205,6 +206,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
null,
|
||||
1,
|
||||
3L,
|
||||
null,
|
||||
2,
|
||||
100L,
|
||||
new Period("PT3S"),
|
||||
|
@ -294,6 +296,7 @@ public class KinesisIndexTaskTuningConfigTest
|
|||
null,
|
||||
1,
|
||||
(long) 3,
|
||||
null,
|
||||
2,
|
||||
100L,
|
||||
new Period("PT3S"),
|
||||
|
|
|
@ -171,6 +171,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
@ -3694,6 +3695,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
@ -4747,6 +4749,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
50000,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
|
|
|
@ -41,6 +41,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
|
@ -71,6 +72,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
intermediatePersistPeriod,
|
||||
|
@ -104,6 +106,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
|
|||
base.getAppendableIndexSpec(),
|
||||
base.getMaxRowsInMemory(),
|
||||
base.getMaxBytesInMemory(),
|
||||
base.isSkipBytesInMemoryOverheadCheck(),
|
||||
base.getMaxRowsPerSegment(),
|
||||
base.getMaxTotalRows(),
|
||||
base.getIntermediatePersistPeriod(),
|
||||
|
|
|
@ -56,6 +56,7 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
private final AppendableIndexSpec appendableIndexSpec;
|
||||
private final int maxRowsInMemory;
|
||||
private final long maxBytesInMemory;
|
||||
private final boolean skipBytesInMemoryOverheadCheck;
|
||||
private final DynamicPartitionsSpec partitionsSpec;
|
||||
private final Period intermediatePersistPeriod;
|
||||
private final File basePersistDirectory;
|
||||
|
@ -78,6 +79,7 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
|
||||
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
|
@ -100,6 +102,8 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
// initializing this to 0, it will be lazily initialized to a value
|
||||
// @see #getMaxBytesInMemoryOrDefault()
|
||||
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
|
||||
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck == null ?
|
||||
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
|
||||
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment, maxTotalRows);
|
||||
this.intermediatePersistPeriod = intermediatePersistPeriod == null
|
||||
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
|
||||
|
@ -159,6 +163,13 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
return maxBytesInMemory;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public boolean isSkipBytesInMemoryOverheadCheck()
|
||||
{
|
||||
return skipBytesInMemoryOverheadCheck;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Integer getMaxRowsPerSegment()
|
||||
|
@ -273,6 +284,7 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
partitionsSpec.getMaxRowsPerSegment(),
|
||||
partitionsSpec.getMaxTotalRows(),
|
||||
intermediatePersistPeriod,
|
||||
|
|
|
@ -222,6 +222,7 @@ public class CompactionTask extends AbstractBatchIndexTask
|
|||
indexTuningConfig.getAppendableIndexSpec(),
|
||||
indexTuningConfig.getMaxRowsPerSegment(),
|
||||
indexTuningConfig.getMaxBytesInMemory(),
|
||||
indexTuningConfig.isSkipBytesInMemoryOverheadCheck(),
|
||||
indexTuningConfig.getMaxTotalRows(),
|
||||
indexTuningConfig.getNumShards(),
|
||||
null,
|
||||
|
|
|
@ -1123,6 +1123,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
private final AppendableIndexSpec appendableIndexSpec;
|
||||
private final int maxRowsInMemory;
|
||||
private final long maxBytesInMemory;
|
||||
private final boolean skipBytesInMemoryOverheadCheck;
|
||||
private final int maxColumnsToMerge;
|
||||
|
||||
// null if all partitionsSpec related params are null. see getDefaultPartitionsSpec() for details.
|
||||
|
@ -1196,6 +1197,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
|
||||
@JsonProperty("rowFlushBoundary") @Deprecated @Nullable Integer rowFlushBoundary_forBackCompatibility,
|
||||
@JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
|
||||
|
@ -1220,6 +1222,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
|
||||
maxBytesInMemory != null ? maxBytesInMemory : 0,
|
||||
skipBytesInMemoryOverheadCheck != null ? skipBytesInMemoryOverheadCheck : DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
|
||||
getPartitionsSpec(
|
||||
forceGuaranteedRollup == null ? DEFAULT_GUARANTEE_ROLLUP : forceGuaranteedRollup,
|
||||
partitionsSpec,
|
||||
|
@ -1250,13 +1253,14 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
|
||||
private IndexTuningConfig()
|
||||
{
|
||||
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
||||
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
|
||||
}
|
||||
|
||||
private IndexTuningConfig(
|
||||
@Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@Nullable Integer maxRowsInMemory,
|
||||
@Nullable Long maxBytesInMemory,
|
||||
@Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@Nullable PartitionsSpec partitionsSpec,
|
||||
@Nullable IndexSpec indexSpec,
|
||||
@Nullable IndexSpec indexSpecForIntermediatePersists,
|
||||
|
@ -1277,6 +1281,8 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
// initializing this to 0, it will be lazily initialized to a value
|
||||
// @see #getMaxBytesInMemoryOrDefault()
|
||||
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
|
||||
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck == null ?
|
||||
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
|
||||
this.maxColumnsToMerge = maxColumnsToMerge == null
|
||||
? IndexMerger.UNLIMITED_MAX_COLUMNS_TO_MERGE
|
||||
: maxColumnsToMerge;
|
||||
|
@ -1317,6 +1323,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
partitionsSpec,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
|
@ -1339,6 +1346,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
partitionsSpec,
|
||||
indexSpec,
|
||||
indexSpecForIntermediatePersists,
|
||||
|
@ -1376,6 +1384,13 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
return maxBytesInMemory;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public boolean isSkipBytesInMemoryOverheadCheck()
|
||||
{
|
||||
return skipBytesInMemoryOverheadCheck;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
@Override
|
||||
|
@ -1549,6 +1564,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
|
||||
maxRowsInMemory == that.maxRowsInMemory &&
|
||||
maxBytesInMemory == that.maxBytesInMemory &&
|
||||
skipBytesInMemoryOverheadCheck == that.skipBytesInMemoryOverheadCheck &&
|
||||
maxColumnsToMerge == that.maxColumnsToMerge &&
|
||||
maxPendingPersists == that.maxPendingPersists &&
|
||||
forceGuaranteedRollup == that.forceGuaranteedRollup &&
|
||||
|
@ -1571,6 +1587,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxColumnsToMerge,
|
||||
partitionsSpec,
|
||||
indexSpec,
|
||||
|
@ -1593,6 +1610,7 @@ public class IndexTask extends AbstractBatchIndexTask implements ChatHandler
|
|||
return "IndexTuningConfig{" +
|
||||
"maxRowsInMemory=" + maxRowsInMemory +
|
||||
", maxBytesInMemory=" + maxBytesInMemory +
|
||||
", skipBytesInMemoryOverheadCheck=" + skipBytesInMemoryOverheadCheck +
|
||||
", maxColumnsToMerge=" + maxColumnsToMerge +
|
||||
", partitionsSpec=" + partitionsSpec +
|
||||
", indexSpec=" + indexSpec +
|
||||
|
|
|
@ -976,6 +976,7 @@ public class ParallelIndexSupervisorTask extends AbstractBatchIndexTask implemen
|
|||
tuningConfig.getAppendableIndexSpec(),
|
||||
tuningConfig.getMaxRowsInMemory(),
|
||||
tuningConfig.getMaxBytesInMemory(),
|
||||
tuningConfig.isSkipBytesInMemoryOverheadCheck(),
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -100,6 +100,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
);
|
||||
}
|
||||
|
@ -111,6 +112,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("maxTotalRows") @Deprecated @Nullable Long maxTotalRows,
|
||||
@JsonProperty("numShards") @Deprecated @Nullable Integer numShards,
|
||||
@JsonProperty("splitHintSpec") @Nullable SplitHintSpec splitHintSpec,
|
||||
|
@ -142,6 +144,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
maxTotalRows,
|
||||
null,
|
||||
numShards,
|
||||
|
@ -258,6 +261,7 @@ public class ParallelIndexTuningConfig extends IndexTuningConfig
|
|||
getAppendableIndexSpec(),
|
||||
getMaxRowsInMemory(),
|
||||
getMaxBytesInMemory(),
|
||||
isSkipBytesInMemoryOverheadCheck(),
|
||||
null,
|
||||
null,
|
||||
getSplitHintSpec(),
|
||||
|
|
|
@ -41,6 +41,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
private final AppendableIndexSpec appendableIndexSpec;
|
||||
private final int maxRowsInMemory;
|
||||
private final long maxBytesInMemory;
|
||||
private final boolean skipBytesInMemoryOverheadCheck;
|
||||
private final DynamicPartitionsSpec partitionsSpec;
|
||||
private final Period intermediatePersistPeriod;
|
||||
private final File basePersistDirectory;
|
||||
|
@ -64,6 +65,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
@Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@Nullable Integer maxRowsInMemory,
|
||||
@Nullable Long maxBytesInMemory,
|
||||
@Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@Nullable Integer maxRowsPerSegment,
|
||||
@Nullable Long maxTotalRows,
|
||||
@Nullable Period intermediatePersistPeriod,
|
||||
|
@ -93,6 +95,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
// initializing this to 0, it will be lazily initialized to a value
|
||||
// @see #getMaxBytesInMemoryOrDefault()
|
||||
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
|
||||
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck == null ?
|
||||
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
|
||||
this.intermediatePersistPeriod = intermediatePersistPeriod == null
|
||||
? defaults.getIntermediatePersistPeriod()
|
||||
: intermediatePersistPeriod;
|
||||
|
@ -155,6 +159,13 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
return maxBytesInMemory;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public boolean isSkipBytesInMemoryOverheadCheck()
|
||||
{
|
||||
return skipBytesInMemoryOverheadCheck;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Integer getMaxRowsPerSegment()
|
||||
|
@ -295,6 +306,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
return Objects.equals(appendableIndexSpec, that.appendableIndexSpec) &&
|
||||
maxRowsInMemory == that.maxRowsInMemory &&
|
||||
maxBytesInMemory == that.maxBytesInMemory &&
|
||||
skipBytesInMemoryOverheadCheck == that.skipBytesInMemoryOverheadCheck &&
|
||||
maxPendingPersists == that.maxPendingPersists &&
|
||||
reportParseExceptions == that.reportParseExceptions &&
|
||||
handoffConditionTimeout == that.handoffConditionTimeout &&
|
||||
|
@ -319,6 +331,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig implements Appenderato
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
partitionsSpec,
|
||||
intermediatePersistPeriod,
|
||||
basePersistDirectory,
|
||||
|
|
|
@ -1397,6 +1397,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest extends InitializedNullHand
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
maxRowsPerSegment,
|
||||
maxTotalRows,
|
||||
null,
|
||||
|
|
|
@ -209,6 +209,7 @@ public class ClientCompactionTaskQuerySerdeTest
|
|||
2000L,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new SegmentsSplitHintSpec(new HumanReadableBytes(100000L), 10),
|
||||
new DynamicPartitionsSpec(100, 30000L),
|
||||
new IndexSpec(
|
||||
|
|
|
@ -280,6 +280,7 @@ public class CompactionTaskRunTest extends IngestionTestBase
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(null, 3, null),
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -312,6 +312,7 @@ public class CompactionTaskTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
CompressionStrategy.LZ4,
|
||||
|
@ -448,6 +449,7 @@ public class CompactionTaskTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
CompressionStrategy.LZ4,
|
||||
|
@ -609,6 +611,7 @@ public class CompactionTaskTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
CompressionStrategy.LZ4,
|
||||
|
@ -674,6 +677,7 @@ public class CompactionTaskTest
|
|||
null,
|
||||
500000,
|
||||
1000000L,
|
||||
null,
|
||||
1000000L,
|
||||
null,
|
||||
null,
|
||||
|
@ -746,6 +750,7 @@ public class CompactionTaskTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(null, 3, null),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -1133,6 +1138,7 @@ public class CompactionTaskTest
|
|||
null,
|
||||
500000,
|
||||
1000000L,
|
||||
null,
|
||||
Long.MAX_VALUE,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -66,6 +66,7 @@ public class IndexTaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(1000, 2000L),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(false),
|
||||
|
@ -101,6 +102,7 @@ public class IndexTaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim1", "dim2")),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(false),
|
||||
|
@ -132,6 +134,7 @@ public class IndexTaskSerdeTest
|
|||
null,
|
||||
100,
|
||||
2000L,
|
||||
null,
|
||||
3000L,
|
||||
null,
|
||||
null,
|
||||
|
@ -169,6 +172,7 @@ public class IndexTaskSerdeTest
|
|||
2000L,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
10,
|
||||
ImmutableList.of("dim1", "dim2"),
|
||||
null,
|
||||
|
@ -208,6 +212,7 @@ public class IndexTaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(1000, 2000L),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(false),
|
||||
|
@ -244,6 +249,7 @@ public class IndexTaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(null, 10, ImmutableList.of("dim1", "dim2")),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(false),
|
||||
|
|
|
@ -1124,6 +1124,7 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(2, null, null),
|
||||
INDEX_SPEC,
|
||||
null,
|
||||
|
@ -1254,6 +1255,7 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(2, null),
|
||||
INDEX_SPEC,
|
||||
null,
|
||||
|
@ -1376,6 +1378,7 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(2, null, null),
|
||||
INDEX_SPEC,
|
||||
null,
|
||||
|
@ -1818,6 +1821,7 @@ public class IndexTaskTest extends IngestionTestBase
|
|||
null,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
null,
|
||||
maxTotalRows,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -833,6 +833,7 @@ public class RealtimeIndexTaskTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
new Period("PT10M"),
|
||||
null,
|
||||
|
|
|
@ -253,6 +253,7 @@ public class TaskSerdeTest
|
|||
10,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
9999,
|
||||
null,
|
||||
null,
|
||||
|
@ -338,6 +339,7 @@ public class TaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(10000, null),
|
||||
indexSpec,
|
||||
null,
|
||||
|
@ -403,6 +405,7 @@ public class TaskSerdeTest
|
|||
null,
|
||||
1,
|
||||
10L,
|
||||
null,
|
||||
new Period("PT10M"),
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -158,6 +158,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
2,
|
||||
null,
|
||||
null,
|
||||
|
@ -230,6 +231,7 @@ public class AbstractParallelIndexSupervisorTaskTest extends IngestionTestBase
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new MaxSizeSplitHintSpec(null, 1),
|
||||
partitionsSpec,
|
||||
null,
|
||||
|
|
|
@ -179,6 +179,7 @@ public class ParallelIndexSupervisorTaskKillTest extends AbstractParallelIndexSu
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
numTotalSubTasks,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -428,6 +428,7 @@ public class ParallelIndexSupervisorTaskResourceTest extends AbstractParallelInd
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
NUM_SUB_TASKS,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -253,6 +253,7 @@ public class ParallelIndexSupervisorTaskSerdeTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
partitionsSpec,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -201,6 +201,7 @@ public class ParallelIndexSupervisorTaskTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(null, 10, null),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
|
|
@ -161,6 +161,7 @@ class ParallelIndexTestingFactory
|
|||
null,
|
||||
3,
|
||||
4L,
|
||||
null,
|
||||
5L,
|
||||
6,
|
||||
null,
|
||||
|
|
|
@ -76,6 +76,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(100, 100L),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -120,6 +121,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(100, 100L),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -164,6 +166,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(100, 100L),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -210,6 +213,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(100, 100L),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -253,6 +257,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new HashedPartitionsSpec(null, 10, null),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -296,6 +301,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new SingleDimensionPartitionsSpec(100, null, "dim", false),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
@ -339,6 +345,7 @@ public class ParallelIndexTuningConfigTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new DynamicPartitionsSpec(100, null),
|
||||
new IndexSpec(
|
||||
new RoaringBitmapSerdeFactory(true),
|
||||
|
|
|
@ -353,6 +353,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -754,6 +754,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
indexSpec,
|
||||
null,
|
||||
3,
|
||||
|
@ -836,6 +837,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
indexSpec,
|
||||
null,
|
||||
3,
|
||||
|
@ -1263,6 +1265,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
indexSpec,
|
||||
null,
|
||||
null,
|
||||
|
@ -1372,6 +1375,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
indexSpec,
|
||||
null,
|
||||
3,
|
||||
|
@ -1479,6 +1483,7 @@ public class TaskLifecycleTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
1000,
|
||||
null,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
null, //default window period of 10 minutes
|
||||
null, // base persist dir ignored by Realtime Index task
|
||||
|
|
|
@ -900,6 +900,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
null
|
||||
)
|
||||
{
|
||||
|
|
|
@ -21,7 +21,7 @@ DRUID_SERVICE=indexer
|
|||
DRUID_LOG_PATH=/shared/logs/indexer.log
|
||||
|
||||
# JAVA OPTS
|
||||
SERVICE_DRUID_JAVA_OPTS=-server -Xmx512m -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
|
||||
SERVICE_DRUID_JAVA_OPTS=-server -Xmx1g -Xms512m -XX:+UseG1GC -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5008
|
||||
|
||||
# Druid configs
|
||||
druid_host=druid-indexer
|
||||
|
|
|
@ -68,6 +68,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
DEFAULT_APPENDABLE_INDEX,
|
||||
DEFAULT_MAX_ROWS_IN_MEMORY,
|
||||
0L,
|
||||
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
|
||||
DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
|
||||
DEFAULT_WINDOW_PERIOD,
|
||||
basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory,
|
||||
|
@ -91,6 +92,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
private final AppendableIndexSpec appendableIndexSpec;
|
||||
private final int maxRowsInMemory;
|
||||
private final long maxBytesInMemory;
|
||||
private final boolean skipBytesInMemoryOverheadCheck;
|
||||
private final Period intermediatePersistPeriod;
|
||||
private final Period windowPeriod;
|
||||
private final File basePersistDirectory;
|
||||
|
@ -115,6 +117,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec,
|
||||
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
|
||||
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
|
||||
@JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck,
|
||||
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
|
||||
@JsonProperty("windowPeriod") Period windowPeriod,
|
||||
@JsonProperty("basePersistDirectory") File basePersistDirectory,
|
||||
|
@ -140,6 +143,8 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
// initializing this to 0, it will be lazily initialized to a value
|
||||
// @see #getMaxBytesInMemoryOrDefault()
|
||||
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
|
||||
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck == null ?
|
||||
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK : skipBytesInMemoryOverheadCheck;
|
||||
this.intermediatePersistPeriod = intermediatePersistPeriod == null
|
||||
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
|
||||
: intermediatePersistPeriod;
|
||||
|
@ -191,6 +196,13 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
return maxBytesInMemory;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Override
|
||||
public boolean isSkipBytesInMemoryOverheadCheck()
|
||||
{
|
||||
return skipBytesInMemoryOverheadCheck;
|
||||
}
|
||||
|
||||
@Override
|
||||
@JsonProperty
|
||||
public Period getIntermediatePersistPeriod()
|
||||
|
@ -318,6 +330,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
intermediatePersistPeriod,
|
||||
windowPeriod,
|
||||
basePersistDirectory,
|
||||
|
@ -345,6 +358,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
|
|||
appendableIndexSpec,
|
||||
maxRowsInMemory,
|
||||
maxBytesInMemory,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
intermediatePersistPeriod,
|
||||
windowPeriod,
|
||||
dir,
|
||||
|
|
|
@ -39,6 +39,7 @@ public interface TuningConfig
|
|||
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
|
||||
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
|
||||
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
|
||||
boolean DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK = false;
|
||||
|
||||
/**
|
||||
* The incremental index implementation to use
|
||||
|
|
|
@ -20,12 +20,14 @@
|
|||
package org.apache.druid.segment.realtime;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Iterables;
|
||||
import org.apache.druid.java.util.common.ISE;
|
||||
import org.apache.druid.java.util.common.Pair;
|
||||
import org.apache.druid.segment.IncrementalIndexSegment;
|
||||
import org.apache.druid.segment.ReferenceCountingSegment;
|
||||
import org.apache.druid.segment.Segment;
|
||||
import org.apache.druid.segment.SegmentReference;
|
||||
import org.apache.druid.segment.StorageAdapter;
|
||||
import org.apache.druid.segment.incremental.IncrementalIndex;
|
||||
import org.apache.druid.timeline.SegmentId;
|
||||
import org.joda.time.Interval;
|
||||
|
@ -70,6 +72,26 @@ public class FireHydrant
|
|||
return adapter.get().getId();
|
||||
}
|
||||
|
||||
public int getSegmentNumDimensionColumns()
|
||||
{
|
||||
final Segment segment = adapter.get().getBaseSegment();
|
||||
if (segment != null) {
|
||||
final StorageAdapter storageAdapter = segment.asStorageAdapter();
|
||||
return storageAdapter.getAvailableDimensions().size();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public int getSegmentNumMetricColumns()
|
||||
{
|
||||
final Segment segment = adapter.get().getBaseSegment();
|
||||
if (segment != null) {
|
||||
final StorageAdapter storageAdapter = segment.asStorageAdapter();
|
||||
return Iterables.size(storageAdapter.getAvailableMetrics());
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
public Interval getSegmentDataInterval()
|
||||
{
|
||||
return adapter.get().getDataInterval();
|
||||
|
|
|
@ -32,6 +32,8 @@ public interface AppenderatorConfig extends TuningConfig
|
|||
|
||||
int getMaxPendingPersists();
|
||||
|
||||
boolean isSkipBytesInMemoryOverheadCheck();
|
||||
|
||||
/**
|
||||
* Maximum number of rows in a single segment before pushing to deep storage
|
||||
*/
|
||||
|
|
|
@ -101,6 +101,15 @@ import java.util.stream.Collectors;
|
|||
|
||||
public class AppenderatorImpl implements Appenderator
|
||||
{
|
||||
// Rough estimate of memory footprint of a ColumnHolder based on actual heap dumps
|
||||
public static final int ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER = 1000;
|
||||
public static final int ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER = 700;
|
||||
public static final int ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER = 600;
|
||||
// Rough estimate of memory footprint of empty Sink based on actual heap dumps
|
||||
public static final int ROUGH_OVERHEAD_PER_SINK = 5000;
|
||||
// Rough estimate of memory footprint of empty FireHydrant based on actual heap dumps
|
||||
public static final int ROUGH_OVERHEAD_PER_HYDRANT = 1000;
|
||||
|
||||
private static final EmittingLogger log = new EmittingLogger(AppenderatorImpl.class);
|
||||
private static final int WARN_DELAY = 1000;
|
||||
private static final String IDENTIFIER_FILE_NAME = "identifier.json";
|
||||
|
@ -125,6 +134,7 @@ public class AppenderatorImpl implements Appenderator
|
|||
private final Set<SegmentIdWithShardSpec> droppingSinks = Sets.newConcurrentHashSet();
|
||||
private final VersionedIntervalTimeline<String, Sink> sinkTimeline;
|
||||
private final long maxBytesTuningConfig;
|
||||
private final boolean skipBytesInMemoryOverheadCheck;
|
||||
|
||||
private final QuerySegmentWalker texasRanger;
|
||||
// This variable updated in add(), persist(), and drop()
|
||||
|
@ -199,6 +209,7 @@ public class AppenderatorImpl implements Appenderator
|
|||
}
|
||||
|
||||
maxBytesTuningConfig = tuningConfig.getMaxBytesInMemoryOrDefault();
|
||||
skipBytesInMemoryOverheadCheck = tuningConfig.isSkipBytesInMemoryOverheadCheck();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -408,6 +419,7 @@ public class AppenderatorImpl implements Appenderator
|
|||
maxBytesTuningConfig,
|
||||
null
|
||||
);
|
||||
bytesCurrentlyInMemory.addAndGet(calculateSinkMemoryInUsed(retVal));
|
||||
|
||||
try {
|
||||
segmentAnnouncer.announceSegment(retVal.getSegment());
|
||||
|
@ -501,7 +513,7 @@ public class AppenderatorImpl implements Appenderator
|
|||
public ListenableFuture<Object> persistAll(@Nullable final Committer committer)
|
||||
{
|
||||
throwPersistErrorIfExists();
|
||||
|
||||
long bytesInMemoryBeforePersist = bytesCurrentlyInMemory.get();
|
||||
final Map<String, Integer> currentHydrants = new HashMap<>();
|
||||
final List<Pair<FireHydrant, SegmentIdWithShardSpec>> indexesToPersist = new ArrayList<>();
|
||||
int numPersistedRows = 0;
|
||||
|
@ -527,7 +539,13 @@ public class AppenderatorImpl implements Appenderator
|
|||
}
|
||||
|
||||
if (sink.swappable()) {
|
||||
// After swapping the sink, we use memory mapped segment instead. However, the memory mapped segment still consumes memory.
|
||||
// These memory mapped segments are held in memory throughout the ingestion phase and permanently add to the bytesCurrentlyInMemory
|
||||
int memoryStillInUse = calculateMMappedHydrantMemoryInUsed(sink.getCurrHydrant());
|
||||
bytesCurrentlyInMemory.addAndGet(memoryStillInUse);
|
||||
|
||||
indexesToPersist.add(Pair.of(sink.swap(), identifier));
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -617,6 +635,36 @@ public class AppenderatorImpl implements Appenderator
|
|||
// NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
|
||||
rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
|
||||
bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
|
||||
|
||||
log.info("Persisted rows[%,d] and bytes[%,d]", numPersistedRows, bytesPersisted);
|
||||
|
||||
// bytesCurrentlyInMemory can change while persisting due to concurrent ingestion.
|
||||
// Hence, we use bytesInMemoryBeforePersist to determine the change of this persist
|
||||
if (!skipBytesInMemoryOverheadCheck && bytesInMemoryBeforePersist - bytesPersisted > maxBytesTuningConfig) {
|
||||
// We are still over maxBytesTuningConfig even after persisting.
|
||||
// This means that we ran out of all available memory to ingest (due to overheads created as part of ingestion)
|
||||
final String alertMessage = StringUtils.format(
|
||||
"Task has exceeded safe estimated heap usage limits, failing "
|
||||
+ "(numSinks: [%d] numHydrantsAcrossAllSinks: [%d] totalRows: [%d])",
|
||||
sinks.size(),
|
||||
sinks.values().stream().mapToInt(Iterables::size).sum(),
|
||||
getTotalRowCount()
|
||||
);
|
||||
final String errorMessage = StringUtils.format(
|
||||
"%s.\nThis can occur when the overhead from too many intermediary segment persists becomes to "
|
||||
+ "great to have enough space to process additional input rows. This check, along with metering the overhead "
|
||||
+ "of these objects to factor into the 'maxBytesInMemory' computation, can be disabled by setting "
|
||||
+ "'skipBytesInMemoryOverheadCheck' to 'true' (note that doing so might allow the task to naturally encounter "
|
||||
+ "a 'java.lang.OutOfMemoryError'). Alternatively, 'maxBytesInMemory' can be increased which will cause an "
|
||||
+ "increase in heap footprint, but will allow for more intermediary segment persists to occur before "
|
||||
+ "reaching this condition.",
|
||||
alertMessage
|
||||
);
|
||||
log.makeAlert(alertMessage)
|
||||
.addData("dataSource", schema.getDataSource())
|
||||
.emit();
|
||||
throw new RuntimeException(errorMessage);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -1173,6 +1221,13 @@ public class AppenderatorImpl implements Appenderator
|
|||
// i.e. those that haven't been persisted for *InMemory counters, or pushed to deep storage for the total counter.
|
||||
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
|
||||
bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
|
||||
bytesCurrentlyInMemory.addAndGet(-calculateSinkMemoryInUsed(sink));
|
||||
for (FireHydrant hydrant : sink) {
|
||||
// Decrement memory used by all Memory Mapped Hydrant
|
||||
if (!hydrant.equals(sink.getCurrHydrant())) {
|
||||
bytesCurrentlyInMemory.addAndGet(-calculateMMappedHydrantMemoryInUsed(hydrant));
|
||||
}
|
||||
}
|
||||
totalRows.addAndGet(-sink.getNumRows());
|
||||
}
|
||||
|
||||
|
@ -1382,4 +1437,27 @@ public class AppenderatorImpl implements Appenderator
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int calculateMMappedHydrantMemoryInUsed(FireHydrant hydrant)
|
||||
{
|
||||
if (skipBytesInMemoryOverheadCheck) {
|
||||
return 0;
|
||||
}
|
||||
// These calculations are approximated from actual heap dumps.
|
||||
// Memory footprint includes count integer in FireHydrant, shorts in ReferenceCountingSegment,
|
||||
// Objects in SimpleQueryableIndex (such as SmooshedFileMapper, each ColumnHolder in column map, etc.)
|
||||
return Integer.BYTES + (4 * Short.BYTES) + ROUGH_OVERHEAD_PER_HYDRANT +
|
||||
(hydrant.getSegmentNumDimensionColumns() * ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER) +
|
||||
(hydrant.getSegmentNumMetricColumns() * ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
|
||||
ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
|
||||
}
|
||||
|
||||
private int calculateSinkMemoryInUsed(Sink sink)
|
||||
{
|
||||
if (skipBytesInMemoryOverheadCheck) {
|
||||
return 0;
|
||||
}
|
||||
// Rough estimate of memory footprint of empty Sink based on actual heap dumps
|
||||
return ROUGH_OVERHEAD_PER_SINK;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -412,6 +412,12 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
|
|||
return newMaxBytesInMemory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isSkipBytesInMemoryOverheadCheck()
|
||||
{
|
||||
return baseConfig.isSkipBytesInMemoryOverheadCheck();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxPendingPersists()
|
||||
{
|
||||
|
|
|
@ -68,6 +68,7 @@ public class AppenderatorPlumberTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new IntervalStartVersioningPolicy(),
|
||||
new NoopRejectionPolicyFactory(),
|
||||
null,
|
||||
|
|
|
@ -154,9 +154,18 @@ public class AppenderatorTest extends InitializedNullHandlingTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMaxBytesInMemory() throws Exception
|
||||
public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
|
||||
{
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) {
|
||||
try (
|
||||
final AppenderatorTester tester = new AppenderatorTester(
|
||||
100,
|
||||
1024,
|
||||
null,
|
||||
true,
|
||||
new SimpleRowIngestionMeters(),
|
||||
true
|
||||
)
|
||||
) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -197,9 +206,18 @@ public class AppenderatorTest extends InitializedNullHandlingTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
|
||||
public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
|
||||
{
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) {
|
||||
try (
|
||||
final AppenderatorTester tester = new AppenderatorTester(
|
||||
100,
|
||||
1024,
|
||||
null,
|
||||
true,
|
||||
new SimpleRowIngestionMeters(),
|
||||
true
|
||||
)
|
||||
) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
|
@ -236,6 +254,377 @@ public class AppenderatorTest extends InitializedNullHandlingTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxBytesInMemory() throws Exception
|
||||
{
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(100, 10000, true)) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
|
||||
|
||||
return new Committer()
|
||||
{
|
||||
@Override
|
||||
public Object getMetadata()
|
||||
{
|
||||
return metadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
//Do nothing
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
|
||||
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
|
||||
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
|
||||
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
|
||||
int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
|
||||
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// We do multiple more adds to the same sink to cause persist.
|
||||
for (int i = 0; i < 26; i++) {
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
|
||||
}
|
||||
sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
|
||||
// currHydrant size is 0 since we just persist all indexes to disk.
|
||||
currentInMemoryIndexSize = 0;
|
||||
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
|
||||
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
|
||||
// 1 dimension columns, 2 metric column, 1 time column.
|
||||
int mappedIndexSize = 1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER;
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// Add a single row after persisted
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
|
||||
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
|
||||
currentInMemoryIndexSize = 182 + nullHandlingOverhead;
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// We do multiple more adds to the same sink to cause persist.
|
||||
for (int i = 0; i < 5; i++) {
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
|
||||
}
|
||||
// currHydrant size is 0 since we just persist all indexes to disk.
|
||||
currentInMemoryIndexSize = 0;
|
||||
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
|
||||
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
|
||||
// 1 dimension columns, 2 metric column, 1 time column. However, we have two indexes now from the two pervious
|
||||
// persists.
|
||||
mappedIndexSize = 2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
appenderator.close();
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
|
||||
}
|
||||
}
|
||||
|
||||
@Test(expected = RuntimeException.class)
|
||||
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
|
||||
{
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(100, 10, true)) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
|
||||
|
||||
return new Committer()
|
||||
{
|
||||
@Override
|
||||
public Object getMetadata()
|
||||
{
|
||||
return metadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
//Do nothing
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
|
||||
{
|
||||
try (
|
||||
final AppenderatorTester tester = new AppenderatorTester(
|
||||
100,
|
||||
10,
|
||||
null,
|
||||
true,
|
||||
new SimpleRowIngestionMeters(),
|
||||
true
|
||||
)
|
||||
) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
|
||||
|
||||
return new Committer()
|
||||
{
|
||||
@Override
|
||||
public Object getMetadata()
|
||||
{
|
||||
return metadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
//Do nothing
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
// Expected 0 since we persisted after the add
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
// Expected 0 since we persisted after the add
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
|
||||
{
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(100, 10000, true)) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
|
||||
|
||||
return new Committer()
|
||||
{
|
||||
@Override
|
||||
public Object getMetadata()
|
||||
{
|
||||
return metadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
//Do nothing
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
|
||||
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
|
||||
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
|
||||
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
|
||||
int sinkSizeOverhead = 1 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// Close with row still in memory (no persist)
|
||||
appenderator.close();
|
||||
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
|
||||
{
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(100, 31100, true)) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
final AtomicInteger eventCount = new AtomicInteger(0);
|
||||
final Supplier<Committer> committerSupplier = () -> {
|
||||
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
|
||||
|
||||
return new Committer()
|
||||
{
|
||||
@Override
|
||||
public Object getMetadata()
|
||||
{
|
||||
return metadata;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run()
|
||||
{
|
||||
//Do nothing
|
||||
}
|
||||
};
|
||||
};
|
||||
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
|
||||
// Still under maxSizeInBytes after the add. Hence, we do not persist yet
|
||||
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 54 (dimsKeySize) = 182 + 1 byte when null handling is enabled
|
||||
int nullHandlingOverhead = NullHandling.sqlCompatible() ? 1 : 0;
|
||||
int currentInMemoryIndexSize = 182 + nullHandlingOverhead;
|
||||
int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
|
||||
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
(2 * currentInMemoryIndexSize) + sinkSizeOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// We do multiple more adds to the same sink to cause persist.
|
||||
for (int i = 0; i < 49; i++) {
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier);
|
||||
}
|
||||
sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
|
||||
// currHydrant size is 0 since we just persist all indexes to disk.
|
||||
currentInMemoryIndexSize = 0;
|
||||
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
|
||||
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
|
||||
);
|
||||
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
|
||||
// 1 dimension columns, 2 metric column, 1 time column.
|
||||
int mappedIndexSize = 2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// Add a single row after persisted to sink 0
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 1), committerSupplier);
|
||||
// currHydrant in the sink still has > 0 bytesInMemory since we do not persist yet
|
||||
currentInMemoryIndexSize = 182 + nullHandlingOverhead;
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
0,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
// Now add a single row to sink 1
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bob", 1), committerSupplier);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
(2 * currentInMemoryIndexSize) + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
|
||||
// We do multiple more adds to the both sink to cause persist.
|
||||
for (int i = 0; i < 34; i++) {
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bar_" + i, 1), committerSupplier);
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar_" + i, 1), committerSupplier);
|
||||
}
|
||||
// currHydrant size is 0 since we just persist all indexes to disk.
|
||||
currentInMemoryIndexSize = 0;
|
||||
// We are now over maxSizeInBytes after the add. Hence, we do a persist.
|
||||
// currHydrant in the sink has 0 bytesInMemory since we just did a persist
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0))
|
||||
);
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1))
|
||||
);
|
||||
// Mapped index size is the memory still needed after we persisted indexes. Note that the segments have
|
||||
// 1 dimension columns, 2 metric column, 1 time column. However, we have two indexes now from the two pervious
|
||||
// persists.
|
||||
mappedIndexSize = 2 * (2 * (1012 + (2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_METRIC_COLUMN_HOLDER) +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_DIMENSION_COLUMN_HOLDER +
|
||||
AppenderatorImpl.ROUGH_OVERHEAD_PER_TIME_COLUMN_HOLDER));
|
||||
Assert.assertEquals(
|
||||
currentInMemoryIndexSize + sinkSizeOverhead + mappedIndexSize,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
appenderator.close();
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoreMaxBytesInMemory() throws Exception
|
||||
{
|
||||
|
@ -273,8 +662,9 @@ public class AppenderatorTest extends InitializedNullHandlingTest
|
|||
);
|
||||
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
appenderator.add(IDENTIFIERS.get(1), ir("2000", "bar", 1), committerSupplier);
|
||||
int sinkSizeOverhead = 2 * AppenderatorImpl.ROUGH_OVERHEAD_PER_SINK;
|
||||
Assert.assertEquals(
|
||||
364 + 2 * nullHandlingOverhead,
|
||||
(364 + 2 * nullHandlingOverhead) + sinkSizeOverhead,
|
||||
((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory()
|
||||
);
|
||||
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
|
||||
|
@ -486,7 +876,7 @@ public class AppenderatorTest extends InitializedNullHandlingTest
|
|||
public void testVerifyRowIngestionMetrics() throws Exception
|
||||
{
|
||||
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(5, 1000L, null, false, rowIngestionMeters)) {
|
||||
try (final AppenderatorTester tester = new AppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
|
||||
final Appenderator appenderator = tester.getAppenderator();
|
||||
appenderator.startJob();
|
||||
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
|
||||
|
|
|
@ -124,7 +124,7 @@ public class AppenderatorTester implements AutoCloseable
|
|||
final boolean enablePushFailure
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters());
|
||||
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
|
||||
}
|
||||
|
||||
public AppenderatorTester(
|
||||
|
@ -134,6 +134,18 @@ public class AppenderatorTester implements AutoCloseable
|
|||
final boolean enablePushFailure,
|
||||
final RowIngestionMeters rowIngestionMeters
|
||||
)
|
||||
{
|
||||
this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
|
||||
}
|
||||
|
||||
public AppenderatorTester(
|
||||
final int maxRowsInMemory,
|
||||
final long maxSizeInBytes,
|
||||
final File basePersistDirectory,
|
||||
final boolean enablePushFailure,
|
||||
final RowIngestionMeters rowIngestionMeters,
|
||||
final boolean skipBytesInMemoryOverheadCheck
|
||||
)
|
||||
{
|
||||
objectMapper = new DefaultObjectMapper();
|
||||
objectMapper.registerSubtypes(LinearShardSpec.class);
|
||||
|
@ -165,6 +177,7 @@ public class AppenderatorTester implements AutoCloseable
|
|||
null,
|
||||
maxRowsInMemory,
|
||||
maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes,
|
||||
skipBytesInMemoryOverheadCheck,
|
||||
null,
|
||||
null,
|
||||
basePersistDirectory,
|
||||
|
|
|
@ -139,6 +139,7 @@ public class DefaultOfflineAppenderatorFactoryTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
temporaryFolder.newFolder(),
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -205,6 +205,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
null,
|
||||
null,
|
||||
null,
|
||||
new IntervalStartVersioningPolicy(),
|
||||
rejectionPolicy,
|
||||
null,
|
||||
|
|
|
@ -68,6 +68,7 @@ public class SinkTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
100,
|
||||
null,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
null,
|
||||
null,
|
||||
|
@ -224,6 +225,7 @@ public class SinkTest extends InitializedNullHandlingTest
|
|||
null,
|
||||
100,
|
||||
null,
|
||||
null,
|
||||
new Period("P1Y"),
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -159,6 +159,7 @@ public class DruidJsonValidatorTest
|
|||
null,
|
||||
1,
|
||||
null,
|
||||
null,
|
||||
new Period("PT10M"),
|
||||
null,
|
||||
null,
|
||||
|
|
|
@ -1797,6 +1797,7 @@ expr
|
|||
jackson-jq
|
||||
missingValue
|
||||
schemaless
|
||||
skipBytesInMemoryOverheadCheck
|
||||
spatialDimensions
|
||||
useFieldDiscovery
|
||||
- ../docs/tutorials/index.md
|
||||
|
|
Loading…
Reference in New Issue