Dynamic auto scale Kafka-Stream ingest tasks (#10524)

* druid task auto scale based on kafka lag

* fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig

* druid task auto scale based on kafka lag

* fix kafkaSupervisorIOConfig and KinesisSupervisorIOConfig

* test dynamic auto scale done

* auto scale tasks tested on prd cluster

* auto scale tasks tested on prd cluster

* modify code style to solve 29055.10 29055.9 29055.17 29055.18 29055.19 29055.20

* rename test fiel function

* change codes and add docs based on capistrant reviewed

* midify test docs

* modify docs

* modify docs

* modify docs

* merge from master

* Extract the autoScale logic out of SeekableStreamSupervisor to minimize putting more stuff inside there &&  Make autoscaling algorithm configurable and scalable.

* fix ci failed

* revert msic.xml

* add uts to test autoscaler create && scale out/in and kafka ingest with scale enable

* add more uts

* fix inner class check

* add IT for kafka ingestion with autoscaler

* add new IT in groups=kafka-index named testKafkaIndexDataWithWithAutoscaler

* review change

* code review

* remove unused imports

* fix NLP

* fix docs and UTs

* revert misc.xml

* use jackson to build autoScaleConfig with default values

* add uts

* use jackson to init AutoScalerConfig in IOConfig instead of Map<>

* autoscalerConfig interface and provide a defaultAutoScalerConfig

* modify uts

* modify docs

* fix checkstyle

* revert misc.xml

* modify uts

* reviewed code change

* reviewed code change

* code reviewed

* code review

* log changed

* do StringUtils.encodeForFormat when create allocationExec

* code review && limit taskCountMax to partitionNumbers

* modify docs

* code review

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
This commit is contained in:
zhangyue19921010 2021-03-06 17:06:52 +08:00 committed by GitHub
parent 16acd6686a
commit bddacbb1c3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
36 changed files with 2671 additions and 23 deletions

View File

@ -146,6 +146,115 @@ A sample supervisor spec is shown below:
|`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
|`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
|`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)|
### Task Autoscaler Properties
> Note that Task AutoScaler is currently designated as experimental.
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `enableTaskAutoScaler` | Whether enable this feature or not. Set false or ignored here will disable `autoScaler` even though `autoScalerConfig` is not null| no (default == false) |
| `taskCountMax` | Maximum value of task count. Make Sure `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, the maximum number of reading tasks would be equal to `{numKafkaPartitions}` and `taskCountMax` would be ignored. | yes |
| `taskCountMin` | Minimum value of task count. When enable autoscaler, the value of taskCount in `IOConfig` will be ignored, and `taskCountMin` will be the number of tasks that ingestion starts going up to `taskCountMax`| yes |
| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) |
| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) |
### Lag Based AutoScaler Strategy Related Properties
| Property | Description | Required |
| ------------- | ------------- | ------------- |
| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) |
| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) |
| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) |
| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) |
| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) |
| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) |
| `scaleActionStartDelayMillis` | Number of milliseconds after supervisor starts when first check scale logic. | no (default == 300000) |
| `scaleActionPeriodMillis` | The frequency of checking whether to do scale action in millis | no (default == 60000) |
| `scaleInStep` | How many tasks to reduce at a time | no (default == 1) |
| `scaleOutStep` | How many tasks to add at a time | no (default == 2) |
A sample supervisor spec with `lagBased` autoScaler enabled is shown below:
```json
{
"type": "kafka",
"dataSchema": {
"dataSource": "metrics-kafka",
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": [
],
"dimensionExclusions": [
"timestamp",
"value"
]
},
"metricsSpec": [
{
"name": "count",
"type": "count"
},
{
"name": "value_sum",
"fieldName": "value",
"type": "doubleSum"
},
{
"name": "value_min",
"fieldName": "value",
"type": "doubleMin"
},
{
"name": "value_max",
"fieldName": "value",
"type": "doubleMax"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "HOUR",
"queryGranularity": "NONE"
}
},
"ioConfig": {
"topic": "metrics",
"inputFormat": {
"type": "json"
},
"consumerProperties": {
"bootstrap.servers": "localhost:9092"
},
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"taskCountMax": 6,
"taskCountMin": 2,
"minTriggerScaleActionFrequencyMillis": 600000,
"autoScalerStrategy": "lagBased",
"lagCollectionIntervalMillis": 30000,
"lagCollectionRangeMillis": 600000,
"scaleOutThreshold": 6000000,
"triggerScaleOutFractionThreshold": 0.3,
"scaleInThreshold": 1000000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 300000,
"scaleActionPeriodMillis": 60000,
"scaleInStep": 1,
"scaleOutStep": 2
},
"taskCount":1,
"replicas":1,
"taskDuration":"PT1H"
},
"tuningConfig":{
"type":"kafka",
"maxRowsPerSegment":5000000
}
}
```
#### More on consumerProperties

View File

@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
@ -282,6 +283,18 @@ public class MaterializedViewSupervisor implements Supervisor
// do nothing
}
@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor");
}
@Override
public int getActiveTaskGroupsCount()
{
throw new UnsupportedOperationException("Get Active Task Groups Count is not supported in MaterializedViewSupervisor");
}
/**
* Find intervals in which derived dataSource should rebuild the segments.
* Choose the latest intervals to create new HadoopIndexTask and submit it.

View File

@ -29,7 +29,9 @@ import org.apache.druid.indexer.HadoopTuningConfig;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.math.expr.ExprMacroTable;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.metadata.SqlSegmentsMetadataManager;
@ -50,6 +52,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.util.concurrent.Callable;
public class MaterializedViewSupervisorSpecTest
{
@ -155,6 +158,85 @@ public class MaterializedViewSupervisorSpecTest
Assert.assertEquals(expected.getMetrics(), spec.getMetrics());
}
@Test
public void testMaterializedViewSupervisorSpecCreated()
{
Exception ex = null;
try {
MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec(
"wikiticker",
new DimensionsSpec(
Lists.newArrayList(
new StringDimensionSchema("isUnpatrolled"),
new StringDimensionSchema("metroCode"),
new StringDimensionSchema("namespace"),
new StringDimensionSchema("page"),
new StringDimensionSchema("regionIsoCode"),
new StringDimensionSchema("regionName"),
new StringDimensionSchema("user")
),
null,
null
),
new AggregatorFactory[]{
new CountAggregatorFactory("count"),
new LongSumAggregatorFactory("added", "added")
},
HadoopTuningConfig.makeDefaultTuningConfig(),
null,
null,
null,
null,
null,
false,
objectMapper,
null,
null,
null,
null,
null,
new MaterializedViewTaskConfig(),
EasyMock.createMock(AuthorizerMapper.class),
new NoopChatHandlerProvider(),
new SupervisorStateManagerConfig()
);
Supervisor supervisor = spec.createSupervisor();
Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor);
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);
try {
supervisor.computeLagStats();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}
try {
int count = supervisor.getActiveTaskGroupsCount();
}
catch (Exception e) {
Assert.assertTrue(e instanceof UnsupportedOperationException);
}
Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()
{
return -1;
}
};
}
catch (Exception e) {
ex = e;
}
Assert.assertNull(ex);
}
@Test
public void testSuspendResuume() throws IOException
{

View File

@ -38,6 +38,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig;
@ -58,6 +59,7 @@ import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.joda.time.DateTime;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -330,6 +332,17 @@ public class KafkaSupervisor extends SeekableStreamSupervisor<Integer, Long, Kaf
return false;
}
@Override
public LagStats computeLagStats()
{
Map<Integer, Long> partitionRecordLag = getPartitionRecordLag();
if (partitionRecordLag == null) {
return new LagStats(0, 0, 0);
}
return computeLags(partitionRecordLag);
}
@Override
protected void updatePartitionLagFromStream()
{

View File

@ -24,10 +24,12 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.Period;
import javax.annotation.Nullable;
import java.util.Map;
public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@ -51,6 +53,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("taskCount") Integer taskCount,
@JsonProperty("taskDuration") Period taskDuration,
@JsonProperty("consumerProperties") Map<String, Object> consumerProperties,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("pollTimeout") Long pollTimeout,
@JsonProperty("startDelay") Period startDelay,
@JsonProperty("period") Period period,
@ -73,6 +76,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime
);
@ -117,6 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
", taskCount=" + getTaskCount() +
", taskDuration=" + getTaskDuration() +
", consumerProperties=" + consumerProperties +
", autoScalerConfig=" + getAutoscalerConfig() +
", pollTimeout=" + pollTimeout +
", startDelay=" + getStartDelay() +
", period=" + getPeriod() +

View File

@ -134,6 +134,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
null,
null,
null,
null,
true,
null,
null,

View File

@ -61,13 +61,16 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -158,6 +161,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private RowIngestionMetersFactory rowIngestionMetersFactory;
private ExceptionCapturingServiceEmitter serviceEmitter;
private SupervisorStateManagerConfig supervisorConfig;
private KafkaSupervisorIngestionSpec ingestionSchema;
private static String getTopic()
{
@ -214,6 +218,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
serviceEmitter = new ExceptionCapturingServiceEmitter();
EmittingLogger.registerEmitter(serviceEmitter);
supervisorConfig = new SupervisorStateManagerConfig();
ingestionSchema = EasyMock.createMock(KafkaSupervisorIngestionSpec.class);
}
@After
@ -232,6 +237,197 @@ public class KafkaSupervisorTest extends EasyMockSupport
zkServer = null;
}
@Test
public void testNoInitialStateWithAutoscaler() throws Exception
{
KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory(
null,
null
)
{
@Override
public KafkaIndexTaskClient build(
TaskInfoProvider taskInfoProvider,
String dataSource,
int numThreads,
Duration httpTimeout,
long numRetries
)
{
Assert.assertEquals(TEST_CHAT_THREADS, numThreads);
Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout);
Assert.assertEquals(TEST_CHAT_RETRIES, numRetries);
return taskClient;
}
};
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 0);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("taskCountMax", 2);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
final Map<String, Object> consumerProperties = KafkaConsumerConfigs.getConsumerProperties();
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig(
topic,
INPUT_FORMAT,
1,
1,
new Period("PT1H"),
consumerProperties,
OBJECT_MAPPER.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class),
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
true,
new Period("PT30M"),
null,
null,
null
);
final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig(
null,
1000,
null,
null,
50000,
null,
new Period("P1Y"),
new File("/test"),
null,
null,
null,
true,
false,
null,
false,
null,
numThreads,
TEST_CHAT_THREADS,
TEST_CHAT_RETRIES,
TEST_HTTP_TIMEOUT,
TEST_SHUTDOWN_TIMEOUT,
null,
null,
null,
null,
null
);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
EasyMock.replay(ingestionSchema);
SeekableStreamSupervisorSpec testableSupervisorSpec = new KafkaSupervisorSpec(
ingestionSchema,
dataSchema,
tuningConfigOri,
kafkaSupervisorIOConfig,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
OBJECT_MAPPER,
new NoopServiceEmitter(),
new DruidMonitorSchedulerConfig(),
rowIngestionMetersFactory,
new SupervisorStateManagerConfig()
);
supervisor = new TestableKafkaSupervisor(
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
OBJECT_MAPPER,
(KafkaSupervisorSpec) testableSupervisorSpec,
rowIngestionMetersFactory
);
SupervisorTaskAutoScaler autoscaler = testableSupervisorSpec.createAutoscaler(supervisor);
final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig();
addSomeEvents(1);
Capture<KafkaIndexTask> captured = Capture.newInstance();
EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn(
new KafkaDataSourceMetadata(
null
)
).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true);
taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class));
replayAll();
supervisor.start();
int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScale);
autoscaler.start();
supervisor.runInternal();
Thread.sleep(1 * 1000);
verifyAll();
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
KafkaIndexTask task = captured.getValue();
Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema());
Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), task.getTuningConfig());
KafkaIndexTaskIOConfig taskConfig = task.getIOConfig();
Assert.assertEquals(kafkaHost, taskConfig.getConsumerProperties().get("bootstrap.servers"));
Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey"));
Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName());
Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction());
Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent());
Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent());
Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream());
Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0));
Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1));
Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2));
Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream());
Assert.assertEquals(
Long.MAX_VALUE,
(long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(0)
);
Assert.assertEquals(
Long.MAX_VALUE,
(long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(1)
);
Assert.assertEquals(
Long.MAX_VALUE,
(long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2)
);
autoscaler.reset();
autoscaler.stop();
}
@Test
public void testCreateBaseTaskContexts() throws JsonProcessingException
{
@ -3379,6 +3575,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@ -3491,6 +3688,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),
@ -3607,6 +3805,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
taskCount,
new Period(duration),
consumerProperties,
null,
KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
new Period("P1D"),
new Period("PT30S"),

View File

@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@ -378,6 +379,13 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
return true;
}
// not yet supported, will be implemented in the future maybe? need to find a proper way to measure kinesis lag.
@Override
public LagStats computeLagStats()
{
throw new UnsupportedOperationException("Compute Lag Stats is not supported in KinesisSupervisor yet.");
}
@Override
protected Map<String, OrderedSequenceNumber<String>> filterExpiredPartitionsFromStartingOffsets(
Map<String, OrderedSequenceNumber<String>> startingOffsets

View File

@ -26,9 +26,12 @@ import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig;
import org.apache.druid.indexing.kinesis.KinesisRegion;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.joda.time.DateTime;
import org.joda.time.Period;
import javax.annotation.Nullable;
public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
{
private final String endpoint;
@ -70,6 +73,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
@JsonProperty("fetchDelayMillis") Integer fetchDelayMillis,
@JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn,
@JsonProperty("awsExternalId") String awsExternalId,
@Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig,
@JsonProperty("deaggregate") boolean deaggregate
)
{
@ -85,8 +89,16 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig
completionTimeout,
lateMessageRejectionPeriod,
earlyMessageRejectionPeriod,
autoScalerConfig,
lateMessageRejectionStartDateTime
);
// for now dynamic Allocation Tasks is not supported here
// throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec.
if (autoScalerConfig != null) {
throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!");
}
this.endpoint = endpoint != null
? endpoint
: (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint());

View File

@ -153,6 +153,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport
null,
null,
null,
null,
false
),
null,

View File

@ -66,6 +66,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager;
import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
@ -102,6 +103,7 @@ import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -304,6 +306,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
1000,
null,
null,
null,
false
);
KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER);
@ -344,6 +347,72 @@ public class KinesisSupervisorTest extends EasyMockSupport
Assert.assertFalse(supplier.isBackgroundFetchRunning());
}
@Test
public void testKinesisIOConfig()
{
Exception e = null;
try {
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
null,
false
);
AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig();
Assert.assertNull(autoScalerConfig);
}
catch (Exception ex) {
e = ex;
}
Assert.assertNull(e);
try {
KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig(
STREAM,
INPUT_FORMAT,
"awsEndpoint",
null,
1,
1,
new Period("PT30M"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null,
null,
100,
1000,
null,
null,
OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class),
false
);
}
catch (Exception ex) {
e = ex;
}
Assert.assertNotNull(e);
Assert.assertTrue(e instanceof UnsupportedOperationException);
}
@Test
public void testMultiTask() throws Exception
{
@ -4721,6 +4790,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
null,
null,
null,
null,
false
);
@ -4863,6 +4933,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
null,
false
);
@ -4950,6 +5021,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
null,
false
);
@ -5039,6 +5111,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
fetchDelayMillis,
null,
null,
null,
false
);

View File

@ -62,7 +62,11 @@
<artifactId>druid-hll</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>

View File

@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.metadata.MetadataSupervisorManager;
import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -44,6 +46,8 @@ public class SupervisorManager
private final MetadataSupervisorManager metadataSupervisorManager;
private final ConcurrentHashMap<String, Pair<Supervisor, SupervisorSpec>> supervisors = new ConcurrentHashMap<>();
// SupervisorTaskAutoScaler could be null
private final ConcurrentHashMap<String, SupervisorTaskAutoScaler> autoscalers = new ConcurrentHashMap<>();
private final Object lock = new Object();
private volatile boolean started = false;
@ -54,6 +58,11 @@ public class SupervisorManager
this.metadataSupervisorManager = metadataSupervisorManager;
}
public MetadataSupervisorManager getMetadataSupervisorManager()
{
return metadataSupervisorManager;
}
public Set<String> getSupervisorIds()
{
return supervisors.keySet();
@ -140,12 +149,17 @@ public class SupervisorManager
for (String id : supervisors.keySet()) {
try {
supervisors.get(id).lhs.stop(false);
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.stop();
}
}
catch (Exception e) {
log.warn(e, "Caught exception while stopping supervisor [%s]", id);
}
}
supervisors.clear();
autoscalers.clear();
started = false;
}
@ -187,6 +201,10 @@ public class SupervisorManager
}
supervisor.lhs.reset(dataSourceMetadata);
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.reset();
}
return true;
}
@ -238,6 +256,12 @@ public class SupervisorManager
pair.lhs.stop(true);
supervisors.remove(id);
SupervisorTaskAutoScaler autoscler = autoscalers.get(id);
if (autoscler != null) {
autoscler.stop();
autoscalers.remove(id);
}
return true;
}
@ -282,9 +306,16 @@ public class SupervisorManager
}
Supervisor supervisor;
SupervisorTaskAutoScaler autoscaler;
try {
supervisor = spec.createSupervisor();
autoscaler = spec.createAutoscaler(supervisor);
supervisor.start();
if (autoscaler != null) {
autoscaler.start();
autoscalers.put(id, autoscaler);
}
}
catch (Exception e) {
// Supervisor creation or start failed write tombstone only when trying to start a new supervisor

View File

@ -56,8 +56,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener;
import org.apache.druid.indexing.overlord.TaskRunnerWorkItem;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorReport;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient;
@ -71,6 +73,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
@ -82,6 +85,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.metadata.EntryExistsException;
import org.apache.druid.metadata.MetadataSupervisorManager;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.joda.time.DateTime;
@ -102,6 +106,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
@ -318,6 +323,127 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
// change taskCount without resubmitting.
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
{
this.scaleAction = scaleAction;
}
/**
* This method will do lag points collection and check dynamic scale action is necessary or not.
*/
@Override
public void handle()
{
if (autoScalerConfig == null) {
log.warn("autoScalerConfig is null but dynamic allocation notice is submitted, how can it be ?");
} else {
try {
long nowTime = System.currentTimeMillis();
if (spec.isSuspended()) {
log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended",
dataSource
);
return;
}
log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
dataSource
);
for (CopyOnWriteArrayList<TaskGroup> list : pendingCompletionTaskGroups.values()) {
if (!list.isEmpty()) {
log.info(
"Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]",
dataSource, pendingCompletionTaskGroups
);
return;
}
}
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource
);
return;
}
final Integer desriedTaskCount = scaleAction.call();
boolean allocationSuccess = changeTaskCount(desriedTaskCount);
if (allocationSuccess) {
dynamicTriggerLastRunTime = nowTime;
}
}
catch (Exception ex) {
log.warn(ex, "Error parsing DynamicAllocationTasksNotice");
}
}
}
}
/**
* This method determines how to do scale actions based on collected lag points.
* If scale action is triggered :
* First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing.
* Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'.
* Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage.
* After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor.
* @param desiredActiveTaskCount desired taskCount computed from AutoScaler
* @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'.
* If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis.
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException, TimeoutException
{
int currentActiveTaskCount;
Collection<TaskGroup> activeTaskGroups = activelyReadingTaskGroups.values();
currentActiveTaskCount = activeTaskGroups.size();
if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) {
return false;
} else {
log.info(
"Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].",
currentActiveTaskCount, desiredActiveTaskCount, dataSource
);
gracefulShutdownInternal();
changeTaskCountInIOConfig(desiredActiveTaskCount);
clearAllocationInfo();
log.info("Changed taskCount to [%s] for dataSource [%s].", desiredActiveTaskCount, dataSource);
return true;
}
}
private void changeTaskCountInIOConfig(int desiredActiveTaskCount)
{
ioConfig.setTaskCount(desiredActiveTaskCount);
try {
Optional<SupervisorManager> supervisorManager = taskMaster.getSupervisorManager();
if (supervisorManager.isPresent()) {
MetadataSupervisorManager metadataSupervisorManager = supervisorManager.get().getMetadataSupervisorManager();
metadataSupervisorManager.insert(dataSource, spec);
} else {
log.error("supervisorManager is null in taskMaster, skipping scale action for dataSource [%s].", dataSource);
}
}
catch (Exception e) {
log.error(e, "Failed to sync taskCount to MetaStorage for dataSource [%s].", dataSource);
}
}
private void clearAllocationInfo()
{
activelyReadingTaskGroups.clear();
partitionGroups.clear();
partitionOffsets.clear();
pendingCompletionTaskGroups.clear();
partitionIds.clear();
}
private class GracefulShutdownNotice extends ShutdownNotice
{
@Override
@ -470,6 +596,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final SeekableStreamIndexTaskClient<PartitionIdType, SequenceOffsetType> taskClient;
private final SeekableStreamSupervisorSpec spec;
private final SeekableStreamSupervisorIOConfig ioConfig;
private final AutoScalerConfig autoScalerConfig;
private final SeekableStreamSupervisorTuningConfig tuningConfig;
private final SeekableStreamIndexTaskTuningConfig taskTuningConfig;
private final String supervisorId;
@ -488,6 +615,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private final boolean useExclusiveStartingSequence;
private boolean listenerRegistered = false;
private long lastRunTime;
private long dynamicTriggerLastRunTime;
private int initRetryCounter = 0;
private volatile DateTime firstRunTime;
private volatile DateTime earlyStopTime = null;
@ -519,20 +647,40 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.useExclusiveStartingSequence = useExclusiveStartingSequence;
this.dataSource = spec.getDataSchema().getDataSource();
this.ioConfig = spec.getIoConfig();
this.autoScalerConfig = ioConfig.getAutoscalerConfig();
this.tuningConfig = spec.getTuningConfig();
this.taskTuningConfig = this.tuningConfig.convertToTaskTuningConfig();
this.supervisorId = supervisorId;
this.exec = Execs.singleThreaded(StringUtils.encodeForFormat(supervisorId));
this.scheduledExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Scheduler-%d");
this.reportingExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Reporting-%d");
this.stateManager = new SeekableStreamSupervisorStateManager(
spec.getSupervisorStateManagerConfig(),
spec.isSuspended()
);
int workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
int workerThreads;
int chatThreads;
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
log.info("Running Task autoscaler for datasource [%s]", dataSource);
workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, autoScalerConfig.getTaskCountMax()));
chatThreads = (this.tuningConfig.getChatThreads() != null
? this.tuningConfig.getChatThreads()
: Math.min(10, autoScalerConfig.getTaskCountMax() * this.ioConfig.getReplicas()));
} else {
workerThreads = (this.tuningConfig.getWorkerThreads() != null
? this.tuningConfig.getWorkerThreads()
: Math.min(10, this.ioConfig.getTaskCount()));
chatThreads = (this.tuningConfig.getChatThreads() != null
? this.tuningConfig.getChatThreads()
: Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
}
this.workerExec = MoreExecutors.listeningDecorator(
Execs.multiThreaded(
@ -578,9 +726,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
+ IndexTaskClient.MAX_RETRY_WAIT_SECONDS)
);
int chatThreads = (this.tuningConfig.getChatThreads() != null
? this.tuningConfig.getChatThreads()
: Math.min(10, this.ioConfig.getTaskCount() * this.ioConfig.getReplicas()));
this.taskClient = taskClientFactory.build(
taskInfoProvider,
dataSource,
@ -597,6 +742,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
@Override
public int getActiveTaskGroupsCount()
{
return activelyReadingTaskGroups.values().size();
}
@Override
public void start()
{
@ -659,6 +811,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
scheduledExec.shutdownNow(); // stop recurring executions
reportingExec.shutdownNow();
if (started) {
Optional<TaskRunner> taskRunner = taskMaster.getTaskRunner();
if (taskRunner.isPresent()) {
@ -774,7 +927,6 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
scheduleReporting(reportingExec);
started = true;
log.info(
"Started SeekableStreamSupervisor[%s], first run in [%s], with spec: [%s]",
@ -797,6 +949,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
{
return () -> notices.add(new DynamicAllocationTasksNotice(scaleAction));
}
private Runnable buildRunTask()
{
return () -> notices.add(new RunNotice());
@ -1901,6 +2058,11 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return false;
}
public int getPartitionCount()
{
return recordSupplier.getPartitionIds(ioConfig.getStream()).size();
}
private boolean updatePartitionDataFromStream()
{
List<PartitionIdType> previousPartitionIds = new ArrayList<>(partitionIds);
@ -3510,29 +3672,21 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
long maxLag = 0, totalLag = 0, avgLag;
for (long lag : partitionLags.values()) {
if (lag > maxLag) {
maxLag = lag;
}
totalLag += lag;
}
avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
LagStats lagStats = computeLags(partitionLags);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag)
.build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag)
.build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag())
);
emitter.emit(
ServiceMetricEvent.builder()
.setDimension("dataSource", dataSource)
.build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag)
.build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag())
);
};
@ -3545,6 +3699,24 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
/**
* This method computes maxLag, totalLag and avgLag
* @param partitionLags lags per partition
*/
protected LagStats computeLags(Map<PartitionIdType, Long> partitionLags)
{
long maxLag = 0, totalLag = 0, avgLag;
for (long lag : partitionLags.values()) {
if (lag > maxLag) {
maxLag = lag;
}
totalLag += lag;
}
avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size();
return new LagStats(maxLag, totalLag, avgLag);
}
/**
* a special sequence number that is used to indicate that the sequence offset
* for a particular partition has not yet been calculated by the supervisor. When

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.IAE;
import org.joda.time.DateTime;
import org.joda.time.Duration;
@ -37,7 +38,7 @@ public abstract class SeekableStreamSupervisorIOConfig
@Nullable
private final InputFormat inputFormat; // nullable for backward compatibility
private final Integer replicas;
private final Integer taskCount;
private Integer taskCount;
private final Duration taskDuration;
private final Duration startDelay;
private final Duration period;
@ -46,6 +47,7 @@ public abstract class SeekableStreamSupervisorIOConfig
private final Optional<Duration> lateMessageRejectionPeriod;
private final Optional<Duration> earlyMessageRejectionPeriod;
private final Optional<DateTime> lateMessageRejectionStartDateTime;
@Nullable private final AutoScalerConfig autoScalerConfig;
public SeekableStreamSupervisorIOConfig(
String stream,
@ -59,13 +61,21 @@ public abstract class SeekableStreamSupervisorIOConfig
Period completionTimeout,
Period lateMessageRejectionPeriod,
Period earlyMessageRejectionPeriod,
@Nullable AutoScalerConfig autoScalerConfig,
DateTime lateMessageRejectionStartDateTime
)
{
this.stream = Preconditions.checkNotNull(stream, "stream cannot be null");
this.inputFormat = inputFormat;
this.replicas = replicas != null ? replicas : 1;
this.taskCount = taskCount != null ? taskCount : 1;
// Could be null
this.autoScalerConfig = autoScalerConfig;
// if autoscaler is enable then taskcount will be ignored here. and init taskcount will be equal to taskCountMin
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
this.taskCount = autoScalerConfig.getTaskCountMin();
} else {
this.taskCount = taskCount != null ? taskCount : 1;
}
this.taskDuration = defaultDuration(taskDuration, "PT1H");
this.startDelay = defaultDuration(startDelay, "PT5S");
this.period = defaultDuration(period, "PT30S");
@ -113,12 +123,24 @@ public abstract class SeekableStreamSupervisorIOConfig
return replicas;
}
@Nullable
@JsonProperty
public AutoScalerConfig getAutoscalerConfig()
{
return autoScalerConfig;
}
@JsonProperty
public Integer getTaskCount()
{
return taskCount;
}
public void setTaskCount(final int taskCount)
{
this.taskCount = taskCount;
}
@JsonProperty
public Duration getTaskDuration()
{

View File

@ -32,7 +32,10 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
@ -151,6 +154,21 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
@Override
public abstract Supervisor createSupervisor();
/**
* An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled then NoopTaskAutoScaler will be returned.
* @param supervisor
* @return autoScaler
*/
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoscalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
return autoScalerConfig.createAutoScaler(supervisor, this);
}
return new NoopTaskAutoScaler();
}
@Override
public List<String> getDataSources()
{

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
@JsonSubTypes(value = {
@Type(name = "lagBased", value = LagBasedAutoScalerConfig.class)
})
public interface AutoScalerConfig
{
boolean getEnableTaskAutoScaler();
long getMinTriggerScaleActionFrequencyMillis();
int getTaskCountMax();
int getTaskCountMin();
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
}

View File

@ -0,0 +1,244 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.commons.collections4.queue.CircularFifoQueue;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
{
private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class);
private final String dataSource;
private final CircularFifoQueue<Long> lagMetricsQueue;
private final ScheduledExecutorService lagComputationExec;
private final ScheduledExecutorService allocationExec;
private final SupervisorSpec spec;
private final SeekableStreamSupervisor supervisor;
private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
private static final ReentrantLock LOCK = new ReentrantLock(true);
public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
)
{
this.lagBasedAutoScalerConfig = autoScalerConfig;
final String supervisorId = StringUtils.format("Supervisor-%s", dataSource);
this.dataSource = dataSource;
final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig
.getLagCollectionIntervalMillis()) + 1;
this.lagMetricsQueue = new CircularFifoQueue<>(slots);
this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d");
this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
this.spec = spec;
this.supervisor = supervisor;
}
@Override
public void start()
{
Callable<Integer> scaleAction = () -> {
LOCK.lock();
int desiredTaskCount = -1;
try {
desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
if (desiredTaskCount != -1) {
lagMetricsQueue.clear();
}
}
catch (Exception ex) {
log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
}
finally {
LOCK.unlock();
}
return desiredTaskCount;
};
lagComputationExec.scheduleAtFixedRate(
computeAndCollectLag(),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
TimeUnit.MILLISECONDS
);
log.info(
"LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]",
lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.size(),
lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
);
}
@Override
public void stop()
{
allocationExec.shutdownNow();
lagComputationExec.shutdownNow();
}
@Override
public void reset()
{
// clear queue for kafka lags
if (lagMetricsQueue != null) {
try {
LOCK.lock();
lagMetricsQueue.clear();
}
catch (Exception e) {
log.warn(e, "Error,when clear queue in rest action");
}
finally {
LOCK.unlock();
}
}
}
/**
* This method computes current consumer lag. Gets the total lag of all partitions and fill in the lagMetricsQueue
*
* @return a Runnbale object to compute and collect lag.
*/
private Runnable computeAndCollectLag()
{
return () -> {
LOCK.lock();
try {
if (!spec.isSuspended()) {
LagStats lagStats = supervisor.computeLagStats();
if (lagStats == null) {
lagMetricsQueue.offer(0L);
} else {
long totalLags = lagStats.getTotalLag();
lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
}
log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
} else {
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
}
}
catch (Exception e) {
log.error(e, "Error while collecting lags");
}
finally {
LOCK.unlock();
}
};
}
/**
* This method determines whether to do scale actions based on collected lag points.
* Current algorithm of scale is simple:
* First of all, compute the proportion of lag points higher/lower than scaleOutThreshold/scaleInThreshold, getting scaleOutThreshold/scaleInThreshold.
* Secondly, compare scaleOutThreshold/scaleInThreshold with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold. P.S. Scale out action has higher priority than scale in action.
* Finaly, if scaleOutThreshold/scaleInThreshold is higher than triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold, scale out/in action would be triggered.
*
* @param lags the lag metrics of Stream(Kafka/Kinesis)
* @return Integer. target number of tasksCount, -1 means skip scale action.
*/
private int computeDesiredTaskCount(List<Long> lags)
{
// if supervisor is not suspended, ensure required tasks are running
// if suspended, ensure tasks have been requested to gracefully stop
log.debug("Computing desired task count for [%s], based on following lags : [%s]", dataSource, lags);
int beyond = 0;
int within = 0;
int metricsCount = lags.size();
for (Long lag : lags) {
if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
beyond++;
}
if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
within++;
}
}
double beyondProportion = beyond * 1.0 / metricsCount;
double withinProportion = within * 1.0 / metricsCount;
log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
withinProportion, dataSource
);
int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
int desiredActiveTaskCount;
if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
// Do Scale out
int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
int partitionCount = supervisor.getPartitionCount();
if (partitionCount <= 0) {
log.warn("Partition number for [%s] <= 0 ? how can it be?", dataSource);
return -1;
}
int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
if (currentActiveTaskCount == actualTaskCountMax) {
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
dataSource
);
return -1;
} else {
desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
}
return desiredActiveTaskCount;
}
if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
// Do Scale in
int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) {
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
}
return desiredActiveTaskCount;
}
return -1;
}
public LagBasedAutoScalerConfig getAutoScalerConfig()
{
return lagBasedAutoScalerConfig;
}
}

View File

@ -0,0 +1,208 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import javax.annotation.Nullable;
public class LagBasedAutoScalerConfig implements AutoScalerConfig
{
private final long lagCollectionIntervalMillis;
private final long lagCollectionRangeMillis;
private final long scaleActionStartDelayMillis;
private final long scaleActionPeriodMillis;
private final long scaleOutThreshold;
private final long scaleInThreshold;
private final double triggerScaleOutFractionThreshold;
private final double triggerScaleInFractionThreshold;
private int taskCountMax;
private int taskCountMin;
private final int scaleInStep;
private final int scaleOutStep;
private final boolean enableTaskAutoScaler;
private final long minTriggerScaleActionFrequencyMillis;
@JsonCreator
public LagBasedAutoScalerConfig(
@Nullable @JsonProperty("lagCollectionIntervalMillis") Long lagCollectionIntervalMillis,
@Nullable @JsonProperty("lagCollectionRangeMillis") Long lagCollectionRangeMillis,
@Nullable @JsonProperty("scaleActionStartDelayMillis") Long scaleActionStartDelayMillis,
@Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis,
@Nullable @JsonProperty("scaleOutThreshold") Long scaleOutThreshold,
@Nullable @JsonProperty("scaleInThreshold") Long scaleInThreshold,
@Nullable @JsonProperty("triggerScaleOutFractionThreshold") Double triggerScaleOutFractionThreshold,
@Nullable @JsonProperty("triggerScaleInFractionThreshold") Double triggerScaleInFractionThreshold,
@JsonProperty("taskCountMax") Integer taskCountMax,
@JsonProperty("taskCountMin") Integer taskCountMin,
@Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis
)
{
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
this.lagCollectionIntervalMillis = lagCollectionIntervalMillis != null ? lagCollectionIntervalMillis : 30000;
this.lagCollectionRangeMillis = lagCollectionRangeMillis != null ? lagCollectionRangeMillis : 600000;
this.scaleActionStartDelayMillis = scaleActionStartDelayMillis != null ? scaleActionStartDelayMillis : 300000;
this.scaleActionPeriodMillis = scaleActionPeriodMillis != null ? scaleActionPeriodMillis : 60000;
this.scaleOutThreshold = scaleOutThreshold != null ? scaleOutThreshold : 6000000;
this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000;
this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
// Only do taskCountMax and taskCountMin check when autoscaler is enabled. So that users left autoConfig empty{} will not throw any exception and autoscaler is disabled.
// If autoscaler is disabled, no matter what configs are set, they are not used.
if (this.enableTaskAutoScaler) {
if (taskCountMax == null || taskCountMin == null) {
throw new RuntimeException("taskCountMax or taskCountMin can't be null!");
} else if (taskCountMax < taskCountMin) {
throw new RuntimeException("taskCountMax can't lower than taskCountMin!");
}
this.taskCountMax = taskCountMax;
this.taskCountMin = taskCountMin;
}
this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
!= null ? minTriggerScaleActionFrequencyMillis : 600000;
}
@JsonProperty
public long getLagCollectionIntervalMillis()
{
return lagCollectionIntervalMillis;
}
@JsonProperty
public long getLagCollectionRangeMillis()
{
return lagCollectionRangeMillis;
}
@JsonProperty
public long getScaleActionStartDelayMillis()
{
return scaleActionStartDelayMillis;
}
@JsonProperty
public long getScaleActionPeriodMillis()
{
return scaleActionPeriodMillis;
}
@JsonProperty
public long getScaleOutThreshold()
{
return scaleOutThreshold;
}
@JsonProperty
public long getScaleInThreshold()
{
return scaleInThreshold;
}
@JsonProperty
public double getTriggerScaleOutFractionThreshold()
{
return triggerScaleOutFractionThreshold;
}
@JsonProperty
public double getTriggerScaleInFractionThreshold()
{
return triggerScaleInFractionThreshold;
}
@Override
@JsonProperty
public int getTaskCountMax()
{
return taskCountMax;
}
@Override
@JsonProperty
public int getTaskCountMin()
{
return taskCountMin;
}
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
{
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
}
@JsonProperty
public int getScaleInStep()
{
return scaleInStep;
}
@JsonProperty
public int getScaleOutStep()
{
return scaleOutStep;
}
@Override
@JsonProperty
public boolean getEnableTaskAutoScaler()
{
return enableTaskAutoScaler;
}
@Override
@JsonProperty
public long getMinTriggerScaleActionFrequencyMillis()
{
return minTriggerScaleActionFrequencyMillis;
}
@Override
public String toString()
{
return "autoScalerConfig{" +
"enableTaskAutoScaler=" + enableTaskAutoScaler +
", taskCountMax=" + taskCountMax +
", taskCountMin=" + taskCountMin +
", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis +
", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
", scaleOutThreshold=" + scaleOutThreshold +
", triggerScaleOutFractionThreshold=" + triggerScaleOutFractionThreshold +
", scaleInThreshold=" + scaleInThreshold +
", triggerScaleInFractionThreshold=" + triggerScaleInFractionThreshold +
", scaleActionStartDelayMillis=" + scaleActionStartDelayMillis +
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
", scaleInStep=" + scaleInStep +
", scaleOutStep=" + scaleOutStep +
'}';
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
public class NoopTaskAutoScaler implements SupervisorTaskAutoScaler
{
public NoopTaskAutoScaler()
{
}
@Override
public void start()
{
//Do nothing
}
@Override
public void stop()
{
//Do nothing
}
@Override
public void reset()
{
//Do nothing
}
}

View File

@ -32,6 +32,8 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorResource;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.indexing.worker.http.WorkerResource;
import org.apache.druid.server.http.security.ResourceFilterTestHelper;
import org.apache.druid.server.security.AuthorizerMapper;
@ -126,6 +128,12 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper
return null;
}
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
return new NoopTaskAutoScaler();
}
@Override
public List<String> getDataSources()
{

View File

@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.server.security.Access;
@ -1155,6 +1156,12 @@ public class SupervisorResourceTest extends EasyMockSupport
return supervisor;
}
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
return null;
}
@Override
public List<String> getDataSources()
{

View File

@ -0,0 +1,950 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.seekablestream;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionSchema;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexing.common.task.Task;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber;
import org.apache.druid.indexing.seekablestream.common.RecordSupplier;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
{
private SeekableStreamSupervisorIngestionSpec ingestionSchema;
private TaskStorage taskStorage;
private TaskMaster taskMaster;
private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator;
private ServiceEmitter emitter;
private RowIngestionMetersFactory rowIngestionMetersFactory;
private DataSchema dataSchema;
private SeekableStreamSupervisorTuningConfig seekableStreamSupervisorTuningConfig;
private SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig;
private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper();
private SeekableStreamIndexTaskClientFactory taskClientFactory;
private static final String STREAM = "stream";
private static final String DATASOURCE = "testDS";
private SeekableStreamSupervisorSpec spec;
private SupervisorStateManagerConfig supervisorConfig;
private SeekableStreamSupervisor supervisor4;
private SeekableStreamIndexTaskClientFactory indexTaskClientFactory;
private ObjectMapper mapper;
private DruidMonitorSchedulerConfig monitorSchedulerConfig;
private SupervisorStateManagerConfig supervisorStateManagerConfig;
@Before
public void setUp()
{
ingestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class);
taskStorage = EasyMock.mock(TaskStorage.class);
taskMaster = EasyMock.mock(TaskMaster.class);
indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class);
emitter = EasyMock.mock(ServiceEmitter.class);
rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class);
dataSchema = EasyMock.mock(DataSchema.class);
seekableStreamSupervisorTuningConfig = EasyMock.mock(SeekableStreamSupervisorTuningConfig.class);
seekableStreamSupervisorIOConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class);
taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
spec = EasyMock.mock(SeekableStreamSupervisorSpec.class);
supervisorConfig = new SupervisorStateManagerConfig();
indexTaskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class);
mapper = new DefaultObjectMapper();
monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class);
supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
}
private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
{
private BaseTestSeekableStreamSupervisor()
{
super(
"testSupervisorId",
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
taskClientFactory,
OBJECT_MAPPER,
spec,
rowIngestionMetersFactory,
false
);
}
@Override
protected String baseTaskName()
{
return "test";
}
@Override
protected void updatePartitionLagFromStream()
{
// do nothing
}
@Nullable
@Override
protected Map<String, Long> getPartitionRecordLag()
{
return null;
}
@Nullable
@Override
protected Map<String, Long> getPartitionTimeLag()
{
return null;
}
@Override
protected SeekableStreamIndexTaskIOConfig createTaskIoConfig(
int groupId,
Map<String, String> startPartitions,
Map<String, String> endPartitions,
String baseSequenceName,
DateTime minimumMessageTime,
DateTime maximumMessageTime,
Set<String> exclusiveStartSequenceNumberPartitions,
SeekableStreamSupervisorIOConfig ioConfig
)
{
return new SeekableStreamIndexTaskIOConfig<String, String>(
groupId,
baseSequenceName,
new SeekableStreamStartSequenceNumbers<>(STREAM, startPartitions, exclusiveStartSequenceNumberPartitions),
new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions),
true,
minimumMessageTime,
maximumMessageTime,
ioConfig.getInputFormat()
)
{
};
}
@Override
protected List<SeekableStreamIndexTask<String, String, ByteEntity>> createIndexTasks(
int replicas,
String baseSequenceName,
ObjectMapper sortingMapper,
TreeMap<Integer, Map<String, String>> sequenceOffsets,
SeekableStreamIndexTaskIOConfig taskIoConfig,
SeekableStreamIndexTaskTuningConfig taskTuningConfig,
RowIngestionMetersFactory rowIngestionMetersFactory
)
{
return null;
}
@Override
protected int getTaskGroupIdForPartition(String partition)
{
return 0;
}
@Override
protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata)
{
return true;
}
@Override
protected boolean doesTaskTypeMatchSupervisor(Task task)
{
return true;
}
@Override
protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetaDataForReset(
String stream,
Map<String, String> map
)
{
return null;
}
@Override
protected OrderedSequenceNumber<String> makeSequenceNumber(String seq, boolean isExclusive)
{
return new OrderedSequenceNumber<String>(seq, isExclusive)
{
@Override
public int compareTo(OrderedSequenceNumber<String> o)
{
return new BigInteger(this.get()).compareTo(new BigInteger(o.get()));
}
};
}
@Override
protected Map<String, Long> getRecordLagPerPartition(Map<String, String> currentOffsets)
{
return null;
}
@Override
protected Map<String, Long> getTimeLagPerPartition(Map<String, String> currentOffsets)
{
return null;
}
@Override
protected RecordSupplier<String, String, ByteEntity> setupRecordSupplier()
{
return recordSupplier;
}
@Override
protected SeekableStreamSupervisorReportPayload<String, String> createReportPayload(
int numPartitions,
boolean includeOffsets
)
{
return new SeekableStreamSupervisorReportPayload<String, String>(
DATASOURCE,
STREAM,
1,
1,
1L,
null,
null,
null,
null,
null,
null,
false,
true,
null,
null,
null
)
{
};
}
@Override
protected String getNotSetMarker()
{
return "NOT_SET";
}
@Override
protected String getEndOfPartitionMarker()
{
return "EOF";
}
@Override
protected boolean isEndOfShard(String seqNum)
{
return false;
}
@Override
protected boolean isShardExpirationMarker(String seqNum)
{
return false;
}
@Override
protected boolean useExclusiveStartSequenceNumberForNonFirstSequence()
{
return false;
}
}
private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
{
private int partitionNumbers;
public TestSeekableStreamSupervisor(int partitionNumbers)
{
this.partitionNumbers = partitionNumbers;
}
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{
// do nothing
}
@Override
public LagStats computeLagStats()
{
return new LagStats(0, 0, 0);
}
@Override
public int getPartitionCount()
{
return partitionNumbers;
}
}
private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec
{
private SeekableStreamSupervisor supervisor;
private String id;
public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec ingestionSchema,
@Nullable Map<String, Object> context,
Boolean suspended,
TaskStorage taskStorage,
TaskMaster taskMaster,
IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator,
SeekableStreamIndexTaskClientFactory indexTaskClientFactory,
ObjectMapper mapper,
ServiceEmitter emitter,
DruidMonitorSchedulerConfig monitorSchedulerConfig,
RowIngestionMetersFactory rowIngestionMetersFactory,
SupervisorStateManagerConfig supervisorStateManagerConfig,
SeekableStreamSupervisor supervisor,
String id)
{
super(
ingestionSchema,
context,
suspended,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig);
this.supervisor = supervisor;
this.id = id;
}
@Override
public List<String> getDataSources()
{
return new ArrayList<>();
}
@Override
public String getId()
{
return id;
}
@Override
public Supervisor createSupervisor()
{
return supervisor;
}
@Override
public String getType()
{
return null;
}
@Override
public String getSource()
{
return null;
}
@Override
protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend)
{
return null;
}
}
private static SeekableStreamSupervisorTuningConfig getTuningConfig()
{
return new SeekableStreamSupervisorTuningConfig()
{
@Override
public Integer getWorkerThreads()
{
return 1;
}
@Override
public Integer getChatThreads()
{
return 1;
}
@Override
public Long getChatRetries()
{
return 1L;
}
@Override
public Duration getHttpTimeout()
{
return new Period("PT1M").toStandardDuration();
}
@Override
public Duration getShutdownTimeout()
{
return new Period("PT1S").toStandardDuration();
}
@Override
public Duration getRepartitionTransitionDuration()
{
return new Period("PT2M").toStandardDuration();
}
@Override
public Duration getOffsetFetchPeriod()
{
return new Period("PT5M").toStandardDuration();
}
@Override
public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig()
{
return new SeekableStreamIndexTaskTuningConfig(
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null
)
{
@Override
public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir)
{
return null;
}
@Override
public String toString()
{
return null;
}
};
}
};
}
@Test
public void testAutoScalerConfig()
{
AutoScalerConfig autoScalerConfigEmpty = mapper.convertValue(new HashMap<>(), AutoScalerConfig.class);
Assert.assertTrue(autoScalerConfigEmpty instanceof LagBasedAutoScalerConfig);
Assert.assertFalse(autoScalerConfigEmpty.getEnableTaskAutoScaler());
AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, AutoScalerConfig.class);
Assert.assertNull(autoScalerConfigNull);
AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class);
Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig);
AutoScalerConfig autoScalerConfigValue = mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class);
Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig);
LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue;
Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1);
Exception e = null;
try {
AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class);
}
catch (RuntimeException ex) {
e = ex;
}
Assert.assertNotNull(e);
e = null;
try {
// taskCountMax and taskCountMin couldn't be ignored.
AutoScalerConfig autoScalerError2 = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class);
}
catch (RuntimeException ex) {
e = ex;
}
Assert.assertNotNull(e);
}
@Test
public void testAutoScalerCreated()
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 5000000);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("taskCountMax", 8);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1");
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.put("enableTaskAutoScaler", false);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.remove("enableTaskAutoScaler");
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler);
EasyMock.reset(seekableStreamSupervisorIOConfig);
autoScalerConfig.clear();
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
Assert.assertTrue(autoScalerConfig.isEmpty());
SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler);
}
@Test
public void testDefaultAutoScalerConfigCreatedWithDefault()
{
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema,
null,
false,
taskStorage,
taskMaster,
indexerMetadataStorageCoordinator,
indexTaskClientFactory,
mapper,
emitter,
monitorSchedulerConfig,
rowIngestionMetersFactory,
supervisorStateManagerConfig,
supervisor4,
"id1");
SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4);
Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler);
LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler;
LagBasedAutoScalerConfig lagBasedAutoScalerConfig = lagBasedAutoScaler.getAutoScalerConfig();
Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1);
Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), 600000);
Assert.assertEquals(lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), 300000);
Assert.assertEquals(lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), 60000);
Assert.assertEquals(lagBasedAutoScalerConfig.getScaleOutThreshold(), 6000000);
Assert.assertEquals(lagBasedAutoScalerConfig.getScaleInThreshold(), 1000000);
Assert.assertEquals(lagBasedAutoScalerConfig.getTaskCountMax(), 4);
Assert.assertEquals(lagBasedAutoScalerConfig.getTaskCountMin(), 1);
Assert.assertEquals(lagBasedAutoScalerConfig.getScaleInStep(), 1);
Assert.assertEquals(lagBasedAutoScalerConfig.getScaleOutStep(), 2);
Assert.assertEquals(lagBasedAutoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), 600000);
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
LagStats lagStats = supervisor.computeLagStats();
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), spec);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2);
LagStats lagStats = supervisor.computeLagStats();
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), spec);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), spec);
// enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount());
supervisor.getIoConfig().setTaskCount(2);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException
{
SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
1,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, null, null
) {};
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
NoopTaskAutoScaler autoScaler = new NoopTaskAutoScaler();
supervisor.start();
autoScaler.start();
supervisor.runInternal();
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountBeforeScaleOut);
Thread.sleep(1 * 1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScaleOut);
autoScaler.reset();
autoScaler.stop();
}
private static DataSchema getDataSchema()
{
List<DimensionSchema> dimensions = new ArrayList<>();
dimensions.add(StringDimensionSchema.create("dim1"));
dimensions.add(StringDimensionSchema.create("dim2"));
return new DataSchema(
DATASOURCE,
new TimestampSpec("timestamp", "iso", null),
new DimensionsSpec(
dimensions,
null,
null
),
new AggregatorFactory[]{new CountAggregatorFactory("rows")},
new UniformGranularitySpec(
Granularities.HOUR,
Granularities.NONE,
ImmutableList.of()
),
null
);
}
private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut)
{
if (scaleOut) {
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null
) {};
} else {
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false),
1,
taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
false,
new Period("PT30M"),
null,
null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null
) {};
}
}
private static Map<String, Object> getScaleOutProperties(int maxTaskCount)
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 0);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("taskCountMax", maxTaskCount);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
return autoScalerConfig;
}
private static Map<String, Object> getScaleInProperties()
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 8000000);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
autoScalerConfig.put("scaleInThreshold", 0);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.0);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("taskCountMax", 2);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
return autoScalerConfig;
}
}

View File

@ -43,6 +43,7 @@ import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask;
@ -58,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException;
import org.apache.druid.indexing.seekablestream.common.StreamPartition;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
@ -82,9 +84,11 @@ import org.junit.Before;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -765,6 +769,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
new Period("PT30M"),
null,
null,
null,
null
)
{
@ -825,12 +830,32 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
false,
new Period("PT30M"),
null,
null, null
null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null
)
{
};
}
private static Map<String, Object> getProperties()
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 5000000);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3);
autoScalerConfig.put("scaleInThreshold", 1000000);
autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8);
autoScalerConfig.put("scaleActionStartDelayMillis", 0);
autoScalerConfig.put("scaleActionPeriodMillis", 100);
autoScalerConfig.put("taskCountMax", 8);
autoScalerConfig.put("taskCountMin", 1);
autoScalerConfig.put("scaleInStep", 1);
autoScalerConfig.put("scaleOutStep", 2);
autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000);
return autoScalerConfig;
}
private static SeekableStreamSupervisorTuningConfig getTuningConfig()
{
return new SeekableStreamSupervisorTuningConfig()
@ -1177,6 +1202,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
{
// do nothing
}
@Override
public LagStats computeLagStats()
{
return null;
}
}
private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor
@ -1219,6 +1250,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
}
}
@Override
public LagStats computeLagStats()
{
return null;
}
@Override
protected void scheduleReporting(ScheduledExecutorService reportingExec)
{

View File

@ -74,10 +74,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json";
private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json";
private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json";
protected static final String DATA_RESOURCE_ROOT = "/stream/data";
protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE);
protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH =
String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE);
protected static final String SERIALIZER_SPEC_DIR = "serializer";
protected static final String INPUT_FORMAT_SPEC_DIR = "input_format";
protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser";
@ -294,6 +298,70 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest
}
}
protected void doTestIndexDataWithAutoscaler(@Nullable Boolean transactionEnabled) throws Exception
{
final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig(
INPUT_FORMAT,
getResourceAsString(JSON_INPUT_FORMAT_PATH)
);
try (
final Closeable closer = createResourceCloser(generatedTestConfig);
final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled)
) {
final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform()
.apply(getResourceAsString(SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH));
LOG.info("supervisorSpec: [%s]\n", taskSpec);
// Start supervisor
generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec));
LOG.info("Submitted supervisor");
// Start generating half of the data
int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND;
int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2;
secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound;
final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator(
new JsonEventSerializer(jsonMapper),
EVENTS_PER_SECOND,
CYCLE_PADDING_MS
);
long numWritten = streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateFirstRound,
FIRST_EVENT_TIME
);
// Verify supervisor is healthy before suspension
ITRetryUtil.retryUntil(
() -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())),
true,
10000,
30,
"Waiting for supervisor to be healthy"
);
// wait for autoScaling task numbers from 1 to 2.
ITRetryUtil.retryUntil(
() -> indexer.getRunningTasks().size() == 2,
true,
10000,
50,
"waiting for autoScaling task numbers from 1 to 2"
);
// Start generating remainning half of the data
numWritten += streamGenerator.run(
generatedTestConfig.getStreamName(),
streamEventWriter,
secondsToGenerateRemaining,
FIRST_EVENT_TIME.plusSeconds(secondsToGenerateFirstRound)
);
// Verify that supervisor can catch up with the stream
verifyIngestedData(generatedTestConfig, numWritten);
}
}
protected void doTestIndexDataWithStreamReshardSplit(@Nullable Boolean transactionEnabled) throws Exception
{
// Reshard the stream from STREAM_SHARD_COUNT to STREAM_SHARD_COUNT * 2

View File

@ -52,6 +52,16 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
doTestIndexDataWithStartStopSupervisor(false);
}
/**
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
* and supervisor maintained and scoped within this test only
*/
@Test
public void testKafkaIndexDataWithWithAutoscaler() throws Exception
{
doTestIndexDataWithAutoscaler(false);
}
/**
* This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
* and supervisor maintained and scoped within this test only

View File

@ -0,0 +1,73 @@
{
"type": "%%STREAM_TYPE%%",
"dataSchema": {
"dataSource": "%%DATASOURCE%%",
"parser": %%PARSER%%,
"timestampSpec": {
"column": "timestamp",
"format": "auto"
},
"dimensionsSpec": {
"dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
"dimensionExclusions": [],
"spatialDimensions": []
},
"metricsSpec": [
{
"type": "count",
"name": "count"
},
{
"type": "doubleSum",
"name": "added",
"fieldName": "added"
},
{
"type": "doubleSum",
"name": "deleted",
"fieldName": "deleted"
},
{
"type": "doubleSum",
"name": "delta",
"fieldName": "delta"
}
],
"granularitySpec": {
"type": "uniform",
"segmentGranularity": "MINUTE",
"queryGranularity": "NONE"
}
},
"tuningConfig": {
"type": "%%STREAM_TYPE%%",
"intermediatePersistPeriod": "PT30S",
"maxRowsPerSegment": 5000000,
"maxRowsInMemory": 500000
},
"ioConfig": {
"%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
"%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
"autoScalerConfig": {
"enableTaskAutoScaler": true,
"lagCollectionIntervalMillis": 500,
"lagCollectionRangeMillis": 500,
"scaleOutThreshold": 0,
"triggerScaleOutFractionThreshold": 0.0,
"scaleInThreshold": 1000000,
"triggerScaleInFractionThreshold": 0.9,
"scaleActionStartDelayMillis": 0,
"scaleActionPeriodMillis": 100,
"taskCountMax": 2,
"taskCountMin": 1,
"scaleInStep": 1,
"scaleOutStep": 2,
"minTriggerScaleActionFrequencyMillis": 600000
},
"taskCount": 1,
"replicas": 1,
"taskDuration": "PT30S",
"%%USE_EARLIEST_KEY%%": true,
"inputFormat" : %%INPUT_FORMAT%%
}
}

View File

@ -957,6 +957,11 @@
<artifactId>jna</artifactId>
<version>4.5.1</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>
<dependency>
<groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import javax.annotation.Nullable;
import java.util.ArrayList;
@ -154,6 +155,18 @@ public class NoopSupervisorSpec implements SupervisorSpec
{
}
@Override
public LagStats computeLagStats()
{
return new LagStats(0, 0, 0);
}
@Override
public int getActiveTaskGroupsCount()
{
return -1;
}
};
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import javax.annotation.Nullable;
import java.util.Map;
@ -64,4 +65,12 @@ public interface Supervisor
* @param checkpointMetadata metadata for the sequence to currently checkpoint
*/
void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata);
/**
* Computes maxLag, totalLag and avgLag
* Only supports Kafka ingestion so far.
*/
LagStats computeLagStats();
int getActiveTaskGroupsCount();
}

View File

@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import java.util.List;
@ -40,6 +41,11 @@ public interface SupervisorSpec
*/
Supervisor createSupervisor();
default SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
return null;
}
List<String> getDataSources();
default SupervisorSpec createSuspendedSpec()

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
public class LagStats
{
private final long maxLag;
private final long totalLag;
private final long avgLag;
public LagStats(long maxLag, long totalLag, long avgLag)
{
this.maxLag = maxLag;
this.totalLag = totalLag;
this.avgLag = avgLag;
}
public long getMaxLag()
{
return maxLag;
}
public long getTotalLag()
{
return totalLag;
}
public long getAvgLag()
{
return avgLag;
}
}

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
public interface SupervisorTaskAutoScaler
{
void start();
void stop();
void reset();
}

View File

@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing;
import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.junit.Assert;
import org.junit.Test;
import java.util.Collections;
import java.util.concurrent.Callable;
public class NoopSupervisorSpecTest
{
@Test
public void testNoopSupervisorSpecWithAutoscaler()
{
Exception e = null;
try {
NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
Supervisor supervisor = noopSupervisorSpec.createSupervisor();
SupervisorTaskAutoScaler autoscaler = noopSupervisorSpec.createAutoscaler(supervisor);
Assert.assertNull(autoscaler);
Callable<Integer> noop = new Callable<Integer>() {
@Override
public Integer call()
{
return -1;
}
};
int count = supervisor.getActiveTaskGroupsCount();
Assert.assertEquals(count, -1);
LagStats lagStats = supervisor.computeLagStats();
long totalLag = lagStats.getTotalLag();
long avgLag = lagStats.getAvgLag();
long maxLag = lagStats.getMaxLag();
Assert.assertEquals(totalLag, 0);
Assert.assertEquals(avgLag, 0);
Assert.assertEquals(maxLag, 0);
}
catch (Exception ex) {
e = ex;
}
Assert.assertNull(e);
}
}

View File

@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.After;
@ -185,6 +186,12 @@ public class SQLMetadataSupervisorManagerTest
throw new UnsupportedOperationException();
}
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
return null;
}
@Override
public List<String> getDataSources()
{

View File

@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import java.util.List;
import java.util.Objects;
@ -52,6 +53,12 @@ public class TestSupervisorSpec implements SupervisorSpec
return null;
}
@Override
public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
{
return null;
}
@Override
public List<String> getDataSources()
{