Kinesis: More robust default fetch settings. (#13539)

* Kinesis: More robust default fetch settings.

1) Default recordsPerFetch and recordBufferSize based on available memory
   rather than using hardcoded numbers. For this, we need an estimate
   of record size. Use 10 KB for regular records and 1 MB for aggregated
   records. With 1 GB heaps, 2 processors per task, and nonaggregated
   records, recordBufferSize comes out to the same as the old
   default (10000), and recordsPerFetch comes out slightly lower (1250
   instead of 4000).

2) Default maxRecordsPerPoll based on whether records are aggregated
   or not (100 if not aggregated, 1 if aggregated). Prior default was 100.

3) Default fetchThreads based on processors divided by task count on
   Indexers, rather than overall processor count.

4) Additionally clean up the serialized JSON a bit by adding various
   JsonInclude annotations.

* Updates for tests.

* Additional important verify.
Gian Merlino 2023-01-12 21:33:54 -08:00 committed by GitHub
parent b5b740bbbb
commit 182c4fad29
20 changed files with 437 additions and 100 deletions


@ -101,9 +101,7 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 4000,
"fetchDelayMillis": 0
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kinesis",
@ -121,10 +119,10 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
|`type`|The supervisor type; this should always be `kinesis`.|yes|
|`spec`|Container object for the supervisor configuration.|yes|
|`dataSchema`|The schema that will be used by the Kinesis indexing task during ingestion. See [`dataSchema`](../../ingestion/ingestion-spec.md#dataschema).|yes|
|`ioConfig`|A `KinesisSupervisorIOConfig` object for configuring Kafka connection and I/O-related settings for the supervisor and indexing task. See [KinesisSupervisorIOConfig](#kinesissupervisorioconfig) below.|yes|
|`tuningConfig`|A `KinesisSupervisorTuningConfig` object for configuring performance-related settings for the supervisor and indexing tasks. See [KinesisSupervisorTuningConfig](#kinesissupervisortuningconfig) below.|no|
|`ioConfig`|An [`ioConfig`](#ioconfig) object for configuring Kinesis connection and I/O-related settings for the supervisor and indexing task.|yes|
|`tuningConfig`|A [`tuningConfig`](#tuningconfig) object for configuring performance-related settings for the supervisor and indexing tasks.|no|
### KinesisSupervisorIOConfig
### `ioConfig`
|Field|Type|Description|Required|
|-----|----|-----------|--------|
@ -140,7 +138,7 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
|`completionTimeout`|ISO8601 Period|The length of time to wait before declaring a publishing task as failed and terminating it. If this is set too low, your tasks may never publish. The publishing clock for a task begins roughly after `taskDuration` elapses.|no (default == PT6H)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*. Messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting `earlyMessageRejectionPeriod` too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (default == 4000)|
|`recordsPerFetch`|Integer|The number of records to request per call to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)|
|`fetchDelayMillis`|Integer|Time in milliseconds to wait between subsequent calls to fetch records from Kinesis. See [Determining fetch settings](#determining-fetch-settings).|no (default == 0)|
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
|`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
@ -243,9 +241,7 @@ The following example demonstrates a supervisor spec with `lagBased` autoScaler
"endpoint": "kinesis.us-east-1.amazonaws.com",
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT1H",
"recordsPerFetch": 4000,
"fetchDelayMillis": 0
"taskDuration": "PT1H"
},
"tuningConfig": {
"type": "kinesis",
@ -271,7 +267,7 @@ For more information, see [Data formats](../../ingestion/data-formats.md). You c
<a name="tuningconfig"></a>
### KinesisSupervisorTuningConfig
### `tuningConfig`
The `tuningConfig` is optional. If no `tuningConfig` is specified, default parameters are used.
@ -296,16 +292,16 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`chatRetries`|Integer|The number of times HTTP requests to indexing tasks will be retried before considering tasks unresponsive.| no (default == 8)|
|`httpTimeout`|ISO8601 Period|How long to wait for a HTTP response from an indexing task.|no (default == PT10S)|
|`shutdownTimeout`|ISO8601 Period|How long to wait for the supervisor to attempt a graceful shutdown of tasks before exiting.|no (default == PT80S)|
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (default == 10000)|
|`recordBufferSize`|Integer|Size of the buffer (number of events) used between the Kinesis fetch threads and the main ingestion thread.|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)|
|`recordBufferOfferTimeout`|Integer|Length of time in milliseconds to wait for space to become available in the buffer before timing out.| no (default == 5000)|
|`recordBufferFullWait`|Integer|Length of time in milliseconds to wait for the buffer to drain before attempting to fetch records from Kinesis again.|no (default == 5000)|
|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors on the server that the task is running on) |
|`fetchThreads`|Integer|Size of the pool of threads fetching data from Kinesis. There is no benefit in having more threads than Kinesis shards.|no (default == procs * 2, where "procs" is the number of processors available to the task) |
|`segmentWriteOutMediumFactory`|Object|Segment write-out medium to use when creating segments. See below for more information.|no (not specified by default, the value from `druid.peon.defaultSegmentWriteOutMediumFactory.type` is used)|
|`intermediateHandoffPeriod`|ISO8601 Period|How often the tasks should hand off segments. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.| no (default == P2147483647D)|
|`logParseExceptions`|Boolean|If true, log an error message when a parsing exception occurs, containing information about the row where the error occurred.|no, default == false|
|`maxParseExceptions`|Integer|The maximum number of parse exceptions that can occur before the task halts ingestion and fails. Overridden if `reportParseExceptions` is set.|no, unlimited default|
|`maxSavedParseExceptions`|Integer|When a parse exception occurs, Druid can keep track of the most recent parse exceptions. "maxSavedParseExceptions" limits how many exception instances will be saved. These saved exceptions will be made available after the task finishes in the [task completion report](../../ingestion/tasks.md#task-reports). Overridden if `reportParseExceptions` is set.|no, default == 0|
|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`|no, default == 100|
|`maxRecordsPerPoll`|Integer|The maximum number of records/events to be fetched from buffer per poll. The actual maximum will be `Max(maxRecordsPerPoll, Max(bufferSize, 1))`|no (see [Determining fetch settings](#determining-fetch-settings) for defaults)|
|`repartitionTransitionDuration`|ISO8601 Period|When shards are split or merged, the supervisor will recompute shard -> task group mappings, and signal any running tasks created under the old mappings to stop early at (current time + `repartitionTransitionDuration`). Stopping the tasks early allows Druid to begin reading from the new shards more quickly. The repartition transition wait time controlled by this property gives the stream additional time to write records to the new shards after the split/merge, which helps avoid the issues with empty shard handling described at https://github.com/apache/druid/issues/7600.|no, (default == PT2M)|
|`offsetFetchPeriod`|ISO8601 Period|How often the supervisor queries Kinesis and the indexing tasks to fetch current offsets and calculate lag. If the user-specified value is below the minimum value (`PT5S`), the supervisor ignores the value and uses the minimum value instead.|no (default == PT30S, min == PT5S)|
|`useListShards`|Boolean|Indicates if `listShards` API of AWS Kinesis SDK can be used to prevent `LimitExceededException` during ingestion. Please note that the necessary `IAM` permissions must be set for this to work.|no (default == false)|
@ -593,16 +589,27 @@ There is also ongoing work to support automatic segment compaction of sharded se
Hadoop (see [here](https://github.com/apache/druid/pull/5102)).
### Determining Fetch Settings
Internally, the Kinesis Indexing Service uses the Kinesis Record Supplier abstraction to fetch Kinesis data records and store them
locally. The Kinesis Record Supplier runs a separate fetching thread per Kinesis shard; the maximum number of threads is determined by `fetchThreads`.
For example, a Kinesis stream with 3 shards will have 3 threads, each fetching from a shard separately.
There is a delay between each fetching operation, which is controlled by `fetchDelayMillis`. The maximum number of records fetched per thread per
operation is controlled by `recordsPerFetch`. Note that this is not the same as `maxRecordsPerPoll`.
Kinesis indexing tasks fetch records using `fetchThreads` threads.
If `fetchThreads` is higher than the number of Kinesis shards, the excess threads are unused.
Each fetch thread fetches up to `recordsPerFetch` records at once from a Kinesis shard, with a delay between fetches
of `fetchDelayMillis`.
The records fetched by each thread are pushed into a shared queue of size `recordBufferSize`.
The main runner thread for each task polls up to `maxRecordsPerPoll` records from the queue at once.
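To illustrate the hand-off described above, here is a minimal sketch of the producer/consumer pattern involved (this is not Druid's actual `KinesisRecordSupplier`; the class and method names are hypothetical). Fetch threads push into a bounded queue of capacity `recordBufferSize`, and the runner thread drains up to `maxRecordsPerPoll` records per poll:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: fetch threads push into a bounded queue sized by recordBufferSize;
// the main runner thread drains up to maxRecordsPerPoll records per ingestion loop.
class FetchBufferSketch
{
  private final BlockingQueue<byte[]> buffer;
  private final int maxRecordsPerPoll;

  FetchBufferSketch(int recordBufferSize, int maxRecordsPerPoll)
  {
    this.buffer = new ArrayBlockingQueue<>(recordBufferSize);
    this.maxRecordsPerPoll = maxRecordsPerPoll;
  }

  // Called by each fetch thread; blocks when the buffer is full, creating back-pressure.
  void push(List<byte[]> fetchedRecords) throws InterruptedException
  {
    for (byte[] record : fetchedRecords) {
      buffer.put(record);
    }
  }

  // Called by the main runner thread once per ingestion loop.
  List<byte[]> poll()
  {
    final List<byte[]> drained = new ArrayList<>(maxRecordsPerPoll);
    buffer.drainTo(drained, maxRecordsPerPoll);
    return drained;
  }
}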
The records fetched by each thread will be pushed to a queue in the order that they are fetched. The records are stored in this queue until `poll()` is called
by either the supervisor or the indexing task. `poll()` will attempt to drain the internal buffer queue up to a limit of `max(maxRecordsPerPoll, q.size())`.
Here `maxRecordsPerPoll` controls the theoretical maximum number of records to drain out of the buffer queue, so setting this parameter to a reasonable value is essential
to prevent the queue from overflowing or memory use from exceeding the heap size.
When using the Kinesis Producer Library's aggregation feature (i.e. when [`deaggregate`](#deaggregation) is set),
each of these parameters refers to aggregated records rather than individual records.
The default values for these parameters are (a worked example follows this list):
- `fetchThreads`: Twice the number of processors available to the task. The number of processors available to the task
is the total number of processors on the server, divided by `druid.worker.capacity` (the number of task slots on that
particular server).
- `fetchDelayMillis`: 0 (no delay between fetches).
- `recordsPerFetch`: 100 MB or an estimated 5% of available heap, whichever is smaller, divided by `fetchThreads`.
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `recordBufferSize`: 100 MB or an estimated 10% of available heap, whichever is smaller.
For estimation purposes, Druid uses a figure of 10 KB for regular records and 1 MB for [aggregated records](#deaggregation).
- `maxRecordsPerPoll`: 100 for regular records, 1 for [aggregated records](#deaggregation).
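As a back-of-the-envelope check of these rules, the following sketch (illustrative only, not part of the patch; the 1 GB heap and 4 fetch threads are assumptions) reproduces the non-aggregated defaults, arriving at `recordsPerFetch` = 1250 and `recordBufferSize` = 10000:

// Mirrors the memory-based defaults described above for a hypothetical configuration.
public class KinesisDefaultsSketch
{
  public static void main(String[] args)
  {
    final long heapBytes = 1_000_000_000L;  // assumed 1 GB task heap
    final int fetchThreads = 4;             // e.g. 2 processors * 2
    final int assumedRecordSize = 10_000;   // 10 KB for non-aggregated records

    // recordsPerFetch: min(100 MB, 5% of heap) / record size / fetchThreads
    final long fetchMemory = Math.min(100_000_000L, (long) (heapBytes * 0.05));
    final long recordsPerFetch = Math.max(1, fetchMemory / assumedRecordSize / fetchThreads);

    // recordBufferSize: min(100 MB, 10% of heap) / record size
    final long bufferMemory = Math.min(100_000_000L, (long) (heapBytes * 0.10));
    final long recordBufferSize = Math.max(1, bufferMemory / assumedRecordSize);

    System.out.println("recordsPerFetch = " + recordsPerFetch);    // 1250
    System.out.println("recordBufferSize = " + recordBufferSize);  // 10000
  }
}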
Kinesis places the following restrictions on calls to fetch records:
@ -611,25 +618,19 @@ Kinesis places the following restrictions on calls to fetch records:
- Each shard can read up to 2 MB per second.
- The maximum size of data that GetRecords can return is 10 MB.
Values for `recordsPerFetch` and `fetchDelayMillis` should be chosen to maximize throughput under the above constraints.
The values that you choose will depend on the average size of a record and the number of consumers you have reading from
a given shard (which will be `replicas` unless you have other consumers also reading from this Kinesis stream).
If the above limits are exceeded, Kinesis throws ProvisionedThroughputExceededException errors. If this happens, Druid
Kinesis tasks pause for `fetchDelayMillis` or 3 seconds, whichever is larger, and then attempt the call again.
If the above limits are violated, AWS will throw ProvisionedThroughputExceededException errors on subsequent calls to
read data. When this happens, the Kinesis indexing service will pause by `fetchDelayMillis` and then attempt the call
again.
Internally, each indexing task maintains a buffer that stores fetched but not yet processed records. `recordsPerFetch` and `fetchDelayMillis`
control this behavior. The number of records that the indexing task fetches from the buffer is controlled by `maxRecordsPerPoll`, which
determines the number of records processed per ingestion loop in the task.
In most cases, the default settings for fetch parameters are sufficient to achieve good performance without excessive
memory usage. However, in some cases, you may need to adjust these parameters in order to more finely control fetch rate
and memory usage. Optimal values depend on the average size of a record and the number of consumers you have reading
from a given shard, which will be `replicas` unless you have other consumers also reading from this Kinesis stream.
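For example (illustrative numbers only, not a recommendation): with a single consumer per shard and roughly 10 KB records, the 2 MB per second per-shard limit allows about 200 records per second, so a hypothetical `recordsPerFetch` of 1000 calls for roughly 5 seconds between fetches from the same shard, which also stays well under the 5 calls per second cap:

// Hypothetical sizing arithmetic; substitute your actual record size and consumer count.
public class KinesisThroughputSketch
{
  public static void main(String[] args)
  {
    final long perShardBytesPerSec = 2_000_000L;  // Kinesis limit: 2 MB/s per shard
    final long avgRecordBytes = 10_000L;          // assumed ~10 KB records
    final int consumersPerShard = 1;              // typically equal to `replicas`
    final long recordsPerFetch = 1_000L;          // hypothetical setting

    // Records per second one consumer can read from a shard without throttling.
    final long recordsPerSec = perShardBytesPerSec / consumersPerShard / avgRecordBytes;  // 200

    // Minimum delay between fetches from the same shard to stay under the byte limit.
    final long minFetchDelayMillis = recordsPerFetch * 1_000 / recordsPerSec;  // 5000

    System.out.println("records/sec per shard: " + recordsPerSec);
    System.out.println("minimum fetchDelayMillis: " + minFetchDelayMillis);
  }
}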
## Deaggregation
See [issue](https://github.com/apache/druid/issues/6714)
The Kinesis indexing service supports de-aggregation of multiple rows packed into a single record by the Kinesis
Producer Library's aggregate method for more efficient data transfer.
To enable this feature, set `deaggregate` to true when submitting a supervisor-spec.
To enable this feature, set `deaggregate` to true in your `ioConfig` when submitting a supervisor spec.
## Resharding


@ -27,19 +27,25 @@ import com.google.common.base.Preconditions;
import com.google.inject.name.Named;
import org.apache.druid.common.aws.AWSCredentialsConfig;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.TaskResource;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.utils.RuntimeInfo;
import java.util.Map;
public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity>
{
private static final String TYPE = "index_kinesis";
private static final Logger log = new Logger(KinesisIndexTask.class);
private final boolean useListShards;
private final AWSCredentialsConfig awsCredentialsConfig;
private RuntimeInfo runtimeInfo;
@JsonCreator
public KinesisIndexTask(
@ -66,6 +72,13 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
this.awsCredentialsConfig = awsCredentialsConfig;
}
@Override
public TaskStatus runTask(TaskToolbox toolbox)
{
this.runtimeInfo = toolbox.getAdjustedRuntimeInfo();
return super.runTask(toolbox);
}
@Override
protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner()
{
@ -84,14 +97,23 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
{
KinesisIndexTaskIOConfig ioConfig = ((KinesisIndexTaskIOConfig) super.ioConfig);
KinesisIndexTaskTuningConfig tuningConfig = ((KinesisIndexTaskTuningConfig) super.tuningConfig);
int fetchThreads = tuningConfig.getFetchThreads() != null
? tuningConfig.getFetchThreads()
: Runtime.getRuntime().availableProcessors() * 2;
final int fetchThreads = computeFetchThreads(runtimeInfo, tuningConfig.getFetchThreads());
final int recordsPerFetch = ioConfig.getRecordsPerFetchOrDefault(runtimeInfo.getMaxHeapSizeBytes(), fetchThreads);
final int recordBufferSize =
tuningConfig.getRecordBufferSizeOrDefault(runtimeInfo.getMaxHeapSizeBytes(), ioConfig.isDeaggregate());
final int maxRecordsPerPoll = tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate());
Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
log.info(
"Starting record supplier with fetchThreads [%d], fetchDelayMillis [%d], recordsPerFetch [%d], "
+ "recordBufferSize [%d], maxRecordsPerPoll [%d], deaggregate [%s].",
fetchThreads,
ioConfig.getFetchDelayMillis(),
recordsPerFetch,
recordBufferSize,
maxRecordsPerPoll,
ioConfig.isDeaggregate()
);
return new KinesisRecordSupplier(
KinesisRecordSupplier.getAmazonKinesisClient(
ioConfig.getEndpoint(),
@ -99,14 +121,14 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
ioConfig.getRecordsPerFetch(),
recordsPerFetch,
ioConfig.getFetchDelayMillis(),
fetchThreads,
ioConfig.isDeaggregate(),
tuningConfig.getRecordBufferSize(),
recordBufferSize,
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getMaxRecordsPerPoll(),
maxRecordsPerPoll,
false,
useListShards
);
@ -136,4 +158,22 @@ public class KinesisIndexTask extends SeekableStreamIndexTask<String, String, By
{
return awsCredentialsConfig;
}
@VisibleForTesting
static int computeFetchThreads(final RuntimeInfo runtimeInfo, final Integer configuredFetchThreads)
{
final int fetchThreads;
if (configuredFetchThreads != null) {
fetchThreads = configuredFetchThreads;
} else {
fetchThreads = runtimeInfo.getAvailableProcessors() * 2;
}
Preconditions.checkArgument(
fetchThreads > 0,
"Must have at least one background fetch thread for the record supplier"
);
return fetchThreads;
}
}


@ -20,8 +20,10 @@
package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@ -33,12 +35,23 @@ import java.util.Set;
public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<String, String>
{
public static final int DEFAULT_RECORDS_PER_FETCH = 4000;
public static final int DEFAULT_FETCH_DELAY_MILLIS = 0;
/**
* Together with {@link KinesisIndexTaskTuningConfig#MAX_RECORD_BUFFER_MEMORY}, don't take up more than 200MB
* per task.
*/
private static final int MAX_RECORD_FETCH_MEMORY = 100_000_000;
/**
* Together with {@link KinesisIndexTaskTuningConfig#RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION}, don't take up more
* than 15% of the heap.
*/
private static final double RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION = 0.05;
private final String endpoint;
private final Integer recordsPerFetch;
private final Integer fetchDelayMillis;
private final int fetchDelayMillis;
private final String awsAssumedRoleArn;
private final String awsExternalId;
@ -92,7 +105,7 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
);
this.endpoint = Preconditions.checkNotNull(endpoint, "endpoint");
this.recordsPerFetch = recordsPerFetch != null ? recordsPerFetch : DEFAULT_RECORDS_PER_FETCH;
this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis != null ? fetchDelayMillis : DEFAULT_FETCH_DELAY_MILLIS;
this.awsAssumedRoleArn = awsAssumedRoleArn;
this.awsExternalId = awsExternalId;
@ -202,31 +215,55 @@ public class KinesisIndexTaskIOConfig extends SeekableStreamIndexTaskIOConfig<St
return endpoint;
}
@JsonProperty
public int getRecordsPerFetch()
@Nullable
@JsonProperty("recordsPerFetch")
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getRecordsPerFetchConfigured()
{
return recordsPerFetch;
}
public int getRecordsPerFetchOrDefault(final long maxHeapSize, final int fetchThreads)
{
if (recordsPerFetch != null) {
return recordsPerFetch;
} else {
final long memoryToUse = Math.min(
MAX_RECORD_FETCH_MEMORY,
(long) (maxHeapSize * RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION)
);
final int assumedRecordSize = deaggregate
? KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE_AGGREGATE
: KinesisIndexTaskTuningConfig.ASSUMED_RECORD_SIZE;
return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize / fetchThreads));
}
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getFetchDelayMillis()
{
return fetchDelayMillis;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getAwsAssumedRoleArn()
{
return awsAssumedRoleArn;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getAwsExternalId()
{
return awsExternalId;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isDeaggregate()
{
return deaggregate;


@ -20,9 +20,11 @@
package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@ -36,16 +38,32 @@ import java.util.Objects;
@JsonTypeName("KinesisTuningConfig")
public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig
{
private static final int DEFAULT_RECORD_BUFFER_SIZE = 10000;
// Assumed record size is larger when dealing with aggregated messages, because aggregated messages tend to
// be larger, up to 1 MB in size.
static final int ASSUMED_RECORD_SIZE = 10_000;
static final int ASSUMED_RECORD_SIZE_AGGREGATE = 1_000_000;
/**
* Together with {@link KinesisIndexTaskIOConfig#MAX_RECORD_FETCH_MEMORY}, don't take up more than 200MB per task.
*/
private static final int MAX_RECORD_BUFFER_MEMORY = 100_000_000;
/**
* Together with {@link KinesisIndexTaskIOConfig#RECORD_FETCH_MEMORY_MAX_HEAP_FRACTION}, don't take up more
* than 15% of the heap per task.
*/
private static final double RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION = 0.1;
private static final int DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT = 5000;
private static final int DEFAULT_RECORD_BUFFER_FULL_WAIT = 5000;
private static final int DEFAULT_MAX_RECORDS_PER_POLL = 100;
private static final int DEFAULT_MAX_RECORDS_PER_POLL_AGGREGATE = 1;
private final int recordBufferSize;
private final Integer recordBufferSize;
private final int recordBufferOfferTimeout;
private final int recordBufferFullWait;
private final Integer fetchThreads;
private final int maxRecordsPerPoll;
private final Integer maxRecordsPerPoll;
public KinesisIndexTaskTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@ -97,13 +115,13 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
maxParseExceptions,
maxSavedParseExceptions
);
this.recordBufferSize = recordBufferSize == null ? DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
this.recordBufferSize = recordBufferSize;
this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
: recordBufferOfferTimeout;
this.recordBufferFullWait = recordBufferFullWait == null ? DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
this.fetchThreads = fetchThreads; // we handle this being null later
this.maxRecordsPerPoll = maxRecordsPerPoll == null ? DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
this.maxRecordsPerPoll = maxRecordsPerPoll;
Preconditions.checkArgument(
!(super.isResetOffsetAutomatically() && super.isSkipSequenceNumberAvailabilityCheck()),
@ -168,12 +186,29 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
);
}
@JsonProperty
public int getRecordBufferSize()
@Nullable
@JsonProperty("recordBufferSize")
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getRecordBufferSizeConfigured()
{
return recordBufferSize;
}
public int getRecordBufferSizeOrDefault(final long maxHeapSize, final boolean deaggregate)
{
if (recordBufferSize != null) {
return recordBufferSize;
} else {
final long memoryToUse = Math.min(
MAX_RECORD_BUFFER_MEMORY,
(long) (maxHeapSize * RECORD_BUFFER_MEMORY_MAX_HEAP_FRACTION)
);
final int assumedRecordSize = deaggregate ? ASSUMED_RECORD_SIZE_AGGREGATE : ASSUMED_RECORD_SIZE;
return Ints.checkedCast(Math.max(1, memoryToUse / assumedRecordSize));
}
}
@JsonProperty
public int getRecordBufferOfferTimeout()
{
@ -186,18 +221,27 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return recordBufferFullWait;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getFetchThreads()
{
return fetchThreads;
}
@JsonProperty
public int getMaxRecordsPerPoll()
@Nullable
@JsonProperty("maxRecordsPerPoll")
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getMaxRecordsPerPollConfigured()
{
return maxRecordsPerPoll;
}
public int getMaxRecordsPerPollOrDefault(final boolean deaggregate)
{
return deaggregate ? DEFAULT_MAX_RECORDS_PER_POLL_AGGREGATE : DEFAULT_MAX_RECORDS_PER_POLL;
}
@Override
public KinesisIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
@ -217,7 +261,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
getHandoffConditionTimeout(),
isResetOffsetAutomatically(),
isSkipSequenceNumberAvailabilityCheck(),
getRecordBufferSize(),
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
getFetchThreads(),
@ -225,7 +269,7 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxRecordsPerPoll(),
getMaxRecordsPerPollConfigured(),
getIntermediateHandoffPeriod()
);
}
@ -243,10 +287,10 @@ public class KinesisIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningC
return false;
}
KinesisIndexTaskTuningConfig that = (KinesisIndexTaskTuningConfig) o;
return recordBufferSize == that.recordBufferSize &&
return Objects.equals(recordBufferSize, that.recordBufferSize) &&
recordBufferOfferTimeout == that.recordBufferOfferTimeout &&
recordBufferFullWait == that.recordBufferFullWait &&
maxRecordsPerPoll == that.maxRecordsPerPoll &&
Objects.equals(maxRecordsPerPoll, that.maxRecordsPerPoll) &&
Objects.equals(fetchThreads, that.fetchThreads);
}


@ -35,6 +35,8 @@ import javax.annotation.Nullable;
public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
{
private static final int DEFAULT_RECORDS_PER_FETCH = 100;
private final AWSCredentialsConfig awsCredentialsConfig;
@JsonCreator
@ -63,14 +65,14 @@ public class KinesisSamplerSpec extends SeekableStreamSamplerSpec
ioConfig.getAwsAssumedRoleArn(),
ioConfig.getAwsExternalId()
),
ioConfig.getRecordsPerFetch(),
ioConfig.getRecordsPerFetch() != null ? ioConfig.getRecordsPerFetch() : DEFAULT_RECORDS_PER_FETCH,
ioConfig.getFetchDelayMillis(),
1,
ioConfig.isDeaggregate(),
tuningConfig.getRecordBufferSize(),
tuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()),
tuningConfig.getRecordBufferOfferTimeout(),
tuningConfig.getRecordBufferFullWait(),
tuningConfig.getMaxRecordsPerPoll(),
tuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()),
ioConfig.isUseEarliestSequenceNumber(),
tuningConfig.isUseListShards()
);


@ -201,10 +201,10 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
ioConfig.getFetchDelayMillis(),
0, // skip starting background fetch, it is not used
ioConfig.isDeaggregate(),
taskTuningConfig.getRecordBufferSize(),
taskTuningConfig.getRecordBufferSizeOrDefault(Runtime.getRuntime().maxMemory(), ioConfig.isDeaggregate()),
taskTuningConfig.getRecordBufferOfferTimeout(),
taskTuningConfig.getRecordBufferFullWait(),
taskTuningConfig.getMaxRecordsPerPoll(),
taskTuningConfig.getMaxRecordsPerPollOrDefault(ioConfig.isDeaggregate()),
ioConfig.isUseEarliestSequenceNumber(),
spec.getSpec().getTuningConfig().isUseListShards()
);


@ -20,6 +20,7 @@
package org.apache.druid.indexing.kinesis.supervisor;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
@ -48,7 +49,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
// exception. For this reason, we recommend that you wait one second between calls to GetRecords; however, it's
// possible that the application will get exceptions for longer than 1 second.
private final Integer recordsPerFetch;
private final Integer fetchDelayMillis;
private final int fetchDelayMillis;
private final String awsAssumedRoleArn;
private final String awsExternalId;
@ -98,9 +99,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
this.endpoint = endpoint != null
? endpoint
: (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
this.recordsPerFetch = recordsPerFetch != null
? recordsPerFetch
: KinesisIndexTaskIOConfig.DEFAULT_RECORDS_PER_FETCH;
this.recordsPerFetch = recordsPerFetch;
this.fetchDelayMillis = fetchDelayMillis != null
? fetchDelayMillis
: KinesisIndexTaskIOConfig.DEFAULT_FETCH_DELAY_MILLIS;
@ -115,31 +114,37 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
return endpoint;
}
@Nullable
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public Integer getRecordsPerFetch()
{
return recordsPerFetch;
}
@JsonProperty
public Integer getFetchDelayMillis()
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public int getFetchDelayMillis()
{
return fetchDelayMillis;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getAwsAssumedRoleArn()
{
return awsAssumedRoleArn;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_NULL)
public String getAwsExternalId()
{
return awsExternalId;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public boolean isDeaggregate()
{
return deaggregate;


@ -259,7 +259,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", chatRetries=" + chatRetries +
", httpTimeout=" + httpTimeout +
", shutdownTimeout=" + shutdownTimeout +
", recordBufferSize=" + getRecordBufferSize() +
", recordBufferSize=" + getRecordBufferSizeConfigured() +
", recordBufferOfferTimeout=" + getRecordBufferOfferTimeout() +
", recordBufferFullWait=" + getRecordBufferFullWait() +
", fetchThreads=" + getFetchThreads() +
@ -267,7 +267,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
", logParseExceptions=" + isLogParseExceptions() +
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", maxRecordsPerPoll=" + getMaxRecordsPerPoll() +
", maxRecordsPerPoll=" + getMaxRecordsPerPollConfigured() +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
", repartitionTransitionDuration=" + getRepartitionTransitionDuration() +
", useListShards=" + isUseListShards() +
@ -293,7 +293,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
getHandoffConditionTimeout(),
isResetOffsetAutomatically(),
isSkipSequenceNumberAvailabilityCheck(),
getRecordBufferSize(),
getRecordBufferSizeConfigured(),
getRecordBufferOfferTimeout(),
getRecordBufferFullWait(),
getFetchThreads(),
@ -301,7 +301,7 @@ public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions(),
getMaxRecordsPerPoll(),
getMaxRecordsPerPollConfigured(),
getIntermediateHandoffPeriod()
);
}


@ -93,7 +93,8 @@ public class KinesisIOConfigTest
Assert.assertTrue(config.isUseTransaction());
Assert.assertFalse("minimumMessageTime", config.getMinimumMessageTime().isPresent());
Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-1.amazonaws.com");
Assert.assertEquals(config.getRecordsPerFetch(), 4000);
Assert.assertNull(config.getRecordsPerFetchConfigured());
Assert.assertEquals(config.getRecordsPerFetchOrDefault(1_000_000_000, 4), 1250);
Assert.assertEquals(config.getFetchDelayMillis(), 0);
Assert.assertEquals(Collections.emptySet(), config.getStartSequenceNumbers().getExclusivePartitions());
Assert.assertNull(config.getAwsAssumedRoleArn());
@ -149,7 +150,7 @@ public class KinesisIOConfigTest
Assert.assertEquals(DateTimes.of("2016-05-31T14:00Z"), config.getMaximumMessageTime().get());
Assert.assertEquals(config.getEndpoint(), "kinesis.us-east-2.amazonaws.com");
Assert.assertEquals(config.getStartSequenceNumbers().getExclusivePartitions(), ImmutableSet.of("0"));
Assert.assertEquals(1000, config.getRecordsPerFetch());
Assert.assertEquals(1000, (int) config.getRecordsPerFetchConfigured());
Assert.assertEquals(1000, config.getFetchDelayMillis());
Assert.assertEquals("role", config.getAwsAssumedRoleArn());
Assert.assertEquals("awsexternalid", config.getAwsExternalId());
@ -301,7 +302,7 @@ public class KinesisIOConfigTest
Assert.assertEquals(currentConfig.getMinimumMessageTime(), oldConfig.getMinimumMessageTime());
Assert.assertEquals(currentConfig.getMaximumMessageTime(), oldConfig.getMaximumMessageTime());
Assert.assertEquals(currentConfig.getEndpoint(), oldConfig.getEndpoint());
Assert.assertEquals(currentConfig.getRecordsPerFetch(), oldConfig.getRecordsPerFetch());
Assert.assertEquals((int) currentConfig.getRecordsPerFetchConfigured(), oldConfig.getRecordsPerFetch());
Assert.assertEquals(currentConfig.getFetchDelayMillis(), oldConfig.getFetchDelayMillis());
Assert.assertEquals(currentConfig.getAwsAssumedRoleArn(), oldConfig.getAwsAssumedRoleArn());
Assert.assertEquals(currentConfig.getAwsExternalId(), oldConfig.getAwsExternalId());
@ -348,7 +349,7 @@ public class KinesisIOConfigTest
Assert.assertEquals(oldConfig.getMinimumMessageTime(), currentConfig.getMinimumMessageTime());
Assert.assertEquals(oldConfig.getMaximumMessageTime(), currentConfig.getMaximumMessageTime());
Assert.assertEquals(oldConfig.getEndpoint(), currentConfig.getEndpoint());
Assert.assertEquals(oldConfig.getRecordsPerFetch(), currentConfig.getRecordsPerFetch());
Assert.assertEquals(oldConfig.getRecordsPerFetch(), (int) currentConfig.getRecordsPerFetchConfigured());
Assert.assertEquals(oldConfig.getFetchDelayMillis(), currentConfig.getFetchDelayMillis());
Assert.assertEquals(oldConfig.getAwsAssumedRoleArn(), currentConfig.getAwsAssumedRoleArn());
Assert.assertEquals(oldConfig.getAwsExternalId(), currentConfig.getAwsExternalId());


@ -69,6 +69,7 @@ import org.apache.druid.java.util.emitter.core.NoopEmitter;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.SegmentDescriptor;
import org.apache.druid.query.filter.SelectorDimFilter;
@ -2243,6 +2244,24 @@ public class KinesisIndexTaskTest extends SeekableStreamIndexTaskTestBase
Assert.assertNull(newDataSchemaMetadata());
}
@Test
public void testComputeFetchThreads()
{
final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo =
new DruidProcessingConfigTest.MockRuntimeInfo(3, 1000, 2000);
Assert.assertEquals(6, KinesisIndexTask.computeFetchThreads(runtimeInfo, null));
Assert.assertEquals(2, KinesisIndexTask.computeFetchThreads(runtimeInfo, 2));
Assert.assertThrows(
IllegalArgumentException.class,
() -> KinesisIndexTask.computeFetchThreads(runtimeInfo, 0)
);
Assert.assertThrows(
IllegalArgumentException.class,
() -> KinesisIndexTask.computeFetchThreads(runtimeInfo, -1)
);
}
private KinesisIndexTask createTask(
int groupId,
Map<String, String> startSequenceNumbers,


@ -76,7 +76,8 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(new IndexSpec(), config.getIndexSpec());
Assert.assertFalse(config.isReportParseExceptions());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(10000, config.getRecordBufferSize());
Assert.assertNull(config.getRecordBufferSizeConfigured());
Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
Assert.assertEquals(5000, config.getRecordBufferFullWait());
Assert.assertNull(config.getFetchThreads());
@ -123,7 +124,8 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(100, config.getMaxPendingPersists());
Assert.assertTrue(config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(1000, config.getRecordBufferSize());
Assert.assertEquals(1000, (int) config.getRecordBufferSizeConfigured());
Assert.assertEquals(1000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
Assert.assertEquals(500, config.getRecordBufferOfferTimeout());
Assert.assertEquals(500, config.getRecordBufferFullWait());
Assert.assertEquals(2, (int) config.getFetchThreads());
@ -186,8 +188,8 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured());
Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured());
}
@Test
@ -244,8 +246,8 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getRecordBufferFullWait(), deserialized.getRecordBufferFullWait());
Assert.assertEquals(base.getRecordBufferOfferTimeout(), deserialized.getRecordBufferOfferTimeout());
Assert.assertEquals(base.getRecordBufferSize(), deserialized.getRecordBufferSize());
Assert.assertEquals(base.getMaxRecordsPerPoll(), deserialized.getMaxRecordsPerPoll());
Assert.assertEquals(base.getRecordBufferSizeConfigured(), deserialized.getRecordBufferSizeConfigured());
Assert.assertEquals(base.getMaxRecordsPerPollConfigured(), deserialized.getMaxRecordsPerPollConfigured());
}
@Test
@ -307,13 +309,13 @@ public class KinesisIndexTaskTuningConfigTest
null,
null,
null,
null,
10,
null,
null,
null,
null
);
KinesisIndexTaskTuningConfig copy = (KinesisIndexTaskTuningConfig) original.convertToTaskTuningConfig();
KinesisIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
Assert.assertEquals(original.getAppendableIndexSpec(), copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
@ -326,13 +328,13 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertTrue(copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
Assert.assertEquals(1000, copy.getRecordBufferSize());
Assert.assertEquals(1000, (int) copy.getRecordBufferSizeConfigured());
Assert.assertEquals(500, copy.getRecordBufferOfferTimeout());
Assert.assertEquals(500, copy.getRecordBufferFullWait());
Assert.assertEquals(2, (int) copy.getFetchThreads());
Assert.assertFalse(copy.isSkipSequenceNumberAvailabilityCheck());
Assert.assertTrue(copy.isResetOffsetAutomatically());
Assert.assertEquals(100, copy.getMaxRecordsPerPoll());
Assert.assertEquals(10, (int) copy.getMaxRecordsPerPollConfigured());
Assert.assertEquals(new Period().withDays(Integer.MAX_VALUE), copy.getIntermediateHandoffPeriod());
}


@ -70,8 +70,8 @@ public class KinesisSupervisorIOConfigTest
Assert.assertFalse("lateMessageRejectionPeriod", config.getLateMessageRejectionPeriod().isPresent());
Assert.assertFalse("earlyMessageRejectionPeriod", config.getEarlyMessageRejectionPeriod().isPresent());
Assert.assertFalse("lateMessageRejectionStartDateTime", config.getLateMessageRejectionStartDateTime().isPresent());
Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch());
Assert.assertEquals((Integer) 0, config.getFetchDelayMillis());
Assert.assertNull(config.getRecordsPerFetch());
Assert.assertEquals(0, config.getFetchDelayMillis());
Assert.assertNull(config.getAwsAssumedRoleArn());
Assert.assertNull(config.getAwsExternalId());
Assert.assertFalse(config.isDeaggregate());
@ -118,7 +118,7 @@ public class KinesisSupervisorIOConfigTest
Assert.assertEquals(Duration.standardHours(1), config.getLateMessageRejectionPeriod().get());
Assert.assertEquals(Duration.standardHours(1), config.getEarlyMessageRejectionPeriod().get());
Assert.assertEquals((Integer) 4000, config.getRecordsPerFetch());
Assert.assertEquals((Integer) 1000, config.getFetchDelayMillis());
Assert.assertEquals(1000, config.getFetchDelayMillis());
Assert.assertEquals("role", config.getAwsAssumedRoleArn());
Assert.assertEquals("awsexternalid", config.getAwsExternalId());
Assert.assertTrue(config.isDeaggregate());


@ -112,7 +112,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
base.getHandoffConditionTimeout(),
base.isResetOffsetAutomatically(),
base.isSkipSequenceNumberAvailabilityCheck(),
base.getRecordBufferSize(),
base.getRecordBufferSizeConfigured(),
base.getRecordBufferOfferTimeout(),
base.getRecordBufferFullWait(),
base.getFetchThreads(),
@ -120,7 +120,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends KinesisIndexTaskTu
base.isLogParseExceptions(),
base.getMaxParseExceptions(),
base.getMaxSavedParseExceptions(),
base.getMaxRecordsPerPoll(),
base.getMaxRecordsPerPollConfigured(),
base.getIntermediateHandoffPeriod()
);
this.extra = extra;


@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common;
import org.apache.druid.utils.RuntimeInfo;
/**
* Like {@link RuntimeInfo}, but adjusted based on the number of tasks running in a JVM, so each task gets its
* own processors and memory. Returned by {@link TaskToolbox#getAdjustedRuntimeInfo()}.
*/
public class AdjustedRuntimeInfo extends RuntimeInfo
{
private final RuntimeInfo base;
private final int numTasksInJvm;
public AdjustedRuntimeInfo(final RuntimeInfo base, final int numTasksInJvm)
{
this.base = base;
this.numTasksInJvm = numTasksInJvm;
}
@Override
public int getAvailableProcessors()
{
return Math.max(1, base.getAvailableProcessors() / numTasksInJvm);
}
@Override
public long getMaxHeapSizeBytes()
{
return base.getMaxHeapSizeBytes() / numTasksInJvm;
}
@Override
public long getDirectMemorySizeBytes()
{
return base.getDirectMemorySizeBytes() / numTasksInJvm;
}
}


@ -56,6 +56,7 @@ import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
@ -63,6 +64,8 @@ import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.tasklogs.TaskLogPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.utils.JvmUtils;
import org.apache.druid.utils.RuntimeInfo;
import org.joda.time.Interval;
import javax.annotation.Nullable;
@ -466,6 +469,40 @@ public class TaskToolbox
return attemptId;
}
/**
* Get {@link RuntimeInfo} adjusted for this particular task. When running in a task JVM launched by a MiddleManager,
* this is the same as the baseline {@link RuntimeInfo}. When running in an Indexer, it is adjusted based on
* {@code druid.worker.capacity}.
*/
public RuntimeInfo getAdjustedRuntimeInfo()
{
return createAdjustedRuntimeInfo(JvmUtils.getRuntimeInfo(), appenderatorsManager);
}
/**
* Create {@link AdjustedRuntimeInfo} based on the given {@link RuntimeInfo} and {@link AppenderatorsManager}. This
* is a way to allow code to properly apportion the amount of processors and heap available to the entire JVM.
* When running in an Indexer, other tasks share the same JVM, so this must be accounted for.
*/
public static RuntimeInfo createAdjustedRuntimeInfo(
final RuntimeInfo runtimeInfo,
final AppenderatorsManager appenderatorsManager
)
{
if (appenderatorsManager instanceof UnifiedIndexerAppenderatorsManager) {
// CliIndexer. Each JVM runs multiple tasks; adjust.
return new AdjustedRuntimeInfo(
runtimeInfo,
((UnifiedIndexerAppenderatorsManager) appenderatorsManager).getWorkerConfig().getCapacity()
);
} else {
// CliPeon (assumed to be launched by CliMiddleManager).
// Each JVM runs a single task. ForkingTaskRunner sets XX:ActiveProcessorCount so each task already sees
// an adjusted number of processors from the baseline RuntimeInfo. So, we return it directly.
return runtimeInfo;
}
}
public static class Builder
{
private TaskConfig config;


@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
@ -107,12 +108,14 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_ABSENT)
public Optional<DateTime> getMaximumMessageTime()
{
return maximumMessageTime;
}
@JsonProperty
@JsonInclude(JsonInclude.Include.NON_ABSENT)
public Optional<DateTime> getMinimumMessageTime()
{
return minimumMessageTime;
@ -120,6 +123,7 @@ public abstract class SeekableStreamIndexTaskIOConfig<PartitionIdType, SequenceO
@Nullable
@JsonProperty("inputFormat")
@JsonInclude(JsonInclude.Include.NON_NULL)
private InputFormat getGivenInputFormat()
{
return inputFormat;


@ -31,8 +31,10 @@ import org.apache.druid.indexing.common.task.NoopTestTaskReportFileWriter;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.common.task.Tasks;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.indexing.worker.config.WorkerConfig;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.MonitorScheduler;
import org.apache.druid.query.DruidProcessingConfigTest;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.segment.IndexIO;
@ -45,17 +47,21 @@ import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.segment.loading.SegmentLocalCacheManager;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
import org.apache.druid.segment.realtime.firehose.NoopChatHandlerProvider;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.apache.druid.server.coordination.DataSegmentServerAnnouncer;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.utils.RuntimeInfo;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import java.io.IOException;
@ -171,7 +177,10 @@ public class TaskToolboxTest
@Test
public void testGetQueryRunnerFactoryConglomerate()
{
Assert.assertEquals(mockQueryRunnerFactoryConglomerate, taskToolbox.build(task).getQueryRunnerFactoryConglomerate());
Assert.assertEquals(
mockQueryRunnerFactoryConglomerate,
taskToolbox.build(task).getQueryRunnerFactoryConglomerate()
);
}
@Test
@ -221,4 +230,72 @@ public class TaskToolboxTest
{
Assert.assertEquals(mockCacheConfig, taskToolbox.build(task).getCacheConfig());
}
@Test
public void testCreateAdjustedRuntimeInfoForMiddleManager()
{
final AppenderatorsManager appenderatorsManager = Mockito.mock(AppenderatorsManager.class);
final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo =
new DruidProcessingConfigTest.MockRuntimeInfo(12, 1_000_000, 2_000_000);
final RuntimeInfo adjustedRuntimeInfo = TaskToolbox.createAdjustedRuntimeInfo(runtimeInfo, appenderatorsManager);
Assert.assertEquals(
runtimeInfo.getAvailableProcessors(),
adjustedRuntimeInfo.getAvailableProcessors()
);
Assert.assertEquals(
runtimeInfo.getMaxHeapSizeBytes(),
adjustedRuntimeInfo.getMaxHeapSizeBytes()
);
Assert.assertEquals(
runtimeInfo.getDirectMemorySizeBytes(),
adjustedRuntimeInfo.getDirectMemorySizeBytes()
);
Mockito.verifyNoMoreInteractions(appenderatorsManager);
}
@Test
public void testCreateAdjustedRuntimeInfoForIndexer()
{
// UnifiedIndexerAppenderatorsManager class is used on Indexers.
final UnifiedIndexerAppenderatorsManager appenderatorsManager =
Mockito.mock(UnifiedIndexerAppenderatorsManager.class);
final int numWorkers = 3;
final DruidProcessingConfigTest.MockRuntimeInfo runtimeInfo =
new DruidProcessingConfigTest.MockRuntimeInfo(12, 1_000_000, 2_000_000);
Mockito.when(appenderatorsManager.getWorkerConfig()).thenReturn(new WorkerConfig()
{
@Override
public int getCapacity()
{
return 3;
}
});
final RuntimeInfo adjustedRuntimeInfo = TaskToolbox.createAdjustedRuntimeInfo(runtimeInfo, appenderatorsManager);
Assert.assertEquals(
runtimeInfo.getAvailableProcessors() / numWorkers,
adjustedRuntimeInfo.getAvailableProcessors()
);
Assert.assertEquals(
runtimeInfo.getMaxHeapSizeBytes() / numWorkers,
adjustedRuntimeInfo.getMaxHeapSizeBytes()
);
Assert.assertEquals(
runtimeInfo.getDirectMemorySizeBytes() / numWorkers,
adjustedRuntimeInfo.getDirectMemorySizeBytes()
);
Mockito.verify(appenderatorsManager).getWorkerConfig();
Mockito.verifyNoMoreInteractions(appenderatorsManager);
}
}


@ -190,13 +190,13 @@ public class DruidProcessingConfigTest
config.intermediateComputeSizeBytes();
}
static class MockRuntimeInfo extends RuntimeInfo
public static class MockRuntimeInfo extends RuntimeInfo
{
private final int availableProcessors;
private final long maxHeapSize;
private final long directSize;
MockRuntimeInfo(int availableProcessors, long directSize, long maxHeapSize)
public MockRuntimeInfo(int availableProcessors, long directSize, long maxHeapSize)
{
this.availableProcessors = availableProcessors;
this.directSize = directSize;


@ -400,6 +400,11 @@ public class UnifiedIndexerAppenderatorsManager implements AppenderatorsManager
return datasourceBundles;
}
public WorkerConfig getWorkerConfig()
{
return workerConfig;
}
private AppenderatorConfig rewriteAppenderatorConfigMemoryLimits(AppenderatorConfig baseConfig)
{
long perWorkerLimit = workerConfig.getGlobalIngestionHeapLimitBytes() / workerConfig.getCapacity();


@ -72,10 +72,11 @@ public class UnifiedIndexerAppenderatorsManagerTest extends InitializedNullHandl
@Rule
public final ExpectedException expectedException = ExpectedException.none();
private final WorkerConfig workerConfig = new WorkerConfig();
private final UnifiedIndexerAppenderatorsManager manager = new UnifiedIndexerAppenderatorsManager(
DirectQueryProcessingPool.INSTANCE,
JoinableFactoryWrapperTest.NOOP_JOINABLE_FACTORY_WRAPPER,
new WorkerConfig(),
workerConfig,
MapCache.create(10),
new CacheConfig(),
new CachePopulatorStats(),
@ -279,6 +280,12 @@ public class UnifiedIndexerAppenderatorsManagerTest extends InitializedNullHandl
Assert.assertEquals(file, limitedPoolIndexMerger.merge(null, false, null, file, null, null, -1));
}
@Test
public void test_getWorkerConfig()
{
Assert.assertSame(workerConfig, manager.getWorkerConfig());
}
/**
* An {@link IndexMerger} that does nothing, but is useful for LimitedPoolIndexMerger tests.
*/