diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index 9b14ec767c2..eccfed0a589 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -101,9 +101,7 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec: "endpoint": "kinesis.us-east-1.amazonaws.com", "taskCount": 1, "replicas": 1, - "taskDuration": "PT1H", - "recordsPerFetch": 4000, - "fetchDelayMillis": 0 + "taskDuration": "PT1H" }, "tuningConfig": { "type": "kinesis", @@ -121,10 +119,10 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec: |`type`|The supervisor type; this should always be `kinesis`.|yes| |`spec`|Container object for the supervisor configuration.|yes| |`dataSchema`|The schema that will be used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema).|yes| -|`ioConfig`|A `KinesisSupervisorIOConfig` object for configuring Kafka connection and I/O-related settings for the supervisor and indexing task. See [KinesisSupervisorIOConfig](#kinesissupervisorioconfig) below.|yes| -|`tuningConfig`|A `KinesisSupervisorTuningConfig` object for configuring performance-related settings for the supervisor and indexing tasks. See [KinesisSupervisorTuningConfig](#kinesissupervisortuningconfig) below.|no| +|`ioConfig`|An [`ioConfig`](#ioconfig) object for configuring Kafka connection and I/O-related settings for the supervisor and indexing task.|yes| +|`tuningConfig`|A [`tuningConfig`](#tuningconfig) object for configuring performance-related settings for the supervisor and indexing tasks.|no| -### KinesisSupervisorIOConfig +### `ioConfig` |Field|Type|Description|Required| |-----|----|-----------|--------| @@ -140,7 +138,7 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec: |`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT6H)| |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)| |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*. Messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)| -|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (default == 4000)| +|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)| |`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (default == 0)| |`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no| |`awsExternalId`|String|The AWS external id to use for additional permissions.|no| @@ -243,9 +241,7 @@ The following example demonstrates a supervisor spec with `lagBased` autoScaler "endpoint": "kinesis.us-east-1.amazonaws.com", "taskCount": 1, "replicas": 1, - "taskDuration": "PT1H", - "recordsPerFetch": 4000, - "fetchDelayMillis": 0 + "taskDuration": "PT1H" }, "tuningConfig": { "type": "kinesis", @@ -271,7 +267,7 @@ For more information, see [Data formats](../../ingestion/data-formats.md). You c -### KinesisSupervisorTuningConfig +### `tuningConfig` The `tuningConfig` is optional. If no `tuningConfig` is specified, default parameters are used. @@ -296,16 +292,16 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param |`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.| no (default == 8)| |`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)| |`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)| -|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)| +|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)| |`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)| |`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)| -|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors on the server that the task is running on) | +|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors available to the task) | |`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)| |`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.| no (default == P2147483647D)| |`logParseExceptions`|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|no, default == false| |`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|no, unlimited default| |`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#task-reports). Overridden if `reportParseExceptions` is set.|no, default == 0| -|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`|no, default == 100| +|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)| |`repartitionTransitionDuration`|ISO8601 Period|When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600.|no, (default == PT2M)| |`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead.|no (default == PT30S, min == PT5S)| |`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can be used to prevent `LimitExceededException` during ingestion. Please note that the necessary `IAM` permissions must be set for this to work.|no (default == false)| @@ -593,16 +589,27 @@ There is also ongoing work to support automatic segment compaction of sharded se Hadoop (see [here](https://github.com/apache/druid/pull/5102)). ### Determining Fetch Settings -Internally, the Kinesis Indexing Service uses the Kinesis Record Supplier abstraction for fetching Kinesis data records and storing the records -locally. The way the Kinesis Record Supplier fetches records is to have a separate thread run the fetching operation per each Kinesis Shard, the -max number of threads is determined by `fetchThreads`. For example, a Kinesis stream with 3 shards will have 3 threads, each fetching from a shard separately. -There is a delay between each fetching operation, which is controlled by `fetchDelayMillis`. The maximum number of records to be fetched per thread per -operation is controlled by `recordsPerFetch`. Note that this is not the same as `maxRecordsPerPoll`. +Kinesis indexing tasks fetch records using `fetchThreads` threads. +If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused. +Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches +of `fetchDelayMillis`. +The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`. +The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once. -The records fetched by each thread will be pushed to a queue in the order that they are fetched. The records are stored in this queue until `poll()` is called -by either the supervisor or the indexing task. `poll()` will attempt to drain the internal buffer queue up to a limit of `max(maxRecordsPerPoll, q.size())`. -Here `maxRecordsPerPoll` controls the theoretical maximum records to drain out of the buffer queue, so setting this parameter to a reasonable value is essential -in preventing the queue from overflowing or memory exceeding heap size. +When using Kinesis Producer Library's aggregation feature (i.e. when [`deaggregate`](#deaggregation) is set), +each of these parameters refers to aggregated records rather than individual records. + +The default values for these parameters are: + +- `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task +is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that +particular server). +- `fetchDelayMillis`: 0 (no delay between fetches). +- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`. +For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation). +- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller. +For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation). +- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation). Kinesis places the following restrictions on calls to fetch records: @@ -611,25 +618,19 @@ Kinesis places the following restrictions on calls to fetch records: - Each shard can read up to 2 MB per second. - The maximum size of data that GetRecords can return is 10 MB. -Values for `recordsPerFetch` and `fetchDelayMillis` should be chosen to maximize throughput under the above constraints. -The values that you choose will depend on the average size of a record and the number of consumers you have reading from -a given shard (which will be `replicas` unless you have other consumers also reading from this Kinesis stream). +If the above limits are exceeded, Kinesis throws ProvisionedThroughputExceededException errors. If this happens, Druid +Kinesis tasks pause by `fetchDelayMillis` or 3 seconds, whichever is larger, and then attempt the call again. -If the above limits are violated, AWS will throw ProvisionedThroughputExceededException errors on subsequent calls to -read data. When this happens, the Kinesis indexing service will pause by `fetchDelayMillis` and then attempt the call -again. - -Internally, each indexing task maintains a buffer that stores the fetched but not yet processed record. `recordsPerFetch` and `fetchDelayMillis` -control this behavior. The number of records that the indexing task fetch from the buffer is controlled by `maxRecordsPerPoll`, which -determines the number of records to be processed per each ingestion loop in the task. +In most cases, the default settings for fetch parameters are sufficient to achieve good performance without excessive +memory usage. However, in some cases, you may need to adjust these parameters in order to more finely control fetch rate +and memory usage. Optimal values depend on the average size of a record and the number of consumers you have reading +from a given shard, which will be `replicas` unless you have other consumers also reading from this Kinesis stream. ## Deaggregation -See [issue](https://github.com/apache/druid/issues/6714) - The Kinesis indexing service supports de-aggregation of multiple rows packed into a single record by the Kinesis Producer Library's aggregate method for more efficient data transfer. -To enable this feature, set `deaggregate` to true when submitting a supervisor-spec. +To enable this feature, set `deaggregate` to true in your `ioConfig` when submitting a supervisor spec. ## Resharding diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java index a0b31f3116f..d8fbd6a36bb 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java @@ -27,19 +27,25 @@ import com.google.common.base.Preconditions; import com.google.inject.name.Named; import org.apache.druid.common.aws.AWSCredentialsConfig; import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.TaskToolbox; import org.apache.druid.indexing.common.task.TaskResource; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.utils.RuntimeInfo; import java.util.Map; public class KinesisIndexTask extends SeekableStreamIndexTask { private static final String TYPE = "index_kinesis"; + private static final Logger log = new Logger(KinesisIndexTask.class); private final boolean useListShards; private final AWSCredentialsConfig awsCredentialsConfig; + private RuntimeInfo runtimeInfo; @JsonCreator public KinesisIndexTask( @@ -66,6 +72,13 @@ public class KinesisIndexTask extends SeekableStreamIndexTask createTaskRunner() { @@ -84,14 +97,23 @@ public class KinesisIndexTask extends SeekableStreamIndexTask 0, - "Must have at least one background fetch thread for the record supplier" + log.info( + "Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], " + + "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].", + fetchThreads, + ioConfig.getFetchDelayMillis(), + recordsPerFetch, + recordBufferSize, + maxRecordsPerPoll, + ioConfig.isDeaggregate() ); + return new KinesisRecordSupplier( KinesisRecordSupplier.getAmazonKinesisClient( ioConfig.getEndpoint(), @@ -99,14 +121,14 @@ public class KinesisIndexTask extends SeekableStreamIndexTask 0, + "Must have at least one background fetch thread for the record supplier" + ); + + return fetchThreads; + } } diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java index cabc99334aa..0572c317006 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskIOConfig.java @@ -20,8 +20,10 @@ package org.apache.druid.indexing.kinesis; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; import org.apache.druid.data.input.InputFormat; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; @@ -33,12 +35,23 @@ import java.util.Set; public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig { - public static final int DEFAULT_RECORDS_PER_FETCH = 4000; public static final int DEFAULT_FETCH_DELAY_MILLIS = 0; + /** + * Together with {@link KinesisIndexTaskTuningConfig#MAX_RECORD_BUFFER_MEMORY}, don't take up more than 200MB + * per task. + */ + private static final int MAX_RECORD_FETCH_MEMORY = 100_000_000; + + /** + * Together with {@link KinesisIndexTaskTuningConfig#RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION}, don't take up more + * than 15% of the heap. + */ + private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05; + private final String endpoint; private final Integer recordsPerFetch; - private final Integer fetchDelayMillis; + private final int fetchDelayMillis; private final String awsAssumedRoleArn; private final String awsExternalId; @@ -92,7 +105,7 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig KinesisIndexTask.computeFetchThreads(runtimeInfo, 0) + ); + Assert.assertThrows( + IllegalArgumentException.class, + () -> KinesisIndexTask.computeFetchThreads(runtimeInfo, -1) + ); + } + private KinesisIndexTask createTask( int groupId, Map startSequenceNumbers, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java index 823c741e356..f283a7455f6 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java @@ -76,7 +76,8 @@ public class KinesisIndexTaskTuningConfigTest Assert.assertEquals(new IndexSpec(), config.getIndexSpec()); Assert.assertFalse(config.isReportParseExceptions()); Assert.assertEquals(0, config.getHandoffConditionTimeout()); - Assert.assertEquals(10000, config.getRecordBufferSize()); + Assert.assertNull(config.getRecordBufferSizeConfigured()); + Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false)); Assert.assertEquals(5000, config.getRecordBufferOfferTimeout()); Assert.assertEquals(5000, config.getRecordBufferFullWait()); Assert.assertNull(config.getFetchThreads()); @@ -123,7 +124,8 @@ public class KinesisIndexTaskTuningConfigTest Assert.assertEquals(100, config.getMaxPendingPersists()); Assert.assertTrue(config.isReportParseExceptions()); Assert.assertEquals(100, config.getHandoffConditionTimeout()); - Assert.assertEquals(1000, config.getRecordBufferSize()); + Assert.assertEquals(1000, (int) config.getRecordBufferSizeConfigured()); + Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000, false)); Assert.assertEquals(500, config.getRecordBufferOfferTimeout()); Assert.assertEquals(500, config.getRecordBufferFullWait()); Assert.assertEquals(2, (int) config.getFetchThreads()); @@ -186,8 +188,8 @@ public class KinesisIndexTaskTuningConfigTest Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait()); Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); - Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize()); - Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll()); + Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); + Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); } @Test @@ -244,8 +246,8 @@ public class KinesisIndexTaskTuningConfigTest Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions()); Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait()); Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout()); - Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize()); - Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll()); + Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured()); + Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured()); } @Test @@ -307,13 +309,13 @@ public class KinesisIndexTaskTuningConfigTest null, null, null, - null, + 10, null, null, null, null ); - KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig(); + KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig(); Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec()); Assert.assertEquals(1, copy.getMaxRowsInMemory()); @@ -326,13 +328,13 @@ public class KinesisIndexTaskTuningConfigTest Assert.assertEquals(new IndexSpec(), copy.getIndexSpec()); Assert.assertTrue(copy.isReportParseExceptions()); Assert.assertEquals(5L, copy.getHandoffConditionTimeout()); - Assert.assertEquals(1000, copy.getRecordBufferSize()); + Assert.assertEquals(1000, (int) copy.getRecordBufferSizeConfigured()); Assert.assertEquals(500, copy.getRecordBufferOfferTimeout()); Assert.assertEquals(500, copy.getRecordBufferFullWait()); Assert.assertEquals(2, (int) copy.getFetchThreads()); Assert.assertFalse(copy.isSkipSequenceNumberAvailabilityCheck()); Assert.assertTrue(copy.isResetOffsetAutomatically()); - Assert.assertEquals(100, copy.getMaxRecordsPerPoll()); + Assert.assertEquals(10, (int) copy.getMaxRecordsPerPollConfigured()); Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod()); } diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java index 10f866e1262..fabde1852aa 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfigTest.java @@ -70,8 +70,8 @@ public class KinesisSupervisorIOConfigTest Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent()); Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent()); Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent()); - Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch()); - Assert.assertEquals((Integer) 0, config.getFetchDelayMillis()); + Assert.assertNull(config.getRecordsPerFetch()); + Assert.assertEquals(0, config.getFetchDelayMillis()); Assert.assertNull(config.getAwsAssumedRoleArn()); Assert.assertNull(config.getAwsExternalId()); Assert.assertFalse(config.isDeaggregate()); @@ -118,7 +118,7 @@ public class KinesisSupervisorIOConfigTest Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get()); Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get()); Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch()); - Assert.assertEquals((Integer) 1000, config.getFetchDelayMillis()); + Assert.assertEquals(1000, config.getFetchDelayMillis()); Assert.assertEquals("role", config.getAwsAssumedRoleArn()); Assert.assertEquals("awsexternalid", config.getAwsExternalId()); Assert.assertTrue(config.isDeaggregate()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java index 2772cc5794a..70611266efe 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java @@ -112,7 +112,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu base.getHandoffConditionTimeout(), base.isResetOffsetAutomatically(), base.isSkipSequenceNumberAvailabilityCheck(), - base.getRecordBufferSize(), + base.getRecordBufferSizeConfigured(), base.getRecordBufferOfferTimeout(), base.getRecordBufferFullWait(), base.getFetchThreads(), @@ -120,7 +120,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu base.isLogParseExceptions(), base.getMaxParseExceptions(), base.getMaxSavedParseExceptions(), - base.getMaxRecordsPerPoll(), + base.getMaxRecordsPerPollConfigured(), base.getIntermediateHandoffPeriod() ); this.extra = extra; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/AdjustedRuntimeInfo.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/AdjustedRuntimeInfo.java new file mode 100644 index 00000000000..93f881337e0 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/AdjustedRuntimeInfo.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common; + +import org.apache.druid.utils.RuntimeInfo; + +/** + * Like {@link RuntimeInfo}, but adjusted based on the number of tasks running in a JVM, so each task gets its + * own processors and memory. Returned by {@link TaskToolbox#getAdjustedRuntimeInfo()}. + */ +public class AdjustedRuntimeInfo extends RuntimeInfo +{ + private final RuntimeInfo base; + private final int numTasksInJvm; + + public AdjustedRuntimeInfo(final RuntimeInfo base, final int numTasksInJvm) + { + this.base = base; + this.numTasksInJvm = numTasksInJvm; + } + + @Override + public int getAvailableProcessors() + { + return Math.max(1, base.getAvailableProcessors() / numTasksInJvm); + } + + @Override + public long getMaxHeapSizeBytes() + { + return base.getMaxHeapSizeBytes() / numTasksInJvm; + } + + @Override + public long getDirectMemorySizeBytes() + { + return base.getDirectMemorySizeBytes() / numTasksInJvm; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java index 03a4849e602..9715635962c 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/TaskToolbox.java @@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentMover; import org.apache.druid.segment.loading.DataSegmentPusher; import org.apache.druid.segment.loading.SegmentCacheManager; import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager; +import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; import org.apache.druid.server.DruidNode; import org.apache.druid.server.coordination.DataSegmentAnnouncer; @@ -63,6 +64,8 @@ import org.apache.druid.server.coordination.DataSegmentServerAnnouncer; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.tasklogs.TaskLogPusher; import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.JvmUtils; +import org.apache.druid.utils.RuntimeInfo; import org.joda.time.Interval; import javax.annotation.Nullable; @@ -466,6 +469,40 @@ public class TaskToolbox return attemptId; } + /** + * Get {@link RuntimeInfo} adjusted for this particular task. When running in a task JVM launched by a MiddleManager, + * this is the same as the baseline {@link RuntimeInfo}. When running in an Indexer, it is adjusted based on + * {@code druid.worker.capacity}. + */ + public RuntimeInfo getAdjustedRuntimeInfo() + { + return createAdjustedRuntimeInfo(JvmUtils.getRuntimeInfo(), appenderatorsManager); + } + + /** + * Create {@link AdjustedRuntimeInfo} based on the given {@link RuntimeInfo} and {@link AppenderatorsManager}. This + * is a way to allow code to properly apportion the amount of processors and heap available to the entire JVM. + * When running in an Indexer, other tasks share the same JVM, so this must be accounted for. + */ + public static RuntimeInfo createAdjustedRuntimeInfo( + final RuntimeInfo runtimeInfo, + final AppenderatorsManager appenderatorsManager + ) + { + if (appenderatorsManager instanceof UnifiedIndexerAppenderatorsManager) { + // CliIndexer. Each JVM runs multiple tasks; adjust. + return new AdjustedRuntimeInfo( + runtimeInfo, + ((UnifiedIndexerAppenderatorsManager) appenderatorsManager).getWorkerConfig().getCapacity() + ); + } else { + // CliPeon (assumed to be launched by CliMiddleManager). + // Each JVM runs a single task. ForkingTaskRunner sets XX:ActiveProcessorCount so each task already sees + // an adjusted number of processors from the baseline RuntimeInfo. So, we return it directly. + return runtimeInfo; + } + } + public static class Builder { private TaskConfig config; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java index cf6a7f4b03a..6526bb81b1e 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskIOConfig.java @@ -19,6 +19,7 @@ package org.apache.druid.indexing.seekablestream; +import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; @@ -107,12 +108,14 @@ public abstract class SeekableStreamIndexTaskIOConfig getMaximumMessageTime() { return maximumMessageTime; } @JsonProperty + @JsonInclude(JsonInclude.Include.NON_ABSENT) public Optional getMinimumMessageTime() { return minimumMessageTime; @@ -120,6 +123,7 @@ public abstract class SeekableStreamIndexTaskIOConfig