'maxBytesInMemory' tuningConfig introduced for ingestion tasks (#5583)

* This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks

Currently a config called 'maxRowsInMemory' controls how much memory gets used for indexing.
If this value is not well matched to your JVM heap size, it can lead to OutOfMemoryError.
A lower value leads to frequent persists, which can hurt query performance, while a higher
value limits the number of persists but requires more JVM heap and can lead to OOM.
'maxBytesInMemory' is an attempt to solve this problem: it limits the total number of bytes
kept in memory before persisting.

 * The default value is 1/3 of Runtime.maxMemory()
 * To maintain the current behaviour, set 'maxBytesInMemory' to -1
 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are set, both are respected,
   i.e. whichever threshold is crossed first triggers a persist (see the sketch below)
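
For illustration, a minimal sketch of how the two limits interact. This is not the actual Druid code; the class and method names are hypothetical:

    // Sketch only: whichever limit is crossed first triggers an intermediate persist.
    class PersistThresholdsSketch
    {
      static boolean shouldPersist(int rowsInMemory, long bytesInMemory, int maxRowsInMemory, long maxBytesInMemory)
      {
        // maxBytesInMemory == -1 preserves the old behaviour: only the row count is checked
        final boolean bytesLimitHit = maxBytesInMemory > 0 && bytesInMemory >= maxBytesInMemory;
        final boolean rowsLimitHit = rowsInMemory >= maxRowsInMemory;
        return rowsLimitHit || bytesLimitHit;
      }
    }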

* Fix check style and remove a comment

* Add overlord unsecured paths to coordinator when using combined service (#5579)

* Add overlord unsecured paths to coordinator when using combined service

* PR comment

* More error reporting and stats for ingestion tasks (#5418)

* Add more indexing task status and error reporting

* PR comments, add support in AppenderatorDriverRealtimeIndexTask

* Use TaskReport instead of metrics/context

* Fix tests

* Use TaskReport uploads

* Refactor fire department metrics retrieval

* Refactor input row serde in hadoop task

* Refactor hadoop task loader names

* Truncate error message in TaskStatus, add errorMsg to task report

* PR comments

* Allow getDomain to return disjointed intervals (#5570)

* Allow getDomain to return disjointed intervals

* Indentation issues

* Adding feature thetaSketchConstant to do some set operation in PostAgg (#5551)

* Adding feature thetaSketchConstant to do some set operation in PostAggregator

* Updated review comments for PR #5551 - Adding thetaSketchConstant

* Fixed CI build issue

* Updated review comments 2 for PR #5551 - Adding thetaSketchConstant

* Fix taskDuration docs for KafkaIndexingService (#5572)

* With incremental handoff the changed line is no longer true.

* Add doc for automatic pendingSegments (#5565)

* Add missing doc for automatic pendingSegments

* address comments

* Fix indexTask to respect forceExtendableShardSpecs (#5509)

* Fix indexTask to respect forceExtendableShardSpecs

* add comments

* Deprecate spark2 profile in pom.xml (#5581)

Deprecated due to https://github.com/druid-io/druid/pull/5382

* CompressionUtils: Add support for decompressing xz, bz2, zip. (#5586)

Also switch various firehoses to the new method.

Fixes #5585.

* This commit introduces a new tuning config called 'maxBytesInMemory' for ingestion tasks

Currently a config called 'maxRowsInMemory' controls how much memory gets used for indexing.
If this value is not well matched to your JVM heap size, it can lead to OutOfMemoryError.
A lower value leads to frequent persists, which can hurt query performance, while a higher
value limits the number of persists but requires more JVM heap and can lead to OOM.
'maxBytesInMemory' is an attempt to solve this problem: it limits the total number of bytes
kept in memory before persisting.

 * The default value is 1/3 of Runtime.maxMemory()
 * To maintain the current behaviour, set 'maxBytesInMemory' to -1
 * If both 'maxRowsInMemory' and 'maxBytesInMemory' are set, both are respected,
   i.e. whichever threshold is crossed first triggers a persist

* Address code review comments

* Fix the coding style according to druid conventions
* Add more javadocs
* Rename some variables/methods
* Other minor issues

* Address more code review comments

* Some refactoring to put defaults in IndexTaskUtils
* Added check for maxBytesInMemory in AppenderatorImpl
* Decrement bytes in abandonSegment
* Test unit test for multiple sinks in single appenderator
* Fix some merge conflicts after rebase

* Fix some style checks

* Merge conflicts

* Fix failing tests

Add back check for 0 maxBytesInMemory in OnHeapIncrementalIndex

* Address PR comments

* Put defaults for maxRows and maxBytes in TuningConfig
* Change/add javadocs
* Refactoring and renaming some variables/methods

* Fix TeamCity inspection warnings

* Added maxBytesInMemory config to HadoopTuningConfig

* Updated the docs and examples

* Added maxBytesInMemory config in docs
* Removed references to maxRowsInMemory under tuningConfig in examples

* Set maxBytesInMemory to 0 until used

Set maxBytesInMemory to 0 if the user does not set it as part of the tuningConfig,
and resolve it to a fraction of the max JVM memory when the ingestion task starts
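
A sketch of this lazy-default resolution, roughly what the TuningConfigs#getMaxBytesInMemoryOrDefault calls in the diffs below end up doing on the task node. This is not a verbatim copy of the Druid code; the class name is illustrative:

    final class TuningConfigDefaultsSketch
    {
      // one sixth of the max heap of the JVM that actually runs the ingestion task
      static final long DEFAULT_MAX_BYTES_IN_MEMORY = Runtime.getRuntime().maxMemory() / 6;

      static long maxBytesInMemoryOrDefault(long maxBytesInMemory)
      {
        // 0 means "not set by the user": resolve the default here, not where the spec was submitted
        return maxBytesInMemory == 0 ? DEFAULT_MAX_BYTES_IN_MEMORY : maxBytesInMemory;
      }
    }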

* Update toString in KafkaSupervisorTuningConfig

* Use correct maxBytesInMemory value in AppenderatorImpl

* Update DEFAULT_MAX_BYTES_IN_MEMORY to 1/6 of max JVM memory

Experimenting with various defaults showed that 1/3 of JVM memory causes OOM

* Update docs to correct maxBytesInMemory default value

* Minor renames and added comments

* Add more details in docs

* Address new PR comments

* Address PR comments

* Fix spelling typo
Surekha 2018-05-03 16:25:58 -07:00 committed by Gian Merlino
parent 739e347320
commit 13c616ba24
64 changed files with 778 additions and 73 deletions

View File

@ -115,7 +115,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes you may not want to keep a million rows in memory, in which case set this value.|no (default == 1000000)|
|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. Normally this is computed internally and the user does not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`intermediatePersistPeriod`|ISO8601 Period|The period that determines the rate at which intermediate persists occur.|no (default == PT10M)|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If this limit would be exceeded by a new intermediate persist, ingestion will block until the currently-running persist finishes. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 0, meaning one persist can be running concurrently with ingestion, and none can be queued up)|
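
As a concrete illustration of the heap bound described in the table above, here is a small worked example; the configured values are hypothetical:

    class IndexingHeapEstimateExample
    {
      public static void main(String[] args)
      {
        long maxBytesInMemory = 500_000_000L; // hypothetical: explicitly set to roughly 500 MB
        int maxPendingPersists = 0;           // the documented default
        // maximum heap usage for indexing per the formula maxBytesInMemory * (2 + maxPendingPersists)
        long heapBound = maxBytesInMemory * (2 + maxPendingPersists);
        System.out.println(heapBound);        // ~1 GB of heap devoted to indexing
      }
    }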

View File

@ -154,7 +154,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|workingPath|String|The working path to use for intermediate results (results between Hadoop jobs).|no (default == '/tmp/druid-indexing')|
|version|String|The version of created segments. Ignored for HadoopIndexTask unless useExplicitVersion is set to true|no (default == datetime that indexing starts at)|
|partitionsSpec|Object|A specification of how to partition each time bucket into segments. Absence of this property means no partitioning will occur. See 'Partitioning specification' below.|no (default == 'hashed')|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size.|no (default == 75000)|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. Note that this is the number of post-aggregation rows which may not be equal to the number of input events due to roll-up. This is used to manage the required JVM heap size. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes you may not want to keep a million rows in memory, in which case set this value.|no (default == 1000000)|
|maxBytesInMemory|Long|The number of bytes to aggregate in heap memory before persisting. Normally this is computed internally and the user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|no (default == One-sixth of max JVM memory)|
|leaveIntermediate|Boolean|Leave behind intermediate files (for debugging) in the workingPath when a job completes, whether it passes or fails.|no (default == false)|
|cleanupOnFailure|Boolean|Clean up intermediate files when a job fails (unless leaveIntermediate is on).|no (default == true)|
|overwriteFiles|Boolean|Override existing files found during indexing.|no (default == false)|

View File

@ -92,7 +92,7 @@ The property `druid.realtime.specFile` has the path of a file (absolute or relat
},
"tuningConfig": {
"type" : "realtime",
"maxRowsInMemory": 75000,
"maxRowsInMemory": 1000000,
"intermediatePersistPeriod": "PT10m",
"windowPeriod": "PT10m",
"basePersistDirectory": "\/tmp\/realtime\/basePersist",
@ -141,7 +141,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|Field|Type|Description|Required|
|-----|----|-----------|--------|
|type|String|This should always be 'realtime'.|no|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 75000)|
|maxRowsInMemory|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists).|no (default == 1000000)|
|maxBytesInMemory|Long|The maximum number of bytes to keep in memory to aggregate before persisting. This is used to manage the required JVM heap size.|no (default == One-sixth of max JVM memory)|
|windowPeriod|ISO 8601 Period String|The amount of lag time to allow events. This is configured with a 10 minute window, meaning that any event more than 10 minutes ago will be thrown away and not included in the segment generated by the realtime server.|no (default == PT10m)|
|intermediatePersistPeriod|ISO8601 Period String|The period that determines the rate at which intermediate persists occur. These persists determine how often commits happen against the incoming realtime stream. If the realtime data loading process is interrupted at time T, it should be restarted to re-read data that arrived at T minus this period.|no (default == PT10m)|
|basePersistDirectory|String|The directory to put things that need persistence. The plumber is responsible for the actual intermediate persists and this tells it where to store those persists.|no (default == java tmp dir)|
@ -287,7 +288,8 @@ The following table summarizes constraints between settings in the spec file for
|segmentGranularity| Time granularity (minute, hour, day, week, month) for loading data at query time | equal to indexGranularity| more than queryGranularity|
|queryGranularity| Time granularity (minute, hour, day, week, month) for rollup | less than segmentGranularity| minute, hour, day, week, month |
|intermediatePersistPeriod| The max time (ISO8601 Period) between flushes of ingested rows from memory to disk | avoid excessive flushing | number of un-persisted rows in memory also constrained by maxRowsInMemory |
|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk | number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
|maxRowsInMemory| The max number of ingested rows to hold in memory before a flush to disk. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes you may not want to keep a million rows in memory, in which case set this value.| number of un-persisted post-aggregation rows in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
|maxBytesInMemory| The number of bytes to keep in memory before a flush to disk. Normally this is computed internally and the user does not need to set it. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).| number of un-persisted post-aggregation bytes in memory is also constrained by intermediatePersistPeriod | use this to avoid running out of heap if too many rows in an intermediatePersistPeriod |
The normal, expected use cases have the following overall constraints: `intermediatePersistPeriod ≤ windowPeriod < segmentGranularity` and `queryGranularity ≤ segmentGranularity`

View File

@ -77,7 +77,7 @@ The Index Task is a simpler variation of the Index Hadoop task that is designed
"tuningConfig" : {
"type" : "index",
"targetPartitionSize" : 5000000,
"maxRowsInMemory" : 75000
"maxRowsInMemory" : 1000000
}
}
}
@ -137,7 +137,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|--------|-----------|-------|---------|
|type|The task type, this should always be "index".|none|yes|
|targetPartitionSize|Used in sharding. Determines how many rows are in each segment.|5000000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur.|75000|no|
|maxRowsInMemory|Used in determining when intermediate persists to disk should occur. Normally this does not need to be set, but depending on the nature of the data, if rows are short in terms of bytes you may not want to keep a million rows in memory, in which case set this value.|1000000|no|
|maxBytesInMemory|Used in determining when intermediate persists to disk should occur. Normally this is computed internally and the user does not need to set it. This value represents the number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists).|1/6 of max JVM memory|no|
|maxTotalRows|Total number of rows in segments waiting for being published. Used in determining when intermediate publish should occur.|150000|no|
|numShards|Directly specify the number of shards to create. If this is specified and 'intervals' is specified in the granularitySpec, the index task can skip the determine intervals/partitions pass through the data. numShards cannot be specified if targetPartitionSize is set.|null|no|
|indexSpec|defines segment storage format options to be used at indexing time, see [IndexSpec](#indexspec)|null|no|

View File

@ -53,7 +53,6 @@
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M"
}

View File

@ -53,7 +53,6 @@
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M"
}

View File

@ -53,7 +53,6 @@
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M"
}

View File

@ -53,7 +53,6 @@
},
"tuningConfig" : {
"type" : "realtime",
"maxRowsInMemory" : "100000",
"intermediatePersistPeriod" : "PT10M",
"windowPeriod" : "PT10M"
}

View File

@ -222,6 +222,7 @@ public class OrcIndexGeneratorJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -38,6 +38,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
private static final boolean DEFAULT_RESET_OFFSET_AUTOMATICALLY = false;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
@ -58,6 +59,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
@JsonCreator
public KafkaTuningConfig(
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory,
@ -80,6 +82,9 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
this.maxRowsInMemory = maxRowsInMemory == null ? defaults.getMaxRowsInMemory() : maxRowsInMemory;
this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
@ -116,6 +121,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
{
return new KafkaTuningConfig(
config.maxRowsInMemory,
config.maxBytesInMemory,
config.maxRowsPerSegment,
config.intermediatePersistPeriod,
config.basePersistDirectory,
@ -140,6 +146,13 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return maxRowsInMemory;
}
@Override
@JsonProperty
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
@JsonProperty
public int getMaxRowsPerSegment()
{
@ -240,6 +253,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
{
return new KafkaTuningConfig(
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
intermediatePersistPeriod,
dir,
@ -269,6 +283,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
KafkaTuningConfig that = (KafkaTuningConfig) o;
return maxRowsInMemory == that.maxRowsInMemory &&
maxRowsPerSegment == that.maxRowsPerSegment &&
maxBytesInMemory == that.maxBytesInMemory &&
maxPendingPersists == that.maxPendingPersists &&
reportParseExceptions == that.reportParseExceptions &&
handoffConditionTimeout == that.handoffConditionTimeout &&
@ -289,6 +304,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return Objects.hash(
maxRowsInMemory,
maxRowsPerSegment,
maxBytesInMemory,
intermediatePersistPeriod,
basePersistDirectory,
maxPendingPersists,
@ -310,6 +326,7 @@ public class KafkaTuningConfig implements TuningConfig, AppenderatorConfig
return "KafkaTuningConfig{" +
"maxRowsInMemory=" + maxRowsInMemory +
", maxRowsPerSegment=" + maxRowsPerSegment +
", maxBytesInMemory=" + maxBytesInMemory +
", intermediatePersistPeriod=" + intermediatePersistPeriod +
", basePersistDirectory=" + basePersistDirectory +
", maxPendingPersists=" + maxPendingPersists +

View File

@ -93,6 +93,7 @@ public class KafkaSupervisorSpec implements SupervisorSpec
null,
null,
null,
null,
null
);
this.ioConfig = Preconditions.checkNotNull(ioConfig, "ioConfig");

View File

@ -21,6 +21,7 @@ package io.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.indexing.kafka.KafkaTuningConfig;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.writeout.SegmentWriteOutMediumFactory;
import io.druid.segment.IndexSpec;
import org.joda.time.Duration;
@ -40,6 +41,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
public KafkaSupervisorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -65,6 +67,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
{
super(
maxRowsInMemory,
maxBytesInMemory,
maxRowsPerSegment,
intermediatePersistPeriod,
basePersistDirectory,
@ -131,6 +134,7 @@ public class KafkaSupervisorTuningConfig extends KafkaTuningConfig
return "KafkaSupervisorTuningConfig{" +
"maxRowsInMemory=" + getMaxRowsInMemory() +
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxBytesInMemory=" + TuningConfigs.getMaxBytesInMemoryOrDefault(getMaxBytesInMemory()) +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +

View File

@ -1965,6 +1965,7 @@ public class KafkaIndexTaskTest
{
final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
1000,
null,
maxRowsPerSegment,
new Period("P1Y"),
null,
@ -2007,6 +2008,7 @@ public class KafkaIndexTaskTest
{
final KafkaTuningConfig tuningConfig = new KafkaTuningConfig(
1000,
null,
maxRowsPerSegment,
new Period("P1Y"),
null,

View File

@ -56,7 +56,7 @@ public class KafkaTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());
@ -103,6 +103,7 @@ public class KafkaTuningConfigTest
{
KafkaTuningConfig original = new KafkaTuningConfig(
1,
null,
2,
new Period("PT3S"),
new File("/tmp/xxx"),

View File

@ -185,6 +185,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
tuningConfig = new KafkaSupervisorTuningConfig(
1000,
null,
50000,
new Period("P1Y"),
new File("/test"),

View File

@ -58,7 +58,7 @@ public class KafkaSupervisorTuningConfigTest
);
Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment());
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(0, config.getMaxPendingPersists());

View File

@ -43,7 +43,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final PartitionsSpec DEFAULT_PARTITIONS_SPEC = HashedPartitionsSpec.makeDefaultHashedPartitionsSpec();
private static final Map<Long, List<HadoopyShardSpec>> DEFAULT_SHARD_SPECS = ImmutableMap.of();
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 75000;
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final boolean DEFAULT_USE_COMBINER = false;
private static final int DEFAULT_NUM_BACKGROUND_PERSIST_THREADS = 0;
@ -56,6 +56,7 @@ public class HadoopTuningConfig implements TuningConfig
DEFAULT_SHARD_SPECS,
DEFAULT_INDEX_SPEC,
DEFAULT_ROW_FLUSH_BOUNDARY,
0L,
false,
true,
false,
@ -80,6 +81,7 @@ public class HadoopTuningConfig implements TuningConfig
private final Map<Long, List<HadoopyShardSpec>> shardSpecs;
private final IndexSpec indexSpec;
private final int rowFlushBoundary;
private final long maxBytesInMemory;
private final boolean leaveIntermediate;
private final Boolean cleanupOnFailure;
private final boolean overwriteFiles;
@ -102,6 +104,7 @@ public class HadoopTuningConfig implements TuningConfig
final @JsonProperty("shardSpecs") Map<Long, List<HadoopyShardSpec>> shardSpecs,
final @JsonProperty("indexSpec") IndexSpec indexSpec,
final @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
final @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
final @JsonProperty("leaveIntermediate") boolean leaveIntermediate,
final @JsonProperty("cleanupOnFailure") Boolean cleanupOnFailure,
final @JsonProperty("overwriteFiles") boolean overwriteFiles,
@ -129,6 +132,9 @@ public class HadoopTuningConfig implements TuningConfig
this.rowFlushBoundary = maxRowsInMemory == null ? maxRowsInMemoryCOMPAT == null
? DEFAULT_ROW_FLUSH_BOUNDARY
: maxRowsInMemoryCOMPAT : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.leaveIntermediate = leaveIntermediate;
this.cleanupOnFailure = cleanupOnFailure == null ? true : cleanupOnFailure;
this.overwriteFiles = overwriteFiles;
@ -190,6 +196,12 @@ public class HadoopTuningConfig implements TuningConfig
return rowFlushBoundary;
}
@JsonProperty
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
@JsonProperty
public boolean isLeaveIntermediate()
{
@ -288,6 +300,7 @@ public class HadoopTuningConfig implements TuningConfig
shardSpecs,
indexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
@ -315,6 +328,7 @@ public class HadoopTuningConfig implements TuningConfig
shardSpecs,
indexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,
@ -342,6 +356,7 @@ public class HadoopTuningConfig implements TuningConfig
specs,
indexSpec,
rowFlushBoundary,
maxBytesInMemory,
leaveIntermediate,
cleanupOnFailure,
overwriteFiles,

View File

@ -51,6 +51,7 @@ import io.druid.segment.QueryableIndex;
import io.druid.segment.column.ColumnCapabilitiesImpl;
import io.druid.segment.incremental.IncrementalIndex;
import io.druid.segment.incremental.IncrementalIndexSchema;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NumberedShardSpec;
import io.druid.timeline.partition.ShardSpec;
@ -288,6 +289,7 @@ public class IndexGeneratorJob implements Jobby
.setIndexSchema(indexSchema)
.setReportParseExceptions(!tuningConfig.isIgnoreInvalidRows())
.setMaxRowCount(tuningConfig.getRowFlushBoundary())
.setMaxBytesInMemory(TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()))
.buildOnheap();
if (oldDimOrder != null && !indexSchema.getDimensionsSpec().hasCustomDimensions()) {

View File

@ -480,6 +480,7 @@ public class BatchDeltaIngestionTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -198,6 +198,7 @@ public class DetermineHashedPartitionsJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -259,6 +259,7 @@ public class DeterminePartitionsJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -84,6 +84,7 @@ public class HadoopDruidIndexerConfigTest
ImmutableMap.of(DateTimes.of("2010-01-01T01:00:00").getMillis(), specs),
null,
null,
null,
false,
false,
false,
@ -160,6 +161,7 @@ public class HadoopDruidIndexerConfigTest
),
null,
null,
null,
false,
false,
false,

View File

@ -46,6 +46,7 @@ public class HadoopTuningConfigTest
null,
null,
100,
null,
true,
true,
true,

View File

@ -91,7 +91,7 @@ public class IndexGeneratorJobTest
@Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " +
"data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " +
"aggs={8}, datasourceName={9}, forceExtendableShardSpecs={10}")
"maxBytesInMemory={8}, aggs={9}, datasourceName={10}, forceExtendableShardSpecs={11}")
public static Collection<Object[]> constructFeed()
{
final List<Object[]> baseConstructors = Arrays.asList(
@ -151,6 +151,7 @@ public class IndexGeneratorJobTest
null
),
null,
null,
aggs1,
"website"
},
@ -198,6 +199,7 @@ public class IndexGeneratorJobTest
)
),
null,
null,
aggs1,
"website"
},
@ -246,6 +248,7 @@ public class IndexGeneratorJobTest
null
),
null,
null,
aggs1,
"website"
},
@ -303,6 +306,7 @@ public class IndexGeneratorJobTest
)
),
null,
null,
aggs1,
"website"
},
@ -335,6 +339,7 @@ public class IndexGeneratorJobTest
null
),
1, // force 1 row max per index for easier testing
null,
aggs2,
"inherit_dims"
},
@ -367,6 +372,7 @@ public class IndexGeneratorJobTest
null
),
1, // force 1 row max per index for easier testing
null,
aggs2,
"inherit_dims2"
}
@ -398,6 +404,7 @@ public class IndexGeneratorJobTest
private final String inputFormatName;
private final InputRowParser inputRowParser;
private final Integer maxRowsInMemory;
private final Long maxBytesInMemory;
private final AggregatorFactory[] aggs;
private final String datasourceName;
private final boolean forceExtendableShardSpecs;
@ -416,6 +423,7 @@ public class IndexGeneratorJobTest
String inputFormatName,
InputRowParser inputRowParser,
Integer maxRowsInMemory,
Long maxBytesInMemory,
AggregatorFactory[] aggs,
String datasourceName,
boolean forceExtendableShardSpecs
@ -429,6 +437,7 @@ public class IndexGeneratorJobTest
this.inputFormatName = inputFormatName;
this.inputRowParser = inputRowParser;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.aggs = aggs;
this.datasourceName = datasourceName;
this.forceExtendableShardSpecs = forceExtendableShardSpecs;
@ -511,6 +520,7 @@ public class IndexGeneratorJobTest
null,
null,
maxRowsInMemory,
maxBytesInMemory,
false,
false,
false,

View File

@ -108,6 +108,7 @@ public class JobHelperTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -61,6 +61,7 @@ public class GranularityPathSpecTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -200,6 +200,7 @@ public class HadoopConverterJobTest
null,
null,
null,
null,
false,
false,
false,

View File

@ -37,7 +37,7 @@ import java.io.File;
@JsonTypeName("realtime_appenderator")
public class RealtimeAppenderatorTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int defaultMaxRowsInMemory = 75000;
private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final int defaultMaxRowsPerSegment = 5_000_000;
private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
private static final int defaultMaxPendingPersists = 0;
@ -52,8 +52,11 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return Files.createTempDir();
}
private final int maxRowsInMemory;
private final int maxRowsPerSegment;
private final long maxBytesInMemory;
private final Period intermediatePersistPeriod;
private final File basePersistDirectory;
private final int maxPendingPersists;
@ -73,6 +76,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
public RealtimeAppenderatorTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@ -89,6 +93,9 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
this.maxRowsPerSegment = maxRowsPerSegment == null ? defaultMaxRowsPerSegment : maxRowsPerSegment;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaultIntermediatePersistPeriod
: intermediatePersistPeriod;
@ -127,6 +134,12 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return maxRowsInMemory;
}
@Override
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
@JsonProperty
public int getMaxRowsPerSegment()
{
@ -217,6 +230,7 @@ public class RealtimeAppenderatorTuningConfig implements TuningConfig, Appendera
return new RealtimeAppenderatorTuningConfig(
maxRowsInMemory,
maxRowsPerSegment,
maxBytesInMemory,
intermediatePersistPeriod,
dir,
maxPendingPersists,

View File

@ -43,6 +43,7 @@ import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -105,6 +106,7 @@ public class YeOldePlumberSchool implements PlumberSchool
config.getShardSpec(),
version,
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
config.isReportParseExceptions()
);

View File

@ -76,9 +76,9 @@ import io.druid.segment.realtime.FireDepartmentMetricsTaskMetricsGetter;
import io.druid.segment.realtime.RealtimeMetricsMonitor;
import io.druid.segment.realtime.appenderator.Appenderator;
import io.druid.segment.realtime.appenderator.AppenderatorConfig;
import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import io.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
import io.druid.segment.realtime.appenderator.Appenderators;
import io.druid.segment.realtime.appenderator.BaseAppenderatorDriver;
import io.druid.segment.realtime.appenderator.BatchAppenderatorDriver;
import io.druid.segment.realtime.appenderator.SegmentAllocator;
import io.druid.segment.realtime.appenderator.SegmentIdentifier;
@ -1231,7 +1231,6 @@ public class IndexTask extends AbstractTask implements ChatHandler
@JsonTypeName("index")
public static class IndexTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int DEFAULT_MAX_ROWS_IN_MEMORY = 75_000;
private static final int DEFAULT_MAX_TOTAL_ROWS = 20_000_000;
private static final IndexSpec DEFAULT_INDEX_SPEC = new IndexSpec();
private static final int DEFAULT_MAX_PENDING_PERSISTS = 0;
@ -1244,6 +1243,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
private final Integer targetPartitionSize;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final Long maxTotalRows;
private final Integer numShards;
private final IndexSpec indexSpec;
@ -1276,6 +1276,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
public IndexTuningConfig(
@JsonProperty("targetPartitionSize") @Nullable Integer targetPartitionSize,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("rowFlushBoundary") @Nullable Integer rowFlushBoundary_forBackCompatibility, // DEPRECATED
@JsonProperty("numShards") @Nullable Integer numShards,
@ -1297,6 +1298,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
this(
targetPartitionSize,
maxRowsInMemory != null ? maxRowsInMemory : rowFlushBoundary_forBackCompatibility,
maxBytesInMemory != null ? maxBytesInMemory : 0,
maxTotalRows,
numShards,
indexSpec,
@ -1315,12 +1317,13 @@ public class IndexTask extends AbstractTask implements ChatHandler
private IndexTuningConfig()
{
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
this(null, null, null, null, null, null, null, null, null, null, null, null, null, null, null, null);
}
private IndexTuningConfig(
@Nullable Integer targetPartitionSize,
@Nullable Integer maxRowsInMemory,
@Nullable Long maxBytesInMemory,
@Nullable Long maxTotalRows,
@Nullable Integer numShards,
@Nullable IndexSpec indexSpec,
@ -1342,7 +1345,10 @@ public class IndexTask extends AbstractTask implements ChatHandler
);
this.targetPartitionSize = initializeTargetPartitionSize(numShards, targetPartitionSize);
this.maxRowsInMemory = maxRowsInMemory == null ? DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
this.maxRowsInMemory = maxRowsInMemory == null ? TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY : maxRowsInMemory;
// initializing this to 0, it will be lazily initialized to a value
// @see server.src.main.java.io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.maxTotalRows = initializeMaxTotalRows(numShards, maxTotalRows);
this.numShards = numShards == null || numShards.equals(-1) ? null : numShards;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@ -1396,6 +1402,7 @@ public class IndexTask extends AbstractTask implements ChatHandler
return new IndexTuningConfig(
targetPartitionSize,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
numShards,
indexSpec,
@ -1425,6 +1432,13 @@ public class IndexTask extends AbstractTask implements ChatHandler
return maxRowsInMemory;
}
@JsonProperty
@Override
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
@JsonProperty
public Long getMaxTotalRows()
{

View File

@ -711,6 +711,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
)
);
// Wait for the task to finish.
final TaskStatus taskStatus = statusFuture.get();
Assert.assertEquals(TaskState.SUCCESS, taskStatus.getStatusCode());
@ -1245,6 +1246,7 @@ public class AppenderatorDriverRealtimeIndexTaskTest
null,
null,
null,
null,
reportParseExceptions,
handoffTimeout,
null,

View File

@ -247,6 +247,7 @@ public class CompactionTaskTest
1000000L,
null,
null,
null,
new IndexSpec(
new RoaringBitmapSerdeFactory(true),
CompressionStrategy.LZ4,

View File

@ -631,7 +631,7 @@ public class IndexTaskTest
Granularities.MINUTE,
null
),
createTuningConfig(2, 2, 2L, null, false, false, true),
createTuningConfig(2, 2, null, 2L, null, false, false, true),
false
),
null,
@ -675,7 +675,7 @@ public class IndexTaskTest
true,
null
),
createTuningConfig(3, 2, 2L, null, false, true, true),
createTuningConfig(3, 2, null, 2L, null, false, true, true),
false
),
null,
@ -718,7 +718,7 @@ public class IndexTaskTest
true,
null
),
createTuningConfig(3, 2, 2L, null, false, false, true),
createTuningConfig(3, 2, null, 2L, null, false, false, true),
false
),
null,
@ -790,7 +790,7 @@ public class IndexTaskTest
0
),
null,
createTuningConfig(2, null, null, null, false, false, false), // ignore parse exception,
createTuningConfig(2, null, null, null, null, false, false, false), // ignore parse exception,
false
);
@ -842,7 +842,7 @@ public class IndexTaskTest
0
),
null,
createTuningConfig(2, null, null, null, false, false, true), // report parse exception
createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception
false
);
@ -894,6 +894,7 @@ public class IndexTaskTest
null,
null,
null,
null,
indexSpec,
null,
true,
@ -1012,6 +1013,7 @@ public class IndexTaskTest
null,
null,
null,
null,
indexSpec,
null,
true,
@ -1117,6 +1119,7 @@ public class IndexTaskTest
null,
null,
null,
null,
indexSpec,
null,
true,
@ -1245,7 +1248,7 @@ public class IndexTaskTest
0
),
null,
createTuningConfig(2, 1, null, null, false, true, true), // report parse exception
createTuningConfig(2, 1, null, null, null, false, true, true), // report parse exception
false
);
@ -1314,7 +1317,7 @@ public class IndexTaskTest
0
),
null,
createTuningConfig(2, null, null, null, false, false, true), // report parse exception
createTuningConfig(2, null, null, null, null, false, false, true), // report parse exception
false
);
@ -1553,6 +1556,7 @@ public class IndexTaskTest
targetPartitionSize,
1,
null,
null,
numShards,
forceExtendableShardSpecs,
forceGuaranteedRollup,
@ -1563,6 +1567,7 @@ public class IndexTaskTest
private static IndexTuningConfig createTuningConfig(
Integer targetPartitionSize,
Integer maxRowsInMemory,
Long maxBytesInMemory,
Long maxTotalRows,
Integer numShards,
boolean forceExtendableShardSpecs,
@ -1573,6 +1578,7 @@ public class IndexTaskTest
return new IndexTask.IndexTuningConfig(
targetPartitionSize,
maxRowsInMemory,
maxBytesInMemory,
maxTotalRows,
null,
numShards,

View File

@ -904,6 +904,7 @@ public class RealtimeIndexTaskTest
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
null,
new Period("P1Y"),
new Period("PT10M"),
null,

View File

@ -108,7 +108,7 @@ public class TaskSerdeTest
Assert.assertEquals(new IndexSpec(), tuningConfig.getIndexSpec());
Assert.assertEquals(new Period(Integer.MAX_VALUE), tuningConfig.getIntermediatePersistPeriod());
Assert.assertEquals(0, tuningConfig.getMaxPendingPersists());
Assert.assertEquals(75000, tuningConfig.getMaxRowsInMemory());
Assert.assertEquals(1000000, tuningConfig.getMaxRowsInMemory());
Assert.assertEquals(null, tuningConfig.getNumShards());
Assert.assertEquals(5000000, (int) tuningConfig.getTargetPartitionSize());
}
@ -195,6 +195,7 @@ public class TaskSerdeTest
10000,
10,
null,
null,
9999,
null,
indexSpec,
@ -280,6 +281,7 @@ public class TaskSerdeTest
null,
null,
null,
null,
indexSpec,
3,
true,
@ -537,6 +539,7 @@ public class TaskSerdeTest
new RealtimeTuningConfig(
1,
null,
new Period("PT10M"),
null,
null,

View File

@ -674,6 +674,7 @@ public class TaskLifecycleTest
null,
null,
null,
null,
indexSpec,
3,
true,
@ -752,6 +753,7 @@ public class TaskLifecycleTest
null,
null,
null,
null,
indexSpec,
3,
true,
@ -1137,6 +1139,7 @@ public class TaskLifecycleTest
null,
null,
null,
null,
indexSpec,
null,
false,
@ -1260,6 +1263,7 @@ public class TaskLifecycleTest
);
RealtimeTuningConfig realtimeTuningConfig = new RealtimeTuningConfig(
1000,
null,
new Period("P1Y"),
null, //default window period of 10 minutes
null, // base persist dir ignored by Realtime Index task

View File

@ -127,6 +127,14 @@ public interface DimensionIndexer
*/
EncodedKeyComponentType processRowValsToUnsortedEncodedKeyComponent(Object dimValues, boolean reportParseExceptions);
/**
* Gives the estimated size in bytes for the given key
*
* @param key dimension value array from a TimeAndDims key
*
* @return the estimated size in bytes of the key
*/
long estimateEncodedKeyComponentSize(EncodedKeyComponentType key);
/**
* Given an encoded value that was ordered by associated actual value, return the equivalent

View File

@ -49,6 +49,12 @@ public class DoubleDimensionIndexer implements DimensionIndexer<Double, Double,
return ret == null ? DimensionHandlerUtils.ZERO_DOUBLE : ret;
}
@Override
public long estimateEncodedKeyComponentSize(Double key)
{
return Double.BYTES;
}
@Override
public Double getUnsortedEncodedValueFromSorted(Double sortedIntermediateValue)
{

View File

@ -50,6 +50,12 @@ public class FloatDimensionIndexer implements DimensionIndexer<Float, Float, Flo
return ret == null ? DimensionHandlerUtils.ZERO_FLOAT : ret;
}
@Override
public long estimateEncodedKeyComponentSize(Float key)
{
return Float.BYTES;
}
@Override
public Float getUnsortedEncodedValueFromSorted(Float sortedIntermediateValue)
{

View File

@ -50,6 +50,12 @@ public class LongDimensionIndexer implements DimensionIndexer<Long, Long, Long>
return ret == null ? DimensionHandlerUtils.ZERO_LONG : ret;
}
@Override
public long estimateEncodedKeyComponentSize(Long key)
{
return Long.BYTES;
}
@Override
public Long getUnsortedEncodedValueFromSorted(Long sortedIntermediateValue)
{

View File

@ -270,6 +270,20 @@ public class StringDimensionIndexer implements DimensionIndexer<Integer, int[],
return encodedDimensionValues;
}
@Override
public long estimateEncodedKeyComponentSize(int[] key)
{
// String lengths are accounted for each time they are referenced, per the dimension handler interface,
// even though the strings are stored just once. This may overestimate the size a bit, but leaves
// more buffer to be safe.
long estimatedSize = key.length * Integer.BYTES;
estimatedSize += Arrays.stream(key)
.filter(element -> dimLookup.getValue(element) != null)
.mapToLong(element -> dimLookup.getValue(element).length() * Character.BYTES)
.sum();
return estimatedSize;
}
public Integer getSortedEncodedValueFromUnsorted(Integer unsortedIntermediateValue)
{
return sortedLookup().getSortedIdFromUnsortedId(unsortedIntermediateValue);
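
A quick worked example of the string key estimate above; the dictionary ids and values are hypothetical, and the helper class is illustrative only:

    class StringKeyEstimateExample
    {
      public static void main(String[] args)
      {
        int[] key = {3, 17};                                   // dictionary ids (hypothetical)
        String[] values = {"foo", "barbaz"};                   // what dimLookup.getValue() would return
        long estimate = (long) key.length * Integer.BYTES;     // 2 * 4 = 8
        for (String value : values) {
          estimate += (long) value.length() * Character.BYTES; // (3 + 6) * 2 = 18
        }
        System.out.println(estimate);                          // 26 bytes estimated for this key
      }
    }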

View File

@ -93,6 +93,7 @@ import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -237,6 +238,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
private final List<DimensionDesc> dimensionDescsList;
private final Map<String, ColumnCapabilitiesImpl> columnCapabilities;
private final AtomicInteger numEntries = new AtomicInteger();
private final AtomicLong bytesInMemory = new AtomicLong();
// This is modified on add() in a critical section.
private final ThreadLocal<InputRow> in = new ThreadLocal<>();
@ -333,6 +335,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
private boolean concurrentEventAdd;
private boolean sortFacts;
private int maxRowCount;
private long maxBytesInMemory;
public Builder()
{
@ -342,6 +345,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
concurrentEventAdd = false;
sortFacts = true;
maxRowCount = 0;
maxBytesInMemory = 0;
}
public Builder setIndexSchema(final IncrementalIndexSchema incrementalIndexSchema)
@ -398,6 +402,13 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return this;
}
// maxBytesInMemory only applies to OnheapIncrementalIndex
public Builder setMaxBytesInMemory(final long maxBytesInMemory)
{
this.maxBytesInMemory = maxBytesInMemory;
return this;
}
public IncrementalIndex buildOnheap()
{
if (maxRowCount <= 0) {
@ -410,7 +421,8 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount
maxRowCount,
maxBytesInMemory
);
}
@ -457,6 +469,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@ -504,11 +517,17 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
static class AddToFactsResult
{
private int rowCount;
private final long bytesInMemory;
private List<String> parseExceptionMessages;
AddToFactsResult(int rowCount, List<String> parseExceptionMessages)
public AddToFactsResult(
int rowCount,
long bytesInMemory,
List<String> parseExceptionMessages
)
{
this.rowCount = rowCount;
this.bytesInMemory = bytesInMemory;
this.parseExceptionMessages = parseExceptionMessages;
}
@ -517,7 +536,12 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return rowCount;
}
List<String> getParseExceptionMessages()
public long getBytesInMemory()
{
return bytesInMemory;
}
public List<String> getParseExceptionMessages()
{
return parseExceptionMessages;
}
@ -571,6 +595,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
reportParseExceptions,
row,
numEntries,
bytesInMemory,
incrementalIndexRowResult.getIncrementalIndexRow(),
in,
rowSupplier,
@ -582,7 +607,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
incrementalIndexRowResult.getParseExceptionMessages(),
addToFactsResult.getParseExceptionMessages()
);
return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), parseException);
return new IncrementalIndexAddResult(addToFactsResult.getRowCount(), addToFactsResult.getBytesInMemory(), parseException);
}
@VisibleForTesting
@ -597,6 +622,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
Object[] dims;
List<Object> overflow = null;
long dimsKeySize = 0;
List<String> parseExceptionMessages = new ArrayList<>();
synchronized (dimensionDescs) {
dims = new Object[dimensionDescs.size()];
@ -635,7 +661,7 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
catch (ParseException pe) {
parseExceptionMessages.add(pe.getMessage());
}
dimsKeySize += indexer.estimateEncodedKeyComponentSize(dimsKey);
// Set column capabilities as data is coming in
if (!capabilities.hasMultipleValues() &&
dimsKey != null &&
@ -679,10 +705,11 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
if (row.getTimestamp() != null) {
truncated = gran.bucketStart(row.getTimestamp()).getMillis();
}
IncrementalIndexRow incrementalIndexRow = new IncrementalIndexRow(
IncrementalIndexRow incrementalIndexRow = IncrementalIndexRow.createTimeAndDimswithDimsKeySize(
Math.max(truncated, minTimestamp),
dims,
dimensionDescsList
dimensionDescsList,
dimsKeySize
);
return new IncrementalIndexRowResult(incrementalIndexRow, parseExceptionMessages);
}
@ -740,6 +767,11 @@ public abstract class IncrementalIndex<AggregatorType> extends AbstractIndex imp
return numEntries.get();
}
public long getBytesInMemory()
{
return bytesInMemory.get();
}
private long getMinTimeMillis()
{
return getFacts().getMinTimeMillis();

View File

@ -26,16 +26,19 @@ import javax.annotation.Nullable;
public class IncrementalIndexAddResult
{
private final int rowCount;
private final long bytesInMemory;
@Nullable
private final ParseException parseException;
public IncrementalIndexAddResult(
int rowCount,
long bytesInMemory,
@Nullable ParseException parseException
)
{
this.rowCount = rowCount;
this.bytesInMemory = bytesInMemory;
this.parseException = parseException;
}
@ -44,6 +47,11 @@ public class IncrementalIndexAddResult
return rowCount;
}
public long getBytesInMemory()
{
return bytesInMemory;
}
@Nullable
public ParseException getParseException()
{

View File

@ -45,6 +45,7 @@ public final class IncrementalIndexRow
* {@link IncrementalIndex.RollupFactsHolder} needs concurrent collections, that are not present in fastutil.
*/
private int rowIndex;
private long dimsKeySize;
IncrementalIndexRow(
long timestamp,
@ -68,6 +69,29 @@ public final class IncrementalIndexRow
this.rowIndex = rowIndex;
}
private IncrementalIndexRow(
long timestamp,
Object[] dims,
List<IncrementalIndex.DimensionDesc> dimensionDescsList,
long dimsKeySize
)
{
this.timestamp = timestamp;
this.dims = dims;
this.dimensionDescsList = dimensionDescsList;
this.dimsKeySize = dimsKeySize;
}
static IncrementalIndexRow createTimeAndDimswithDimsKeySize(
long timestamp,
Object[] dims,
List<IncrementalIndex.DimensionDesc> dimensionDescsList,
long dimsKeySize
)
{
return new IncrementalIndexRow(timestamp, dims, dimensionDescsList, dimsKeySize);
}
public long getTimestamp()
{
return timestamp;
@ -88,6 +112,25 @@ public final class IncrementalIndexRow
this.rowIndex = rowIndex;
}
/**
* bytesInMemory estimates the size of the IncrementalIndexRow key; it takes into account the timestamp (long),
* dims (Object array) and dimensionDescsList (List). Each of these is calculated as follows:
* <ul>
* <li> timestamp : Long.BYTES
* <li> dims array : Integer.BYTES * array length + Long.BYTES (dims object) + dimsKeySize(passed via constructor)
* <li> dimensionDescList : Long.BYTES (shared pointer)
* <li> dimsKeySize : this value is passed in based on the key type (int, long, double, String etc.)
* </ul>
*
* @return long estimated bytesInMemory
*/
public long estimateBytesInMemory()
{
long sizeInBytes = Long.BYTES + Integer.BYTES * dims.length + Long.BYTES + Long.BYTES;
sizeInBytes += dimsKeySize;
return sizeInBytes;
}
@Override
public String toString()
{

View File

@ -40,6 +40,7 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
@ -144,6 +145,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes, // ignored, added to make abstract class method impl happy
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@ -238,7 +240,7 @@ public class OffheapIncrementalIndex extends IncrementalIndex<BufferAggregator>
}
}
rowContainer.set(null);
return new AddToFactsResult(numEntries.get(), new ArrayList<>());
return new AddToFactsResult(numEntries.get(), 0, new ArrayList<>());
}
@Override

View File

@ -38,22 +38,29 @@ import io.druid.segment.column.ColumnCapabilities;
import javax.annotation.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
*/
public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
{
private static final Logger log = new Logger(OnheapIncrementalIndex.class);
/**
* overhead per {@link ConcurrentHashMap.Node} or {@link java.util.concurrent.ConcurrentSkipListMap.Node} object
*/
private static final int ROUGH_OVERHEAD_PER_MAP_ENTRY = Long.BYTES * 5 + Integer.BYTES;
private final ConcurrentHashMap<Integer, Aggregator[]> aggregators = new ConcurrentHashMap<>();
private final FactsHolder facts;
private final AtomicInteger indexIncrement = new AtomicInteger(0);
private final long maxBytesPerRowForAggregators;
protected final int maxRowCount;
protected final long maxBytesInMemory;
private volatile Map<String, ColumnSelectorFactory> selectors;
private String outOfRowsReason = null;
@ -64,14 +71,44 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount
int maxRowCount,
long maxBytesInMemory
)
{
super(incrementalIndexSchema, deserializeComplexMetrics, reportParseExceptions, concurrentEventAdd);
this.maxRowCount = maxRowCount;
this.maxBytesInMemory = maxBytesInMemory == 0 ? -1 : maxBytesInMemory;
this.facts = incrementalIndexSchema.isRollup() ? new RollupFactsHolder(sortFacts, dimsComparator(), getDimensions())
: new PlainFactsHolder(sortFacts);
maxBytesPerRowForAggregators = getMaxBytesPerRowForAggregators(incrementalIndexSchema);
}
/**
* Gives the estimated max size per aggregator. It is assumed that every aggregator will have enough overhead for its own
* object header and for a pointer to a selector. We add an overhead factor of an additional 16 bytes per object.
* These 16 bytes (128 bits) are the object metadata for a 64-bit JVM process and consist of:
* <ul>
* <li>Class pointer which describes the object type: 64 bits
* <li>Flags which describe state of the object including hashcode: 64 bits
* </ul>
* The total size estimation consists of:
* <ul>
* <li> metrics length : Integer.BYTES * len
* <li> maxAggregatorIntermediateSize : getMaxIntermediateSize per aggregator + overhead-factor(16 bytes)
* </ul>
*
* @param incrementalIndexSchema
*
* @return long max aggregator size in bytes
*/
private static long getMaxBytesPerRowForAggregators(IncrementalIndexSchema incrementalIndexSchema)
{
long maxAggregatorIntermediateSize = Integer.BYTES * incrementalIndexSchema.getMetrics().length;
maxAggregatorIntermediateSize += Arrays.stream(incrementalIndexSchema.getMetrics())
.mapToLong(aggregator -> aggregator.getMaxIntermediateSize() + Long.BYTES * 2)
.sum();
return maxAggregatorIntermediateSize;
}
@Override
@ -109,6 +146,7 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@ -132,14 +170,20 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
concurrentSet(rowIndex, aggs);
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount
if ((numEntries.get() >= maxRowCount || (maxBytesInMemory > 0 && sizeInBytes.get() >= maxBytesInMemory))
&& facts.getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX
&& !skipMaxRowsInMemoryCheck) {
throw new IndexSizeExceededException("Maximum number of rows [%d] reached", maxRowCount);
throw new IndexSizeExceededException(
"Maximum number of rows [%d] or max size in bytes [%d] reached",
maxRowCount,
maxBytesInMemory
);
}
final int prev = facts.putIfAbsent(key, rowIndex);
if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet();
long estimatedRowSize = estimateRowSizeInBytes(key, maxBytesPerRowForAggregators);
sizeInBytes.addAndGet(estimatedRowSize);
} else {
// We lost a race
aggs = concurrentGet(prev);
@ -150,7 +194,25 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
}
}
return new AddToFactsResult(numEntries.get(), parseExceptionMessages);
return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), parseExceptionMessages);
}
/**
* Gives the estimated size of a row in bytes. It accounts for:
* <ul>
* <li> overhead per Map Entry
* <li> TimeAndDims key size
* <li> aggregator size
* </ul>
*
* @param key TimeAndDims key
* @param maxBytesPerRowForAggregators max size per aggregator
*
* @return estimated size of row
*/
private long estimateRowSizeInBytes(IncrementalIndexRow key, long maxBytesPerRowForAggregators)
{
return ROUGH_OVERHEAD_PER_MAP_ENTRY + key.estimateBytesInMemory() + maxBytesPerRowForAggregators;
}
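// Editor's sketch: for the rows added in AppenderatorTest further below, the estimate becomes
//   ROUGH_OVERHEAD_PER_MAP_ENTRY (Long.BYTES * 5 + Integer.BYTES) = 44
//   key.estimateBytesInMemory() (28 TimeAndDims overhead + 10 dimension keys) = 38
//   maxBytesPerRowForAggregators = 56
//   total = 138, the value asserted in testMaxBytesInMemory.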
@Override
@ -238,10 +300,24 @@ public class OnheapIncrementalIndex extends IncrementalIndex<Aggregator>
@Override
public boolean canAppendRow()
{
final boolean canAdd = size() < maxRowCount;
if (!canAdd) {
outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount);
final boolean countCheck = size() < maxRowCount;
// if maxBytesInMemory is -1 (disabled), the size check is ignored
final boolean sizeCheck = maxBytesInMemory <= 0 || getBytesInMemory() < maxBytesInMemory;
final boolean canAdd = countCheck && sizeCheck;
if (!countCheck && !sizeCheck) {
outOfRowsReason = StringUtils.format(
"Maximum number of rows [%d] and maximum size in bytes [%d] reached",
maxRowCount,
maxBytesInMemory
);
} else {
if (!countCheck) {
outOfRowsReason = StringUtils.format("Maximum number of rows [%d] reached", maxRowCount);
} else if (!sizeCheck) {
outOfRowsReason = StringUtils.format("Maximum size in bytes [%d] reached", maxBytesInMemory);
}
}
return canAdd;
}
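A minimal sketch (editor's addition, not part of the diff) of how a caller uses this dual check; every name
except canAppendRow() is a placeholder:
if (!index.canAppendRow()) {
  // either maxRowCount or, when maxBytesInMemory > 0, the byte limit has been reached
  persistAndCreateNewIndex(); // hypothetical helper: hand the full index off to be persisted
}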

View File

@ -0,0 +1,97 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.incremental;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.druid.data.input.MapBasedInputRow;
import io.druid.query.aggregation.CountAggregatorFactory;
import org.junit.Assert;
import org.junit.Test;
import java.util.Arrays;
import java.util.Map;
/**
*/
public class IncrementalIndexRowSizeTest
{
@Test
public void testIncrementalIndexRowSizeBasic()
{
IncrementalIndex index = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
.setMaxRowCount(10000)
.setMaxBytesInMemory(1000)
.buildOnheap();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(time, "billy", "A", "joe", "B"));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
Assert.assertEquals(44, td1.estimateBytesInMemory());
}
@Test
public void testIncrementalIndexRowSizeArr()
{
IncrementalIndex index = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
.setMaxRowCount(10000)
.setMaxBytesInMemory(1000)
.buildOnheap();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
time + 1,
"billy",
"A",
"joe",
Arrays.asList("A", "B")
));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
Assert.assertEquals(50, td1.estimateBytesInMemory());
}
@Test
public void testIncrementalIndexRowSizeComplex()
{
IncrementalIndex index = new IncrementalIndex.Builder()
.setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
.setMaxRowCount(10000)
.setMaxBytesInMemory(1000)
.buildOnheap();
long time = System.currentTimeMillis();
IncrementalIndex.IncrementalIndexRowResult tndResult = index.toIncrementalIndexRow(toMapRow(
time + 1,
"billy",
"nelson",
"joe",
Arrays.asList("123", "abcdef")
));
IncrementalIndexRow td1 = tndResult.getIncrementalIndexRow();
Assert.assertEquals(74, td1.estimateBytesInMemory());
}
private MapBasedInputRow toMapRow(long time, Object... dimAndVal)
{
Map<String, Object> data = Maps.newHashMap();
for (int i = 0; i < dimAndVal.length; i += 2) {
data.put((String) dimAndVal[i], dimAndVal[i + 1]);
}
return new MapBasedInputRow(time, Lists.newArrayList(data.keySet()), data);
}
}
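For reference, the byte-based limit can be exercised with the same builder calls used in the tests above
(an editor's sketch, reusing only APIs that appear elsewhere in this diff):
IncrementalIndex index = new IncrementalIndex.Builder()
    .setSimpleTestingIndexSchema(new CountAggregatorFactory("cnt"))
    .setMaxRowCount(10000)
    .setMaxBytesInMemory(1000) // a deliberately small byte budget
    .buildOnheap();
// once the estimated in-memory size reaches ~1000 bytes, canAppendRow() returns false
// even though maxRowCount is still far away, and the caller is expected to persist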

View File

@ -76,6 +76,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
/**
* Extending AbstractBenchmark means only runs if explicitly called
@ -118,7 +119,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
boolean reportParseExceptions,
boolean concurrentEventAdd,
boolean sortFacts,
int maxRowCount
int maxRowCount,
long maxBytesInMemory
)
{
super(
@ -127,7 +129,8 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
reportParseExceptions,
concurrentEventAdd,
sortFacts,
maxRowCount
maxRowCount,
maxBytesInMemory
);
}
@ -135,20 +138,22 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
long minTimestamp,
Granularity gran,
AggregatorFactory[] metrics,
int maxRowCount
int maxRowCount,
long maxBytesInMemory
)
{
super(
new IncrementalIndexSchema.Builder()
.withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
true,
false,
true,
maxRowCount
.withMinTimestamp(minTimestamp)
.withQueryGranularity(gran)
.withMetrics(metrics)
.build(),
true,
true,
false,
true,
maxRowCount,
maxBytesInMemory
);
}
@ -172,6 +177,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
boolean reportParseExceptions,
InputRow row,
AtomicInteger numEntries,
AtomicLong sizeInBytes,
IncrementalIndexRow key,
ThreadLocal<InputRow> rowContainer,
Supplier<InputRow> rowSupplier,
@ -202,12 +208,14 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
// Last ditch sanity checks
if (numEntries.get() >= maxRowCount && getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows reached");
if ((numEntries.get() >= maxRowCount || sizeInBytes.get() >= maxBytesInMemory)
&& getFacts().getPriorIndex(key) == IncrementalIndexRow.EMPTY_ROW_INDEX) {
throw new IndexSizeExceededException("Maximum number of rows or max bytes reached");
}
final int prev = getFacts().putIfAbsent(key, rowIndex);
if (IncrementalIndexRow.EMPTY_ROW_INDEX == prev) {
numEntries.incrementAndGet();
sizeInBytes.incrementAndGet();
} else {
// We lost a race
aggs = indexedMap.get(prev);
@ -235,7 +243,7 @@ public class OnheapIncrementalIndexBenchmark extends AbstractBenchmark
rowContainer.set(null);
return new AddToFactsResult(numEntries.get(), new ArrayList<>());
return new AddToFactsResult(numEntries.get(), sizeInBytes.get(), new ArrayList<>());
}
@Override

View File

@ -42,7 +42,7 @@ import java.io.File;
*/
public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
{
private static final int defaultMaxRowsInMemory = 75000;
private static final int defaultMaxRowsInMemory = TuningConfig.DEFAULT_MAX_ROWS_IN_MEMORY;
private static final Period defaultIntermediatePersistPeriod = new Period("PT10M");
private static final Period defaultWindowPeriod = new Period("PT10M");
private static final VersioningPolicy defaultVersioningPolicy = new IntervalStartVersioningPolicy();
@ -72,6 +72,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
{
return new RealtimeTuningConfig(
defaultMaxRowsInMemory,
0L,
defaultIntermediatePersistPeriod,
defaultWindowPeriod,
basePersistDirectory == null ? createNewBasePersistDirectory() : basePersistDirectory,
@ -91,6 +92,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
}
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final Period intermediatePersistPeriod;
private final Period windowPeriod;
private final File basePersistDirectory;
@ -110,6 +112,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
@JsonCreator
public RealtimeTuningConfig(
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@JsonProperty("intermediatePersistPeriod") Period intermediatePersistPeriod,
@JsonProperty("windowPeriod") Period windowPeriod,
@JsonProperty("basePersistDirectory") File basePersistDirectory,
@ -129,6 +132,9 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
// Initializing this to 0; it will be lazily resolved to an actual value.
// @see io.druid.segment.indexing.TuningConfigs#getMaxBytesInMemoryOrDefault(long)
this.maxBytesInMemory = maxBytesInMemory == null ? 0 : maxBytesInMemory;
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaultIntermediatePersistPeriod
: intermediatePersistPeriod;
@ -163,6 +169,12 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
return maxRowsInMemory;
}
@Override
public long getMaxBytesInMemory()
{
return maxBytesInMemory;
}
@Override
@JsonProperty
public Period getIntermediatePersistPeriod()
@ -268,6 +280,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
{
return new RealtimeTuningConfig(
maxRowsInMemory,
maxBytesInMemory,
intermediatePersistPeriod,
windowPeriod,
basePersistDirectory,
@ -290,6 +303,7 @@ public class RealtimeTuningConfig implements TuningConfig, AppenderatorConfig
{
return new RealtimeTuningConfig(
maxRowsInMemory,
maxBytesInMemory,
intermediatePersistPeriod,
windowPeriod,
dir,

View File

@ -33,4 +33,9 @@ public interface TuningConfig
boolean DEFAULT_LOG_PARSE_EXCEPTIONS = false;
int DEFAULT_MAX_PARSE_EXCEPTIONS = Integer.MAX_VALUE;
int DEFAULT_MAX_SAVED_PARSE_EXCEPTIONS = 0;
int DEFAULT_MAX_ROWS_IN_MEMORY = 1_000_000;
// We initially estimated this to be 1/3 of max JVM memory, but bytesCurrentlyInMemory only
// tracks the active index and not the index being flushed to disk. To account for that,
// the default is halved to 1/6 of max JVM memory.
long DEFAULT_MAX_BYTES_IN_MEMORY = Runtime.getRuntime().maxMemory() / 6;
}
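For example, on a task JVM started with -Xmx6g the default works out to roughly 1 GiB:
long defaultMaxBytes = Runtime.getRuntime().maxMemory() / 6; // ~1 GiB when maxMemory() reports ~6 GiB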

View File

@ -0,0 +1,34 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.segment.indexing;
public class TuningConfigs
{
private TuningConfigs()
{
}
public static long getMaxBytesInMemoryOrDefault(final long maxBytesInMemory)
{
// In the tuningConfig constructors, maxBytesInMemory is set to 0 when it is null, so that the default is not
// computed from the JVM of whichever process constructs the config first. Instead, the default is resolved
// here, based on the actual task node's JVM memory.
return maxBytesInMemory == 0 ? TuningConfig.DEFAULT_MAX_BYTES_IN_MEMORY : maxBytesInMemory;
}
}
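This is the resolution pattern the rest of the diff follows wherever a sink or incremental index is created
(editor's sketch):
// -1 disables the byte limit, 0 (i.e. unset) falls back to DEFAULT_MAX_BYTES_IN_MEMORY,
// and any positive value is used as-is
long effectiveMaxBytes = TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory());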

View File

@ -32,6 +32,8 @@ public interface AppenderatorConfig
int getMaxRowsInMemory();
long getMaxBytesInMemory();
int getMaxPendingPersists();
Period getIntermediatePersistPeriod();

View File

@ -66,6 +66,7 @@ import io.druid.segment.Segment;
import io.druid.segment.incremental.IncrementalIndexAddResult;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -84,6 +85,7 @@ import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@ -97,6 +99,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@ -128,6 +131,7 @@ public class AppenderatorImpl implements Appenderator
// This variable updated in add(), persist(), and drop()
private final AtomicInteger rowsCurrentlyInMemory = new AtomicInteger();
private final AtomicInteger totalRows = new AtomicInteger();
private final AtomicLong bytesCurrentlyInMemory = new AtomicLong();
// Synchronize persisting commitMetadata so that multiple persist threads (if present)
// and abandon threads do not step over each other
private final Lock commitLock = new ReentrantLock();
@ -143,7 +147,7 @@ public class AppenderatorImpl implements Appenderator
private volatile FileChannel basePersistDirLockChannel = null;
private AtomicBoolean closed = new AtomicBoolean(false);
public AppenderatorImpl(
AppenderatorImpl(
DataSchema schema,
AppenderatorConfig tuningConfig,
FireDepartmentMetrics metrics,
@ -219,11 +223,14 @@ public class AppenderatorImpl implements Appenderator
metrics.reportMessageMaxTimestamp(row.getTimestampFromEpoch());
final int sinkRowsInMemoryBeforeAdd = sink.getNumRowsInMemory();
final int sinkRowsInMemoryAfterAdd;
final long bytesInMemoryBeforeAdd = sink.getBytesInMemory();
final long bytesInMemoryAfterAdd;
final IncrementalIndexAddResult addResult;
try {
addResult = sink.add(row, !allowIncrementalPersists);
sinkRowsInMemoryAfterAdd = addResult.getRowCount();
bytesInMemoryAfterAdd = addResult.getBytesInMemory();
}
catch (IndexSizeExceededException e) {
// Uh oh, we can't do anything about this! We can't persist (commit metadata would be out of sync) and we
@ -240,19 +247,51 @@ public class AppenderatorImpl implements Appenderator
final int numAddedRows = sinkRowsInMemoryAfterAdd - sinkRowsInMemoryBeforeAdd;
rowsCurrentlyInMemory.addAndGet(numAddedRows);
totalRows.addAndGet(numAddedRows);
bytesCurrentlyInMemory.addAndGet(bytesInMemoryAfterAdd - bytesInMemoryBeforeAdd);
boolean isPersistRequired = false;
if (!sink.canAppendRow()
|| System.currentTimeMillis() > nextFlush
|| rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
boolean persist = false;
List<String> persistReasons = new ArrayList<>();
if (!sink.canAppendRow()) {
persist = true;
persistReasons.add("No more rows can be appended to sink");
}
if (System.currentTimeMillis() > nextFlush) {
persist = true;
persistReasons.add(StringUtils.format(
" current time[%d] is greater than nextFlush[%d],",
System.currentTimeMillis(),
nextFlush
));
}
if (rowsCurrentlyInMemory.get() >= tuningConfig.getMaxRowsInMemory()) {
persist = true;
persistReasons.add(StringUtils.format(
" rowsCurrentlyInMemory[%d] is greater than maxRowsInMemory[%d],",
rowsCurrentlyInMemory.get(),
tuningConfig.getMaxRowsInMemory()
));
}
if (tuningConfig.getMaxBytesInMemory() > 0
&& bytesCurrentlyInMemory.get()
>= TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())) {
persist = true;
persistReasons.add(StringUtils.format(
" bytesCurrentlyInMemory[%d] is greater than maxBytesInMemory[%d]",
bytesCurrentlyInMemory.get(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory())
));
}
if (persist) {
if (allowIncrementalPersists) {
// persistAll clears rowsCurrentlyInMemory, no need to update it.
log.info("Persisting rows in memory due to: [%s]", String.join(",", persistReasons));
persistAll(committerSupplier == null ? null : committerSupplier.get());
} else {
isPersistRequired = true;
}
}
return new AppenderatorAddResult(identifier, sink.getNumRows(), isPersistRequired, addResult.getParseException());
}
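With the format strings above, a persist triggered by the byte threshold would be logged along these lines
(values are illustrative only):
Persisting rows in memory due to: [ bytesCurrentlyInMemory[1048704] is greater than maxBytesInMemory[1048576]]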
@ -286,6 +325,24 @@ public class AppenderatorImpl implements Appenderator
return rowsCurrentlyInMemory.get();
}
@VisibleForTesting
long getBytesCurrentlyInMemory()
{
return bytesCurrentlyInMemory.get();
}
@VisibleForTesting
long getBytesInMemory(SegmentIdentifier identifier)
{
final Sink sink = sinks.get(identifier);
if (sink == null) {
throw new ISE("No such sink: %s", identifier);
} else {
return sink.getBytesInMemory();
}
}
private Sink getOrCreateSink(final SegmentIdentifier identifier)
{
Sink retVal = sinks.get(identifier);
@ -297,6 +354,7 @@ public class AppenderatorImpl implements Appenderator
identifier.getShardSpec(),
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
);
@ -397,6 +455,7 @@ public class AppenderatorImpl implements Appenderator
final Map<String, Integer> currentHydrants = Maps.newHashMap();
final List<Pair<FireHydrant, SegmentIdentifier>> indexesToPersist = Lists.newArrayList();
int numPersistedRows = 0;
long bytesPersisted = 0L;
for (SegmentIdentifier identifier : sinks.keySet()) {
final Sink sink = sinks.get(identifier);
if (sink == null) {
@ -405,6 +464,7 @@ public class AppenderatorImpl implements Appenderator
final List<FireHydrant> hydrants = Lists.newArrayList(sink);
currentHydrants.put(identifier.getIdentifierAsString(), hydrants.size());
numPersistedRows += sink.getNumRowsInMemory();
bytesPersisted += sink.getBytesInMemory();
final int limit = sink.isWritable() ? hydrants.size() - 1 : hydrants.size();
@ -495,7 +555,7 @@ public class AppenderatorImpl implements Appenderator
// NB: The rows are still in memory until they're done persisting, but we only count rows in active indexes.
rowsCurrentlyInMemory.addAndGet(-numPersistedRows);
bytesCurrentlyInMemory.addAndGet(-bytesPersisted);
return future;
}
@ -965,6 +1025,7 @@ public class AppenderatorImpl implements Appenderator
identifier.getShardSpec(),
identifier.getVersion(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions(),
hydrants
);
@ -1021,6 +1082,7 @@ public class AppenderatorImpl implements Appenderator
// Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
totalRows.addAndGet(-sink.getNumRows());
bytesCurrentlyInMemory.addAndGet(-sink.getBytesInMemory());
// Wait for any outstanding pushes to finish, then abandon the segment inside the persist thread.
return Futures.transform(

View File

@ -65,6 +65,7 @@ import io.druid.segment.incremental.IncrementalIndexAddResult;
import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.FireHydrant;
@ -255,6 +256,7 @@ public class RealtimePlumber implements Plumber
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
config.isReportParseExceptions()
);
addSink(retVal);
@ -729,6 +731,7 @@ public class RealtimePlumber implements Plumber
config.getShardSpec(),
versioningPolicy.getVersion(sinkInterval),
config.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(config.getMaxBytesInMemory()),
config.isReportParseExceptions(),
hydrants
);

View File

@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger;
public class Sink implements Iterable<FireHydrant>
{
private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, null);
private static final IncrementalIndexAddResult ADD_FAILED = new IncrementalIndexAddResult(-1, -1, null);
private final Object hydrantLock = new Object();
private final Interval interval;
@ -62,6 +62,7 @@ public class Sink implements Iterable<FireHydrant>
private final ShardSpec shardSpec;
private final String version;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final boolean reportParseExceptions;
private final CopyOnWriteArrayList<FireHydrant> hydrants = new CopyOnWriteArrayList<FireHydrant>();
private final LinkedHashSet<String> dimOrder = Sets.newLinkedHashSet();
@ -75,6 +76,7 @@ public class Sink implements Iterable<FireHydrant>
ShardSpec shardSpec,
String version,
int maxRowsInMemory,
long maxBytesInMemory,
boolean reportParseExceptions
)
{
@ -83,6 +85,7 @@ public class Sink implements Iterable<FireHydrant>
this.interval = interval;
this.version = version;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.reportParseExceptions = reportParseExceptions;
makeNewCurrIndex(interval.getStartMillis(), schema);
@ -94,6 +97,7 @@ public class Sink implements Iterable<FireHydrant>
ShardSpec shardSpec,
String version,
int maxRowsInMemory,
long maxBytesInMemory,
boolean reportParseExceptions,
List<FireHydrant> hydrants
)
@ -103,6 +107,7 @@ public class Sink implements Iterable<FireHydrant>
this.interval = interval;
this.version = version;
this.maxRowsInMemory = maxRowsInMemory;
this.maxBytesInMemory = maxBytesInMemory;
this.reportParseExceptions = reportParseExceptions;
int maxCount = -1;
@ -250,6 +255,18 @@ public class Sink implements Iterable<FireHydrant>
}
}
public long getBytesInMemory()
{
synchronized (hydrantLock) {
IncrementalIndex index = currHydrant.getIndex();
if (index == null) {
return 0;
}
return currHydrant.getIndex().getBytesInMemory();
}
}
private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
{
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
@ -264,6 +281,7 @@ public class Sink implements Iterable<FireHydrant>
.setIndexSchema(indexSchema)
.setReportParseExceptions(reportParseExceptions)
.setMaxRowCount(maxRowsInMemory)
.setMaxBytesInMemory(maxBytesInMemory)
.buildOnheap();
final FireHydrant old;

View File

@ -90,7 +90,7 @@ public class RealtimeTuningConfigTest
Assert.assertEquals(new Period("PT10M"), config.getIntermediatePersistPeriod());
Assert.assertEquals(NoneShardSpec.instance(), config.getShardSpec());
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(75000, config.getMaxRowsInMemory());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(0, config.getMergeThreadPriority());
Assert.assertEquals(0, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());

View File

@ -69,6 +69,7 @@ import io.druid.segment.incremental.IndexSizeExceededException;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.plumber.Plumber;
import io.druid.segment.realtime.plumber.PlumberSchool;
@ -199,6 +200,7 @@ public class RealtimeManagerTest
);
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
1,
null,
new Period("P1Y"),
null,
null,
@ -221,6 +223,7 @@ public class RealtimeManagerTest
tuningConfig.getShardSpec(),
DateTimes.nowUtc().toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
));
@ -241,6 +244,7 @@ public class RealtimeManagerTest
tuningConfig.getShardSpec(),
DateTimes.nowUtc().toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
));
@ -258,6 +262,7 @@ public class RealtimeManagerTest
tuningConfig_0 = new RealtimeTuningConfig(
1,
null,
new Period("P1Y"),
null,
null,
@ -277,6 +282,7 @@ public class RealtimeManagerTest
tuningConfig_1 = new RealtimeTuningConfig(
1,
null,
new Period("P1Y"),
null,
null,

View File

@ -68,6 +68,7 @@ public class AppenderatorPlumberTest
null,
null,
null,
null,
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
null,

View File

@ -145,6 +145,118 @@ public class AppenderatorTest
}
}
@Test
public void testMaxBytesInMemory() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138
Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(1)));
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
}
@Test
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(100, 1024, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
//expectedSizeInBytes = 44(map overhead) + 28 (TimeAndDims overhead) + 56 (aggregator metrics) + 10 (dimsKeySize) = 138
Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
Assert.assertEquals(276, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
}
@Test
public void testIgnoreMaxBytesInMemory() throws Exception
{
try (final AppenderatorTester tester = new AppenderatorTester(100, -1, true)) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
final Object metadata = ImmutableMap.of(eventCount, eventCount.get());
return new Committer()
{
@Override
public Object getMetadata()
{
return metadata;
}
@Override
public void run()
{
//Do nothing
}
};
};
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.startJob();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(0), IR("2000", "foo", 1), committerSupplier);
// We still calculate the size even when it is ignored for the persist decision
Assert.assertEquals(138, ((AppenderatorImpl) appenderator).getBytesInMemory(IDENTIFIERS.get(0)));
Assert.assertEquals(1, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.add(IDENTIFIERS.get(1), IR("2000", "bar", 1), committerSupplier);
Assert.assertEquals(276, ((AppenderatorImpl) appenderator).getBytesCurrentlyInMemory());
Assert.assertEquals(2, ((AppenderatorImpl) appenderator).getRowsInMemory());
appenderator.close();
Assert.assertEquals(0, ((AppenderatorImpl) appenderator).getRowsInMemory());
}
}
@Test
public void testMaxRowsInMemory() throws Exception
{
@ -288,7 +400,12 @@ public class AppenderatorTest
appenderator.add(IDENTIFIERS.get(0), IR("2000", "bob", 5), committerSupplier);
appenderator.close();
try (final AppenderatorTester tester2 = new AppenderatorTester(2, tuningConfig.getBasePersistDirectory(), true)) {
try (final AppenderatorTester tester2 = new AppenderatorTester(
2,
-1,
tuningConfig.getBasePersistDirectory(),
true
)) {
final Appenderator appenderator2 = tester2.getAppenderator();
Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());

View File

@ -89,7 +89,7 @@ public class AppenderatorTester implements AutoCloseable
final int maxRowsInMemory
)
{
this(maxRowsInMemory, null, false);
this(maxRowsInMemory, -1, null, false);
}
public AppenderatorTester(
@ -97,11 +97,21 @@ public class AppenderatorTester implements AutoCloseable
final boolean enablePushFailure
)
{
this(maxRowsInMemory, null, enablePushFailure);
this(maxRowsInMemory, -1, null, enablePushFailure);
}
public AppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
final boolean enablePushFailure
)
{
this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
}
public AppenderatorTester(
final int maxRowsInMemory,
long maxSizeInBytes,
final File basePersistDirectory,
final boolean enablePushFailure
)
@ -131,9 +141,10 @@ public class AppenderatorTester implements AutoCloseable
null,
objectMapper
);
maxSizeInBytes = maxSizeInBytes == 0L ? getDefaultMaxBytesInMemory() : maxSizeInBytes;
tuningConfig = new RealtimeTuningConfig(
maxRowsInMemory,
maxSizeInBytes,
null,
null,
basePersistDirectory,
@ -267,6 +278,11 @@ public class AppenderatorTester implements AutoCloseable
);
}
private long getDefaultMaxBytesInMemory()
{
return (Runtime.getRuntime().totalMemory()) / 3;
}
public DataSchema getSchema()
{
return schema;

View File

@ -136,6 +136,7 @@ public class DefaultOfflineAppenderatorFactoryTest
75000,
null,
null,
null,
temporaryFolder.newFolder(),
null,
null,

View File

@ -50,6 +50,7 @@ import io.druid.segment.ReferenceCountingSegment;
import io.druid.segment.TestHelper;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.loading.DataSegmentPusher;
import io.druid.segment.realtime.FireDepartmentMetrics;
@ -199,6 +200,7 @@ public class RealtimePlumberSchoolTest
null,
null,
null,
null,
new IntervalStartVersioningPolicy(),
rejectionPolicy,
null,
@ -269,6 +271,7 @@ public class RealtimePlumberSchoolTest
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
);
plumber.getSinks().put(0L, sink);
@ -313,6 +316,7 @@ public class RealtimePlumberSchoolTest
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
);
plumber.getSinks().put(0L, sink);
@ -367,6 +371,7 @@ public class RealtimePlumberSchoolTest
tuningConfig.getShardSpec(),
DateTimes.of("2014-12-01T12:34:56.789").toString(),
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
);
plumber2.getSinks().put(0L, sink);

View File

@ -31,6 +31,7 @@ import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.indexing.TuningConfigs;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import io.druid.segment.realtime.FireHydrant;
import org.joda.time.DateTime;
@ -61,6 +62,7 @@ public class SinkTest
final String version = DateTimes.nowUtc().toString();
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
100,
null,
new Period("P1Y"),
null,
null,
@ -83,6 +85,7 @@ public class SinkTest
tuningConfig.getShardSpec(),
version,
tuningConfig.getMaxRowsInMemory(),
TuningConfigs.getMaxBytesInMemoryOrDefault(tuningConfig.getMaxBytesInMemory()),
tuningConfig.isReportParseExceptions()
);

View File

@ -169,6 +169,7 @@ public class DruidJsonValidatorTest
new RealtimeTuningConfig(
1,
null,
new Period("PT10M"),
null,
null,