Dynamic auto scale Kinesis-Stream ingest tasks (#10985)

* ready to test

* revert misc.xml

* document kinesis md

* Update docs/development/extensions-core/kafka-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update docs/development/extensions-core/kinesis-ingestion.md

* Update kafka-ingestion.md

remove leading `

* Update kinesis-ingestion.md

add missing `

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
Co-authored-by: Charles Smith <techdocsmith@gmail.com>
zhangyue19921010 2021-08-31 06:44:29 +08:00 committed by GitHub
parent e4ec3527a4
commit 6d14ea2d14
6 changed files with 344 additions and 73 deletions

@ -147,7 +147,7 @@ Where the file `supervisor-spec.json` contains a Kafka supervisor spec:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Task Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
### Task Autoscaler Properties

@ -146,6 +146,116 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec:
|`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no|
|`awsExternalId`|String|The AWS external id to use for additional permissions.|no|
|`deaggregate`|Boolean|Whether to use the de-aggregate function of the KCL. See below for details.|no|
|`autoScalerConfig`|Object|Defines auto scaling behavior for Kinesis ingest tasks. See [Task Autoscaler Properties](#task-autoscaler-properties).|no (default == null)|
### Task Autoscaler Properties
> Note that Task AutoScaler is currently designated as experimental.
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Enable or disable the auto scaler. When false or absent, Druid disables the `autoScaler` even when `autoScalerConfig` is not null. | no (default == false) |
| `taskCountMax` | Maximum number of Kinesis ingestion tasks. Must be greater than or equal to `taskCountMin`. If greater than `{numKinesisShards}`, the maximum number of reading tasks is `{numKinesisShards}` and `taskCountMax` is ignored. | yes |
| `taskCountMin` | Minimum number of Kinesis ingestion tasks. When you enable the auto scaler, Druid ignores the value of `taskCount` in `IOConfig` and uses `taskCountMin` for the initial number of tasks to launch. | yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval, in milliseconds, between two scale actions. | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. Only `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#lag-based-autoscaler-strategy-related-properties) for details. | no (default == `lagBased`) |
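
As a rough illustration of how the bounds in the table above interact, the following Java sketch caps `taskCountMax` at the shard count, picks the initial task count, and checks the minimum interval between scale actions. The class `TaskCountBounds` and its method names are hypothetical and are not part of Druid. Treating the interval check as inclusive is also an assumption.

```java
// Hypothetical helper for illustration only; this is not Druid's implementation.
final class TaskCountBounds {
  private TaskCountBounds() {}

  // The table requires taskCountMax >= taskCountMin.
  static void validate(int taskCountMin, int taskCountMax) {
    if (taskCountMax < taskCountMin) {
      throw new IllegalArgumentException("taskCountMax must be >= taskCountMin");
    }
  }

  // If taskCountMax exceeds the number of shards, the shard count is the real upper bound.
  static int effectiveMax(int taskCountMax, int numKinesisShards) {
    return Math.min(taskCountMax, numKinesisShards);
  }

  // With the auto scaler enabled, taskCount from the IOConfig is ignored in favor of taskCountMin.
  static int initialTaskCount(boolean enableTaskAutoScaler, int taskCount, int taskCountMin) {
    return enableTaskAutoScaler ? taskCountMin : taskCount;
  }

  // Assumption: a new scale action is allowed once the configured interval has fully elapsed.
  static boolean scaleActionAllowed(long nowMillis, long lastScaleActionMillis, long minTriggerScaleActionFrequencyMillis) {
    return nowMillis - lastScaleActionMillis >= minTriggerScaleActionFrequencyMillis;
  }

  public static void main(String[] args) {
    validate(2, 6);
    System.out.println(effectiveMax(6, 4));                          // 4 shards cap a max of 6
    System.out.println(initialTaskCount(true, 1, 2));                // starts at taskCountMin
    System.out.println(scaleActionAllowed(700_000L, 0L, 600_000L));  // interval has elapsed
  }
}
```
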
### Lag Based AutoScaler Strategy Related Properties
The Kinesis indexing service reports lag as a time difference in milliseconds, rather than as a message count as the Kafka indexing service does.
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `lagCollectionIntervalMillis` | Interval, in milliseconds, between lag point collections. | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection. Use with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, Druid collects a lag metric point every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The lag threshold, in milliseconds, for a scale out action. | no (default == 6000000) |
| `triggerScaleOutFractionThreshold` | If the fraction of lag points higher than `scaleOutThreshold` reaches `triggerScaleOutFractionThreshold`, Druid performs a scale out action. | no (default == 0.3) |
| `scaleInThreshold` | The lag threshold, in milliseconds, for a scale in action. | no (default == 1000000) |
| `triggerScaleInFractionThreshold` | If the fraction of lag points lower than `scaleInThreshold` reaches `triggerScaleInFractionThreshold`, Druid performs a scale in action. | no (default == 0.9) |
| `scaleActionStartDelayMillis` | Number of milliseconds to delay after the supervisor starts before the first scale logic check. | no (default == 300000) |
| `scaleActionPeriodMillis` | Frequency, in milliseconds, at which Druid checks whether a scale action is triggered. | no (default == 60000) |
| `scaleInStep` | Number of tasks to remove at a time when scaling in. | no (default == 1) |
| `scaleOutStep` | Number of tasks to add at a time when scaling out | no (default == 2) |
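
The decision described by the properties above can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not Druid's actual `lagBased` implementation: the class `LagBasedScaleSketch` and the method `desiredTaskCount` are hypothetical, the comparisons are assumed to be inclusive, and scale out is assumed to take priority over scale in.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of a lag-based scaling decision; not Druid's implementation.
final class LagBasedScaleSketch {
  private LagBasedScaleSketch() {}

  static int desiredTaskCount(
      List<Long> lagPoints,
      long scaleOutThreshold,
      double triggerScaleOutFractionThreshold,
      long scaleInThreshold,
      double triggerScaleInFractionThreshold,
      int currentTaskCount,
      int taskCountMin,
      int taskCountMax,
      int scaleInStep,
      int scaleOutStep
  ) {
    if (lagPoints.isEmpty()) {
      return currentTaskCount; // nothing collected yet, so no decision
    }
    // Count the lag points beyond each threshold (inclusive comparison is an assumption).
    long above = lagPoints.stream().filter(lag -> lag >= scaleOutThreshold).count();
    long below = lagPoints.stream().filter(lag -> lag <= scaleInThreshold).count();
    double aboveFraction = (double) above / lagPoints.size();
    double belowFraction = (double) below / lagPoints.size();

    // Assumption: scale out is checked first, and results stay within [taskCountMin, taskCountMax].
    if (aboveFraction >= triggerScaleOutFractionThreshold) {
      return Math.min(currentTaskCount + scaleOutStep, taskCountMax);
    }
    if (belowFraction >= triggerScaleInFractionThreshold) {
      return Math.max(currentTaskCount - scaleInStep, taskCountMin);
    }
    return currentTaskCount;
  }

  public static void main(String[] args) {
    // 4 of 10 points exceed the default scaleOutThreshold of 6000000 ms.
    List<Long> lags = Arrays.asList(
        7_000_000L, 6_500_000L, 7_200_000L, 6_100_000L, 500_000L,
        400_000L, 300_000L, 200_000L, 100_000L, 50_000L
    );
    System.out.println(desiredTaskCount(lags, 6_000_000L, 0.3, 1_000_000L, 0.9, 2, 2, 6, 1, 2));
  }
}
```

With the default values, one collection window holds roughly `lagCollectionRangeMillis / lagCollectionIntervalMillis` = 600000 / 30000 = 20 lag points, so a `triggerScaleOutFractionThreshold` of 0.3 corresponds to about 6 points above `scaleOutThreshold`.
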
The following example demonstrates a supervisor spec with `lagBased` autoScaler enabled:
```json
{
  "type": "kinesis",
  "dataSchema": {
    "dataSource": "metrics-kinesis",
    "timestampSpec": {
      "column": "timestamp",
      "format": "auto"
    },
    "dimensionsSpec": {
      "dimensions": [],
      "dimensionExclusions": [
        "timestamp",
        "value"
      ]
    },
    "metricsSpec": [
      {
        "name": "count",
        "type": "count"
      },
      {
        "name": "value_sum",
        "fieldName": "value",
        "type": "doubleSum"
      },
      {
        "name": "value_min",
        "fieldName": "value",
        "type": "doubleMin"
      },
      {
        "name": "value_max",
        "fieldName": "value",
        "type": "doubleMax"
      }
    ],
    "granularitySpec": {
      "type": "uniform",
      "segmentGranularity": "HOUR",
      "queryGranularity": "NONE"
    }
  },
  "ioConfig": {
    "stream": "metrics",
    "autoScalerConfig": {
      "enableTaskAutoScaler": true,
      "taskCountMax": 6,
      "taskCountMin": 2,
      "minTriggerScaleActionFrequencyMillis": 600000,
      "autoScalerStrategy": "lagBased",
      "lagCollectionIntervalMillis": 30000,
      "lagCollectionRangeMillis": 600000,
      "scaleOutThreshold": 600000,
      "triggerScaleOutFractionThreshold": 0.3,
      "scaleInThreshold": 100000,
      "triggerScaleInFractionThreshold": 0.9,
      "scaleActionStartDelayMillis": 300000,
      "scaleActionPeriodMillis": 60000,
      "scaleInStep": 1,
      "scaleOutStep": 2
    },
    "inputFormat": {
      "type": "json"
    },
    "endpoint": "kinesis.us-east-1.amazonaws.com",
    "taskCount": 1,
    "replicas": 1,
    "taskDuration": "PT1H",
    "recordsPerFetch": 2000,
    "fetchDelayMillis": 1000
  },
  "tuningConfig": {
    "type": "kinesis",
    "maxRowsPerSegment": 5000000
  }
}
```
#### Specifying data format

@ -379,11 +379,19 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return true;
}
// not yet supported, will be implemented in the future maybe? need to find a proper way to measure kinesis lag.
// Unlike the Kafka Indexing Service,
// Kinesis reports lag metrics measured in time difference in milliseconds between the current sequence number and latest sequence number,
// rather than message count.
@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
Map<String, Long> partitionTimeLags = getPartitionTimeLag();
if (partitionTimeLags == null) {
return new LagStats(0, 0, 0);
}
return computeLags(partitionTimeLags);
}
@Override
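
The new `computeLagStats()` in the hunk above delegates to `computeLags(partitionTimeLags)`, whose body is not part of this diff. A minimal sketch of what such an aggregation over per-shard time lags might look like is given below. The class `LagStatsSketch` is hypothetical, the `(max, total, avg)` field order is an assumption based on the "maxLag, totalLag and avgLag" comment in `Supervisor`, and the integer average is likewise an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an aggregation over per-shard time lags; not Druid's LagStats class.
final class LagStatsSketch {
  final long maxLag;
  final long totalLag;
  final long avgLag;

  LagStatsSketch(long maxLag, long totalLag, long avgLag) {
    this.maxLag = maxLag;
    this.totalLag = totalLag;
    this.avgLag = avgLag;
  }

  // Aggregates per-shard time lag (milliseconds) into max, total and average.
  static LagStatsSketch fromPartitionTimeLags(Map<String, Long> partitionTimeLags) {
    if (partitionTimeLags == null || partitionTimeLags.isEmpty()) {
      return new LagStatsSketch(0, 0, 0); // mirrors the null handling shown in the diff
    }
    long max = 0;
    long total = 0;
    int counted = 0;
    for (Long lag : partitionTimeLags.values()) {
      if (lag == null) {
        continue; // skip shards without a reported lag
      }
      max = Math.max(max, lag);
      total += lag;
      counted++;
    }
    long avg = counted == 0 ? 0 : total / counted; // integer average is an assumption
    return new LagStatsSketch(max, total, avg);
  }

  public static void main(String[] args) {
    Map<String, Long> lags = new HashMap<>();
    lags.put("shardId-000000000000", 100L);
    lags.put("shardId-000000000001", 300L);
    LagStatsSketch stats = fromPartitionTimeLags(lags);
    System.out.println(stats.maxLag + " " + stats.totalLag + " " + stats.avgLag);
  }
}
```
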

@ -93,12 +93,6 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
lateMessageRejectionStartDateTime
);
// for now dynamic Allocation Tasks is not supported here
// throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec.
if (autoScalerConfig != null) {
throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!");
}
this.endpoint = endpoint != null
? endpoint
: (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());
@ -157,6 +151,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", endpoint='" + endpoint + '\'' +
", replicas=" + getReplicas() +
", taskCount=" + getTaskCount() +
", autoScalerConfig=" + getAutoscalerConfig() +
", taskDuration=" + getTaskDuration() +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +

@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
@ -67,6 +68,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -283,6 +285,158 @@ public class KinesisSupervisorTest extends EasyMockSupport
);
}
@Test
public void testNoInitialStateWithAutoScaleOut() throws Exception
{
HashMap<String, Object> autoScalerConfigMap = new HashMap<>();
autoScalerConfigMap.put("enableTaskAutoScaler", true);
autoScalerConfigMap.put("lagCollectionIntervalMillis", 500);
autoScalerConfigMap.put("lagCollectionRangeMillis", 500);
autoScalerConfigMap.put("scaleOutThreshold", 0);
autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.0);
autoScalerConfigMap.put("scaleInThreshold", 1000000);
autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfigMap.put("scaleActionStartDelayMillis", 0);
autoScalerConfigMap.put("scaleActionPeriodMillis", 100);
autoScalerConfigMap.put("taskCountMax", 2);
autoScalerConfigMap.put("taskCountMin", 1);
autoScalerConfigMap.put("scaleInStep", 1);
autoScalerConfigMap.put("scaleOutStep", 2);
autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
supervisor = getTestableSupervisor(
1,
1,
true,
"PT1H",
null,
null,
false,
null,
null,
autoScalerConfig
);
KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec();
SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor);
supervisorRecordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
.andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.getAssignment())
.andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
.anyTimes();
supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
Capture<KinesisIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KinesisDataSourceMetadata(
null
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
supervisor.start();
int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScale);
autoscaler.start();
supervisor.runInternal();
verifyAll();
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
}
@Test
public void testNoInitialStateWithAutoScaleIn() throws Exception
{
HashMap<String, Object> autoScalerConfigMap = new HashMap<>();
autoScalerConfigMap.put("enableTaskAutoScaler", true);
autoScalerConfigMap.put("lagCollectionIntervalMillis", 500);
autoScalerConfigMap.put("lagCollectionRangeMillis", 500);
autoScalerConfigMap.put("scaleOutThreshold", 1000000);
autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.8);
autoScalerConfigMap.put("scaleInThreshold", 0);
autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.0);
autoScalerConfigMap.put("scaleActionStartDelayMillis", 0);
autoScalerConfigMap.put("scaleActionPeriodMillis", 100);
autoScalerConfigMap.put("taskCountMax", 2);
autoScalerConfigMap.put("taskCountMin", 1);
autoScalerConfigMap.put("scaleInStep", 1);
autoScalerConfigMap.put("scaleOutStep", 2);
autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000);
AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class);
supervisor = getTestableSupervisor(
1,
2,
true,
"PT1H",
null,
null,
false,
null,
null,
autoScalerConfig
);
KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec();
SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor);
supervisorRecordSupplier.assign(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM))
.andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0))
.anyTimes();
EasyMock.expect(supervisorRecordSupplier.getAssignment())
.andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION))
.anyTimes();
supervisorRecordSupplier.seekToLatest(EasyMock.anyObject());
EasyMock.expectLastCall().anyTimes();
EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes();
supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString());
EasyMock.expectLastCall().anyTimes();
Capture<KinesisIndexTask> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock
.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE))
.andReturn(new KinesisDataSourceMetadata(null))
.anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2);
replayAll();
int taskCountInit = supervisor.getIoConfig().getTaskCount();
// when enable autoScaler the init taskCount will be equal to taskCountMin
Assert.assertEquals(1, taskCountInit);
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountBeforeScale);
autoscaler.start();
supervisor.runInternal();
verifyAll();
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScale);
}
@Test
public void testRecordSupplier()
{
@ -347,69 +501,64 @@ public class KinesisSupervisorTest extends EasyMockSupport
}
@Test
public void testKinesisIOConfig()
public void testKinesisIOConfigInitAndAutoscalerConfigCreation()
{
Exception e = null;
try {
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
null,
false
);
AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig();
Assert.assertNull(autoScalerConfig);
}
catch (Exception ex) {
e = ex;
}
Assert.assertNull(e);
// create KinesisSupervisorIOConfig with autoScalerConfig null
KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
null,
false
);
try {
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
false
);
}
catch (Exception ex) {
e = ex;
}
Assert.assertNotNull(e);
Assert.assertTrue(e instanceof UnsupportedOperationException);
AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig();
Assert.assertNull(autoscalerConfigNull);
// create KinesisSupervisorIOConfig with autoScalerConfig Empty
KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
false
);
AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig();
Assert.assertNotNull(autoscalerConfig);
Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig);
Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler());
Assert.assertTrue(autoscalerConfig.toString().contains("autoScalerConfig"));
}
@Test
@ -4895,6 +5044,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
earlyMessageRejectionPeriod,
false,
null,
null,
null
);
}
@ -4908,7 +5058,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
Period earlyMessageRejectionPeriod,
boolean suspended,
Integer recordsPerFetch,
Integer fetchDelayMillis
Integer fetchDelayMillis,
AutoScalerConfig autoScalerConfig
)
{
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
@ -4930,7 +5081,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
null,
autoScalerConfig,
false
);
@ -5303,6 +5454,8 @@ public class KinesisSupervisorTest extends EasyMockSupport
private class TestableKinesisSupervisor extends KinesisSupervisor
{
private KinesisSupervisorSpec spec;
TestableKinesisSupervisor(
TaskStorage taskStorage,
TaskMaster taskMaster,
@ -5323,6 +5476,12 @@ public class KinesisSupervisorTest extends EasyMockSupport
rowIngestionMetersFactory,
null
);
this.spec = spec;
}
protected KinesisSupervisorSpec getKinesisSupervisorSpec()
{
return spec;
}
@Override

@ -68,7 +68,6 @@ public interface Supervisor
/**
* Computes maxLag, totalLag and avgLag
* Only supports Kafka ingestion so far.
*/
LagStats computeLagStats();