diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md
index 7b94a99c253..0dd31677af9 100644
--- a/docs/development/extensions-core/kafka-ingestion.md
+++ b/docs/development/extensions-core/kafka-ingestion.md
@@ -146,6 +146,115 @@ A sample supervisor spec is shown below:
 |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)|
 |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)|
 |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)|
+|`autoScalerConfig`|Object|Defines how to auto scale the number of Kafka ingest tasks. Auto scaling is currently ONLY supported for Kafka indexing. See [Task Autoscaler Properties](#task-autoscaler-properties) for details.|no (default == null)|
+
+### Task Autoscaler Properties
+
+> Note that the task autoscaler is currently designated as experimental.
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `enableTaskAutoScaler` | Whether to enable the autoscaler. If this is set to false or omitted, the autoscaler is disabled even when `autoScalerConfig` is not null. | no (default == false) |
+| `taskCountMax` | Maximum number of ingestion tasks. Make sure `taskCountMax >= taskCountMin`. If `taskCountMax > {numKafkaPartitions}`, the maximum number of reading tasks is capped at `{numKafkaPartitions}` and `taskCountMax` is ignored. | yes |
+| `taskCountMin` | Minimum number of ingestion tasks. When the autoscaler is enabled, the value of `taskCount` in `IOConfig` is ignored; ingestion starts with `taskCountMin` tasks and can scale up to `taskCountMax`. | yes |
+| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions. | no (default == 600000) |
+| `autoScalerStrategy` | The algorithm used by the autoscaler. Only `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#lag-based-autoscaler-strategy-related-properties) for details. | no (default == `lagBased`) |
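+
+For example, a minimal `autoScalerConfig` that turns the autoscaler on and relies on the defaults for everything else might look like the sketch below (the task count values are illustrative only):
+
+```json
+"autoScalerConfig": {
+  "enableTaskAutoScaler": true,
+  "taskCountMin": 1,
+  "taskCountMax": 4
+}
+```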
+
+### Lag Based AutoScaler Strategy Related Properties
+
+| Property | Description | Required |
+| ------------- | ------------- | ------------- |
+| `lagCollectionIntervalMillis` | How often to collect lag metric points. | no (default == 30000) |
+| `lagCollectionRangeMillis` | The total time window of lag collection. Used together with `lagCollectionIntervalMillis`: within the most recent `lagCollectionRangeMillis`, one lag metric point is collected every `lagCollectionIntervalMillis`. | no (default == 600000) |
+| `scaleOutThreshold` | The lag threshold that triggers a scale out action. | no (default == 6000000) |
+| `triggerScaleOutFractionThreshold` | If the fraction of collected lag points higher than `scaleOutThreshold` is at least `triggerScaleOutFractionThreshold`, a scale out action is triggered. | no (default == 0.3) |
+| `scaleInThreshold` | The lag threshold that triggers a scale in action. | no (default == 1000000) |
+| `triggerScaleInFractionThreshold` | If the fraction of collected lag points lower than `scaleInThreshold` is at least `triggerScaleInFractionThreshold`, a scale in action is triggered. | no (default == 0.9) |
+| `scaleActionStartDelayMillis` | Number of milliseconds to wait after the supervisor starts before the scale logic is checked for the first time. | no (default == 300000) |
+| `scaleActionPeriodMillis` | How often, in milliseconds, to check whether a scale action should be taken. | no (default == 60000) |
+| `scaleInStep` | How many tasks to remove at a time during a scale in action. | no (default == 1) |
+| `scaleOutStep` | How many tasks to add at a time during a scale out action. | no (default == 2) |
+
+A sample supervisor spec with `lagBased` autoScaler enabled is shown below:
+```json
+{
+  "type": "kafka",
+  "dataSchema": {
+    "dataSource": "metrics-kafka",
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": [],
+      "dimensionExclusions": [
+        "timestamp",
+        "value"
+      ]
+    },
+    "metricsSpec": [
+      {
+        "name": "count",
+        "type": "count"
+      },
+      {
+        "name": "value_sum",
+        "fieldName": "value",
+        "type": "doubleSum"
+      },
+      {
+        "name": "value_min",
+        "fieldName": "value",
+        "type": "doubleMin"
+      },
+      {
+        "name": "value_max",
+        "fieldName": "value",
+        "type": "doubleMax"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "HOUR",
+      "queryGranularity": "NONE"
+    }
+  },
+  "ioConfig": {
+    "topic": "metrics",
+    "inputFormat": {
+      "type": "json"
+    },
+    "consumerProperties": {
+      "bootstrap.servers": "localhost:9092"
+    },
+    "autoScalerConfig": {
+      "enableTaskAutoScaler": true,
+      "taskCountMax": 6,
+      "taskCountMin": 2,
+      "minTriggerScaleActionFrequencyMillis": 600000,
+      "autoScalerStrategy": "lagBased",
+      "lagCollectionIntervalMillis": 30000,
+      "lagCollectionRangeMillis": 600000,
+      "scaleOutThreshold": 6000000,
+      "triggerScaleOutFractionThreshold": 0.3,
+      "scaleInThreshold": 1000000,
+      "triggerScaleInFractionThreshold": 0.9,
+      "scaleActionStartDelayMillis": 300000,
+      "scaleActionPeriodMillis": 60000,
+      "scaleInStep": 1,
+      "scaleOutStep": 2
+    },
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT1H"
+  },
+  "tuningConfig": {
+    "type": "kafka",
+    "maxRowsPerSegment": 5000000
+  }
+}
+```
 
 #### More on consumerProperties
 
diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 41647d525a5..f456cb611c9 100644
---
a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -37,6 +37,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.JodaUtils; @@ -282,6 +283,18 @@ public class MaterializedViewSupervisor implements Supervisor // do nothing } + @Override + public LagStats computeLagStats() + { + throw new UnsupportedOperationException("Compute Lag Stats not supported in MaterializedViewSupervisor"); + } + + @Override + public int getActiveTaskGroupsCount() + { + throw new UnsupportedOperationException("Get Active Task Groups Count is not supported in MaterializedViewSupervisor"); + } + /** * Find intervals in which derived dataSource should rebuild the segments. * Choose the latest intervals to create new HadoopIndexTask and submit it. diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java index c82b8b8fae6..01ebdd0a931 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorSpecTest.java @@ -29,7 +29,9 @@ import org.apache.druid.indexer.HadoopTuningConfig; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.math.expr.ExprMacroTable; import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.metadata.SqlSegmentsMetadataManager; @@ -50,6 +52,7 @@ import org.junit.Test; import org.junit.rules.ExpectedException; import java.io.IOException; +import java.util.concurrent.Callable; public class MaterializedViewSupervisorSpecTest { @@ -155,6 +158,85 @@ public class MaterializedViewSupervisorSpecTest Assert.assertEquals(expected.getMetrics(), spec.getMetrics()); } + @Test + public void testMaterializedViewSupervisorSpecCreated() + { + Exception ex = null; + + try { + MaterializedViewSupervisorSpec spec = new MaterializedViewSupervisorSpec( + "wikiticker", + new DimensionsSpec( + Lists.newArrayList( + new StringDimensionSchema("isUnpatrolled"), + new StringDimensionSchema("metroCode"), + new StringDimensionSchema("namespace"), + new StringDimensionSchema("page"), + new StringDimensionSchema("regionIsoCode"), + new StringDimensionSchema("regionName"), + new 
StringDimensionSchema("user") + ), + null, + null + ), + new AggregatorFactory[]{ + new CountAggregatorFactory("count"), + new LongSumAggregatorFactory("added", "added") + }, + HadoopTuningConfig.makeDefaultTuningConfig(), + null, + null, + null, + null, + null, + false, + objectMapper, + null, + null, + null, + null, + null, + new MaterializedViewTaskConfig(), + EasyMock.createMock(AuthorizerMapper.class), + new NoopChatHandlerProvider(), + new SupervisorStateManagerConfig() + ); + Supervisor supervisor = spec.createSupervisor(); + Assert.assertTrue(supervisor instanceof MaterializedViewSupervisor); + + SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor); + Assert.assertNull(autoscaler); + + try { + supervisor.computeLagStats(); + } + catch (Exception e) { + Assert.assertTrue(e instanceof UnsupportedOperationException); + } + + try { + int count = supervisor.getActiveTaskGroupsCount(); + } + catch (Exception e) { + Assert.assertTrue(e instanceof UnsupportedOperationException); + } + + Callable noop = new Callable() { + @Override + public Integer call() + { + return -1; + } + }; + + } + catch (Exception e) { + ex = e; + } + + Assert.assertNull(ex); + } + @Test public void testSuspendResuume() throws IOException { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java index d592f789953..a02568a8eea 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisor.java @@ -38,6 +38,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskIOConfig; @@ -58,6 +59,7 @@ import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; import org.joda.time.DateTime; import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -330,6 +332,17 @@ public class KafkaSupervisor extends SeekableStreamSupervisor partitionRecordLag = getPartitionRecordLag(); + if (partitionRecordLag == null) { + return new LagStats(0, 0, 0); + } + + return computeLags(partitionRecordLag); + } + @Override protected void updatePartitionLagFromStream() { diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java index 62c1e790657..87b689e6e64 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorIOConfig.java @@ -24,10 +24,12 @@ import com.fasterxml.jackson.annotation.JsonProperty; import 
com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.StringUtils; import org.joda.time.DateTime; import org.joda.time.Period; +import javax.annotation.Nullable; import java.util.Map; public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig @@ -51,6 +53,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig @JsonProperty("taskCount") Integer taskCount, @JsonProperty("taskDuration") Period taskDuration, @JsonProperty("consumerProperties") Map consumerProperties, + @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig, @JsonProperty("pollTimeout") Long pollTimeout, @JsonProperty("startDelay") Period startDelay, @JsonProperty("period") Period period, @@ -73,6 +76,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig completionTimeout, lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + autoScalerConfig, lateMessageRejectionStartDateTime ); @@ -117,6 +121,7 @@ public class KafkaSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", taskCount=" + getTaskCount() + ", taskDuration=" + getTaskDuration() + ", consumerProperties=" + consumerProperties + + ", autoScalerConfig=" + getAutoscalerConfig() + ", pollTimeout=" + pollTimeout + ", startDelay=" + getStartDelay() + ", period=" + getPeriod() + diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java index c54a31f031b..dd712d9db56 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java @@ -134,6 +134,7 @@ public class KafkaSamplerSpecTest extends InitializedNullHandlingTest null, null, null, + null, true, null, null, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java index 1dbabe84481..ac6b6e50590 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java @@ -61,13 +61,16 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; import org.apache.druid.indexing.seekablestream.SeekableStreamStartSequenceNumbers; import 
org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -158,6 +161,7 @@ public class KafkaSupervisorTest extends EasyMockSupport private RowIngestionMetersFactory rowIngestionMetersFactory; private ExceptionCapturingServiceEmitter serviceEmitter; private SupervisorStateManagerConfig supervisorConfig; + private KafkaSupervisorIngestionSpec ingestionSchema; private static String getTopic() { @@ -214,6 +218,7 @@ public class KafkaSupervisorTest extends EasyMockSupport serviceEmitter = new ExceptionCapturingServiceEmitter(); EmittingLogger.registerEmitter(serviceEmitter); supervisorConfig = new SupervisorStateManagerConfig(); + ingestionSchema = EasyMock.createMock(KafkaSupervisorIngestionSpec.class); } @After @@ -232,6 +237,197 @@ public class KafkaSupervisorTest extends EasyMockSupport zkServer = null; } + @Test + public void testNoInitialStateWithAutoscaler() throws Exception + { + KafkaIndexTaskClientFactory taskClientFactory = new KafkaIndexTaskClientFactory( + null, + null + ) + { + @Override + public KafkaIndexTaskClient build( + TaskInfoProvider taskInfoProvider, + String dataSource, + int numThreads, + Duration httpTimeout, + long numRetries + ) + { + Assert.assertEquals(TEST_CHAT_THREADS, numThreads); + Assert.assertEquals(TEST_HTTP_TIMEOUT.toStandardDuration(), httpTimeout); + Assert.assertEquals(TEST_CHAT_RETRIES, numRetries); + return taskClient; + } + }; + + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionRangeMillis", 500); + autoScalerConfig.put("scaleOutThreshold", 0); + autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0); + autoScalerConfig.put("scaleInThreshold", 1000000); + autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8); + autoScalerConfig.put("scaleActionStartDelayMillis", 0); + autoScalerConfig.put("scaleActionPeriodMillis", 100); + autoScalerConfig.put("taskCountMax", 2); + autoScalerConfig.put("taskCountMin", 1); + autoScalerConfig.put("scaleInStep", 1); + autoScalerConfig.put("scaleOutStep", 2); + autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + + final Map consumerProperties = KafkaConsumerConfigs.getConsumerProperties(); + consumerProperties.put("myCustomKey", "myCustomValue"); + consumerProperties.put("bootstrap.servers", kafkaHost); + + KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new KafkaSupervisorIOConfig( + topic, + INPUT_FORMAT, + 1, + 1, + new Period("PT1H"), + consumerProperties, + OBJECT_MAPPER.convertValue(autoScalerConfig, LagBasedAutoScalerConfig.class), + KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, + new Period("P1D"), + new Period("PT30S"), + true, + new Period("PT30M"), + null, + null, + null + ); + + final KafkaSupervisorTuningConfig tuningConfigOri = new KafkaSupervisorTuningConfig( + null, + 1000, + null, + null, + 50000, + null, + new Period("P1Y"), + new File("/test"), + null, + null, + null, + true, + false, + null, + false, + 
null, + numThreads, + TEST_CHAT_THREADS, + TEST_CHAT_RETRIES, + TEST_HTTP_TIMEOUT, + TEST_SHUTDOWN_TIMEOUT, + null, + null, + null, + null, + null + ); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes(); + EasyMock.replay(ingestionSchema); + + SeekableStreamSupervisorSpec testableSupervisorSpec = new KafkaSupervisorSpec( + ingestionSchema, + dataSchema, + tuningConfigOri, + kafkaSupervisorIOConfig, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + new NoopServiceEmitter(), + new DruidMonitorSchedulerConfig(), + rowIngestionMetersFactory, + new SupervisorStateManagerConfig() + ); + + supervisor = new TestableKafkaSupervisor( + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + (KafkaSupervisorSpec) testableSupervisorSpec, + rowIngestionMetersFactory + ); + + SupervisorTaskAutoScaler autoscaler = testableSupervisorSpec.createAutoscaler(supervisor); + + + final KafkaSupervisorTuningConfig tuningConfig = supervisor.getTuningConfig(); + addSomeEvents(1); + + Capture captured = Capture.newInstance(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KafkaDataSourceMetadata( + null + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScale); + autoscaler.start(); + supervisor.runInternal(); + Thread.sleep(1 * 1000); + verifyAll(); + + int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountAfterScale); + + + KafkaIndexTask task = captured.getValue(); + Assert.assertEquals(KafkaSupervisorTest.dataSchema, task.getDataSchema()); + Assert.assertEquals(tuningConfig.convertToTaskTuningConfig(), task.getTuningConfig()); + + KafkaIndexTaskIOConfig taskConfig = task.getIOConfig(); + Assert.assertEquals(kafkaHost, taskConfig.getConsumerProperties().get("bootstrap.servers")); + Assert.assertEquals("myCustomValue", taskConfig.getConsumerProperties().get("myCustomKey")); + Assert.assertEquals("sequenceName-0", taskConfig.getBaseSequenceName()); + Assert.assertTrue("isUseTransaction", taskConfig.isUseTransaction()); + Assert.assertFalse("minimumMessageTime", taskConfig.getMinimumMessageTime().isPresent()); + Assert.assertFalse("maximumMessageTime", taskConfig.getMaximumMessageTime().isPresent()); + + Assert.assertEquals(topic, taskConfig.getStartSequenceNumbers().getStream()); + Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(0)); + Assert.assertEquals(0L, (long) 
taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(1)); + Assert.assertEquals(0L, (long) taskConfig.getStartSequenceNumbers().getPartitionSequenceNumberMap().get(2)); + + Assert.assertEquals(topic, taskConfig.getEndSequenceNumbers().getStream()); + Assert.assertEquals( + Long.MAX_VALUE, + (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(0) + ); + Assert.assertEquals( + Long.MAX_VALUE, + (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(1) + ); + Assert.assertEquals( + Long.MAX_VALUE, + (long) taskConfig.getEndSequenceNumbers().getPartitionSequenceNumberMap().get(2) + ); + + autoscaler.reset(); + autoscaler.stop(); + } + @Test public void testCreateBaseTaskContexts() throws JsonProcessingException { @@ -3379,6 +3575,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskCount, new Period(duration), consumerProperties, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, new Period("P1D"), new Period("PT30S"), @@ -3491,6 +3688,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskCount, new Period(duration), consumerProperties, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, new Period("P1D"), new Period("PT30S"), @@ -3607,6 +3805,7 @@ public class KafkaSupervisorTest extends EasyMockSupport taskCount, new Period(duration), consumerProperties, + null, KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS, new Period("P1D"), new Period("PT30S"), diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index a7bc5997d65..92defd23150 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; @@ -378,6 +379,13 @@ public class KinesisSupervisor extends SeekableStreamSupervisor> filterExpiredPartitionsFromStartingOffsets( Map> startingOffsets diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index f68e0b7a09b..b43cece0e52 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -26,9 +26,12 @@ import org.apache.druid.data.input.InputFormat; import org.apache.druid.indexing.kinesis.KinesisIndexTaskIOConfig; import org.apache.druid.indexing.kinesis.KinesisRegion; 
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.joda.time.DateTime; import org.joda.time.Period; +import javax.annotation.Nullable; + public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig { private final String endpoint; @@ -70,6 +73,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig @JsonProperty("fetchDelayMillis") Integer fetchDelayMillis, @JsonProperty("awsAssumedRoleArn") String awsAssumedRoleArn, @JsonProperty("awsExternalId") String awsExternalId, + @Nullable @JsonProperty("autoScalerConfig") AutoScalerConfig autoScalerConfig, @JsonProperty("deaggregate") boolean deaggregate ) { @@ -85,8 +89,16 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig completionTimeout, lateMessageRejectionPeriod, earlyMessageRejectionPeriod, + autoScalerConfig, lateMessageRejectionStartDateTime ); + + // for now dynamic Allocation Tasks is not supported here + // throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec. + if (autoScalerConfig != null) { + throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!"); + } + this.endpoint = endpoint != null ? endpoint : (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint()); diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java index 0b9ebdb93c0..e17ce1e2f31 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisSamplerSpecTest.java @@ -153,6 +153,7 @@ public class KinesisSamplerSpecTest extends EasyMockSupport null, null, null, + null, false ), null, diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 3f352045879..5d1d0f6a009 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -66,6 +66,7 @@ import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -102,6 +103,7 @@ import java.io.File; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -304,6 +306,7 @@ public 
class KinesisSupervisorTest extends EasyMockSupport 1000, null, null, + null, false ); KinesisIndexTaskClientFactory clientFactory = new KinesisIndexTaskClientFactory(null, OBJECT_MAPPER); @@ -344,6 +347,72 @@ public class KinesisSupervisorTest extends EasyMockSupport Assert.assertFalse(supplier.isBackgroundFetchRunning()); } + @Test + public void testKinesisIOConfig() + { + Exception e = null; + try { + KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + null, + false + ); + AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig(); + Assert.assertNull(autoScalerConfig); + } + catch (Exception ex) { + e = ex; + } + Assert.assertNull(e); + + try { + KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), + false + ); + } + catch (Exception ex) { + e = ex; + } + Assert.assertNotNull(e); + Assert.assertTrue(e instanceof UnsupportedOperationException); + } + @Test public void testMultiTask() throws Exception { @@ -4721,6 +4790,7 @@ public class KinesisSupervisorTest extends EasyMockSupport null, null, null, + null, false ); @@ -4863,6 +4933,7 @@ public class KinesisSupervisorTest extends EasyMockSupport fetchDelayMillis, null, null, + null, false ); @@ -4950,6 +5021,7 @@ public class KinesisSupervisorTest extends EasyMockSupport fetchDelayMillis, null, null, + null, false ); @@ -5039,6 +5111,7 @@ public class KinesisSupervisorTest extends EasyMockSupport fetchDelayMillis, null, null, + null, false ); diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml index 55641228cab..be8360b127b 100644 --- a/indexing-service/pom.xml +++ b/indexing-service/pom.xml @@ -62,7 +62,11 @@ druid-hll ${project.parent.version} - + + org.apache.commons + commons-collections4 + 4.2 + io.dropwizard.metrics metrics-core diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 48153b086ff..b638fcfbd8b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.inject.Inject; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStop; @@ -30,6 +31,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.metadata.MetadataSupervisorManager; import javax.annotation.Nullable; + import java.util.List; import java.util.Map; import java.util.Set; @@ -44,6 +46,8 @@ public class SupervisorManager private 
final MetadataSupervisorManager metadataSupervisorManager; private final ConcurrentHashMap> supervisors = new ConcurrentHashMap<>(); + // SupervisorTaskAutoScaler could be null + private final ConcurrentHashMap autoscalers = new ConcurrentHashMap<>(); private final Object lock = new Object(); private volatile boolean started = false; @@ -54,6 +58,11 @@ public class SupervisorManager this.metadataSupervisorManager = metadataSupervisorManager; } + public MetadataSupervisorManager getMetadataSupervisorManager() + { + return metadataSupervisorManager; + } + public Set getSupervisorIds() { return supervisors.keySet(); @@ -140,12 +149,17 @@ public class SupervisorManager for (String id : supervisors.keySet()) { try { supervisors.get(id).lhs.stop(false); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.stop(); + } } catch (Exception e) { log.warn(e, "Caught exception while stopping supervisor [%s]", id); } } supervisors.clear(); + autoscalers.clear(); started = false; } @@ -187,6 +201,10 @@ public class SupervisorManager } supervisor.lhs.reset(dataSourceMetadata); + SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); + if (autoscaler != null) { + autoscaler.reset(); + } return true; } @@ -238,6 +256,12 @@ public class SupervisorManager pair.lhs.stop(true); supervisors.remove(id); + SupervisorTaskAutoScaler autoscler = autoscalers.get(id); + if (autoscler != null) { + autoscler.stop(); + autoscalers.remove(id); + } + return true; } @@ -282,9 +306,16 @@ public class SupervisorManager } Supervisor supervisor; + SupervisorTaskAutoScaler autoscaler; try { supervisor = spec.createSupervisor(); + autoscaler = spec.createAutoscaler(supervisor); + supervisor.start(); + if (autoscaler != null) { + autoscaler.start(); + autoscalers.put(id, autoscaler); + } } catch (Exception e) { // Supervisor creation or start failed write tombstone only when trying to start a new supervisor diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index e63f5381af4..11339d4f4ae 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -56,8 +56,10 @@ import org.apache.druid.indexing.overlord.TaskRunnerListener; import org.apache.druid.indexing.overlord.TaskRunnerWorkItem; import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient; @@ -71,6 +73,7 @@ import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; import org.apache.druid.indexing.seekablestream.common.RecordSupplier; import org.apache.druid.indexing.seekablestream.common.StreamException; import 
org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.ISE; @@ -82,6 +85,7 @@ import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.emitter.service.ServiceMetricEvent; import org.apache.druid.metadata.EntryExistsException; +import org.apache.druid.metadata.MetadataSupervisorManager; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; import org.joda.time.DateTime; @@ -102,6 +106,7 @@ import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; @@ -318,6 +323,127 @@ public abstract class SeekableStreamSupervisor scaleAction; + + DynamicAllocationTasksNotice(Callable scaleAction) + { + this.scaleAction = scaleAction; + } + + /** + * This method will do lag points collection and check dynamic scale action is necessary or not. + */ + @Override + public void handle() + { + if (autoScalerConfig == null) { + log.warn("autoScalerConfig is null but dynamic allocation notice is submitted, how can it be ?"); + } else { + try { + long nowTime = System.currentTimeMillis(); + if (spec.isSuspended()) { + log.info("Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is suspended", + dataSource + ); + return; + } + log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups, + dataSource + ); + for (CopyOnWriteArrayList list : pendingCompletionTaskGroups.values()) { + if (!list.isEmpty()) { + log.info( + "Skipping DynamicAllocationTasksNotice execution for datasource [%s] because following tasks are pending [%s]", + dataSource, pendingCompletionTaskGroups + ); + return; + } + } + if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) { + log.info( + "DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!", + nowTime - dynamicTriggerLastRunTime, autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), dataSource + ); + return; + } + final Integer desriedTaskCount = scaleAction.call(); + boolean allocationSuccess = changeTaskCount(desriedTaskCount); + if (allocationSuccess) { + dynamicTriggerLastRunTime = nowTime; + } + } + catch (Exception ex) { + log.warn(ex, "Error parsing DynamicAllocationTasksNotice"); + } + } + } + } + + /** + * This method determines how to do scale actions based on collected lag points. + * If scale action is triggered : + * First of all, call gracefulShutdownInternal() which will change the state of current datasource ingest tasks from reading to publishing. + * Secondly, clear all the stateful data structures: activelyReadingTaskGroups, partitionGroups, partitionOffsets, pendingCompletionTaskGroups, partitionIds. These structures will be rebuiled in the next 'RunNotice'. + * Finally, change the taskCount in SeekableStreamSupervisorIOConfig and sync it to MetadataStorage. 
+ * After the taskCount is changed in SeekableStreamSupervisorIOConfig, next RunNotice will create scaled number of ingest tasks without resubmitting the supervisor. + * @param desiredActiveTaskCount desired taskCount computed from AutoScaler + * @return Boolean flag indicating if scale action was executed or not. If true, it will wait at least 'minTriggerScaleActionFrequencyMillis' before next 'changeTaskCount'. + * If false, it will do 'changeTaskCount' again after 'scaleActionPeriodMillis' millis. + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + private boolean changeTaskCount(int desiredActiveTaskCount) throws InterruptedException, ExecutionException, TimeoutException + { + int currentActiveTaskCount; + Collection activeTaskGroups = activelyReadingTaskGroups.values(); + currentActiveTaskCount = activeTaskGroups.size(); + + if (desiredActiveTaskCount < 0 || desiredActiveTaskCount == currentActiveTaskCount) { + return false; + } else { + log.info( + "Starting scale action, current active task count is [%d] and desired task count is [%d] for dataSource [%s].", + currentActiveTaskCount, desiredActiveTaskCount, dataSource + ); + gracefulShutdownInternal(); + changeTaskCountInIOConfig(desiredActiveTaskCount); + clearAllocationInfo(); + log.info("Changed taskCount to [%s] for dataSource [%s].", desiredActiveTaskCount, dataSource); + return true; + } + } + + private void changeTaskCountInIOConfig(int desiredActiveTaskCount) + { + ioConfig.setTaskCount(desiredActiveTaskCount); + try { + Optional supervisorManager = taskMaster.getSupervisorManager(); + if (supervisorManager.isPresent()) { + MetadataSupervisorManager metadataSupervisorManager = supervisorManager.get().getMetadataSupervisorManager(); + metadataSupervisorManager.insert(dataSource, spec); + } else { + log.error("supervisorManager is null in taskMaster, skipping scale action for dataSource [%s].", dataSource); + } + } + catch (Exception e) { + log.error(e, "Failed to sync taskCount to MetaStorage for dataSource [%s].", dataSource); + } + } + + private void clearAllocationInfo() + { + activelyReadingTaskGroups.clear(); + partitionGroups.clear(); + partitionOffsets.clear(); + + pendingCompletionTaskGroups.clear(); + partitionIds.clear(); + } + private class GracefulShutdownNotice extends ShutdownNotice { @Override @@ -470,6 +596,7 @@ public abstract class SeekableStreamSupervisor taskClient; private final SeekableStreamSupervisorSpec spec; private final SeekableStreamSupervisorIOConfig ioConfig; + private final AutoScalerConfig autoScalerConfig; private final SeekableStreamSupervisorTuningConfig tuningConfig; private final SeekableStreamIndexTaskTuningConfig taskTuningConfig; private final String supervisorId; @@ -488,6 +615,7 @@ public abstract class SeekableStreamSupervisor taskRunner = taskMaster.getTaskRunner(); if (taskRunner.isPresent()) { @@ -774,7 +927,6 @@ public abstract class SeekableStreamSupervisor scaleAction) + { + return () -> notices.add(new DynamicAllocationTasksNotice(scaleAction)); + } + private Runnable buildRunTask() { return () -> notices.add(new RunNotice()); @@ -1901,6 +2058,11 @@ public abstract class SeekableStreamSupervisor previousPartitionIds = new ArrayList<>(partitionIds); @@ -3510,29 +3672,21 @@ public abstract class SeekableStreamSupervisor maxLag) { - maxLag = lag; - } - totalLag += lag; - } - avgLag = partitionLags.size() == 0 ? 
0 : totalLag / partitionLags.size(); - + LagStats lagStats = computeLags(partitionLags); emitter.emit( ServiceMetricEvent.builder() .setDimension("dataSource", dataSource) - .build(StringUtils.format("ingest/%s/lag%s", type, suffix), totalLag) + .build(StringUtils.format("ingest/%s/lag%s", type, suffix), lagStats.getTotalLag()) ); emitter.emit( ServiceMetricEvent.builder() .setDimension("dataSource", dataSource) - .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), maxLag) + .build(StringUtils.format("ingest/%s/maxLag%s", type, suffix), lagStats.getMaxLag()) ); emitter.emit( ServiceMetricEvent.builder() .setDimension("dataSource", dataSource) - .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), avgLag) + .build(StringUtils.format("ingest/%s/avgLag%s", type, suffix), lagStats.getAvgLag()) ); }; @@ -3545,6 +3699,24 @@ public abstract class SeekableStreamSupervisor partitionLags) + { + long maxLag = 0, totalLag = 0, avgLag; + for (long lag : partitionLags.values()) { + if (lag > maxLag) { + maxLag = lag; + } + totalLag += lag; + } + avgLag = partitionLags.size() == 0 ? 0 : totalLag / partitionLags.size(); + return new LagStats(maxLag, totalLag, avgLag); + } + /** * a special sequence number that is used to indicate that the sequence offset * for a particular partition has not yet been calculated by the supervisor. When diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java index 723e22ec518..3ed55ec1ec4 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorIOConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import org.apache.druid.data.input.InputFormat; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.IAE; import org.joda.time.DateTime; import org.joda.time.Duration; @@ -37,7 +38,7 @@ public abstract class SeekableStreamSupervisorIOConfig @Nullable private final InputFormat inputFormat; // nullable for backward compatibility private final Integer replicas; - private final Integer taskCount; + private Integer taskCount; private final Duration taskDuration; private final Duration startDelay; private final Duration period; @@ -46,6 +47,7 @@ public abstract class SeekableStreamSupervisorIOConfig private final Optional lateMessageRejectionPeriod; private final Optional earlyMessageRejectionPeriod; private final Optional lateMessageRejectionStartDateTime; + @Nullable private final AutoScalerConfig autoScalerConfig; public SeekableStreamSupervisorIOConfig( String stream, @@ -59,13 +61,21 @@ public abstract class SeekableStreamSupervisorIOConfig Period completionTimeout, Period lateMessageRejectionPeriod, Period earlyMessageRejectionPeriod, + @Nullable AutoScalerConfig autoScalerConfig, DateTime lateMessageRejectionStartDateTime ) { this.stream = Preconditions.checkNotNull(stream, "stream cannot be null"); this.inputFormat = inputFormat; this.replicas = replicas != null ? replicas : 1; - this.taskCount = taskCount != null ? 
taskCount : 1; + // Could be null + this.autoScalerConfig = autoScalerConfig; + // if autoscaler is enable then taskcount will be ignored here. and init taskcount will be equal to taskCountMin + if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) { + this.taskCount = autoScalerConfig.getTaskCountMin(); + } else { + this.taskCount = taskCount != null ? taskCount : 1; + } this.taskDuration = defaultDuration(taskDuration, "PT1H"); this.startDelay = defaultDuration(startDelay, "PT5S"); this.period = defaultDuration(period, "PT30S"); @@ -113,12 +123,24 @@ public abstract class SeekableStreamSupervisorIOConfig return replicas; } + @Nullable + @JsonProperty + public AutoScalerConfig getAutoscalerConfig() + { + return autoScalerConfig; + } + @JsonProperty public Integer getTaskCount() { return taskCount; } + public void setTaskCount(final int taskCount) + { + this.taskCount = taskCount; + } + @JsonProperty public Duration getTaskDuration() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java index 50be4649bcc..ff1d31756bb 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorSpec.java @@ -32,7 +32,10 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClientFactory; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.segment.incremental.RowIngestionMetersFactory; import org.apache.druid.segment.indexing.DataSchema; @@ -151,6 +154,21 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec @Override public abstract Supervisor createSupervisor(); + /** + * An autoScaler instance will be returned depending on the autoScalerConfig. In case autoScalerConfig is null or autoScaler is disabled then NoopTaskAutoScaler will be returned. 
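+   * The autoscaler returned here is started and stopped by SupervisorManager together with the supervisor itself.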
+ * @param supervisor + * @return autoScaler + */ + @Override + public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) + { + AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoscalerConfig(); + if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) { + return autoScalerConfig.createAutoScaler(supervisor, this); + } + return new NoopTaskAutoScaler(); + } + @Override public List getDataSources() { diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java new file mode 100644 index 00000000000..53174a17bba --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/AutoScalerConfig.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonSubTypes.Type; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.UnstableApi; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; + +@UnstableApi +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class) +@JsonSubTypes(value = { + @Type(name = "lagBased", value = LagBasedAutoScalerConfig.class) +}) +public interface AutoScalerConfig +{ + boolean getEnableTaskAutoScaler(); + long getMinTriggerScaleActionFrequencyMillis(); + int getTaskCountMax(); + int getTaskCountMin(); + SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec); +} + diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java new file mode 100644 index 00000000000..8c645278d55 --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.commons.collections4.queue.CircularFifoQueue; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.emitter.EmittingLogger; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +public class LagBasedAutoScaler implements SupervisorTaskAutoScaler +{ + private static final EmittingLogger log = new EmittingLogger(LagBasedAutoScaler.class); + private final String dataSource; + private final CircularFifoQueue lagMetricsQueue; + private final ScheduledExecutorService lagComputationExec; + private final ScheduledExecutorService allocationExec; + private final SupervisorSpec spec; + private final SeekableStreamSupervisor supervisor; + private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig; + + private static final ReentrantLock LOCK = new ReentrantLock(true); + + public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource, + LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec + ) + { + this.lagBasedAutoScalerConfig = autoScalerConfig; + final String supervisorId = StringUtils.format("Supervisor-%s", dataSource); + this.dataSource = dataSource; + final int slots = (int) (lagBasedAutoScalerConfig.getLagCollectionRangeMillis() / lagBasedAutoScalerConfig + .getLagCollectionIntervalMillis()) + 1; + this.lagMetricsQueue = new CircularFifoQueue<>(slots); + this.allocationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Allocation-%d"); + this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d"); + this.spec = spec; + this.supervisor = supervisor; + } + + @Override + public void start() + { + Callable scaleAction = () -> { + LOCK.lock(); + int desiredTaskCount = -1; + try { + desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue)); + + if (desiredTaskCount != -1) { + lagMetricsQueue.clear(); + } + } + catch (Exception ex) { + log.warn(ex, "Exception while computing desired task count for [%s]", dataSource); + } + finally { + LOCK.unlock(); + } + return desiredTaskCount; + }; + + lagComputationExec.scheduleAtFixedRate( + computeAndCollectLag(), + lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up + 
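+        // then collect one lag point every lagCollectionIntervalMillis thereafter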
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    allocationExec.scheduleAtFixedRate(
+        supervisor.buildDynamicAllocationTask(scaleAction),
+        lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
+            .getLagCollectionRangeMillis(),
+        lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
+        TimeUnit.MILLISECONDS
+    );
+    log.info(
+        "LagBasedAutoScaler will collect lag every [%d] millis and will keep [%d] data points for the last [%d] millis for dataSource [%s]",
+        lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), lagMetricsQueue.size(),
+        lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), dataSource
+    );
+  }
+
+  @Override
+  public void stop()
+  {
+    allocationExec.shutdownNow();
+    lagComputationExec.shutdownNow();
+  }
+
+  @Override
+  public void reset()
+  {
+    // clear the collected lag points
+    if (lagMetricsQueue != null) {
+      try {
+        LOCK.lock();
+        lagMetricsQueue.clear();
+      }
+      catch (Exception e) {
+        log.warn(e, "Error while clearing the lag metrics queue during reset");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    }
+  }
+
+  /**
+   * Computes the current consumer lag: gets the total lag across all partitions and appends it to the lagMetricsQueue.
+   *
+   * @return a Runnable that computes and collects lag.
+   */
+  private Runnable computeAndCollectLag()
+  {
+    return () -> {
+      LOCK.lock();
+      try {
+        if (!spec.isSuspended()) {
+          LagStats lagStats = supervisor.computeLagStats();
+          if (lagStats == null) {
+            lagMetricsQueue.offer(0L);
+          } else {
+            long totalLags = lagStats.getTotalLag();
+            lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
+          }
+          log.debug("Current lags [%s] for dataSource [%s].", new ArrayList<>(lagMetricsQueue), dataSource);
+        } else {
+          log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
+        }
+      }
+      catch (Exception e) {
+        log.error(e, "Error while collecting lags");
+      }
+      finally {
+        LOCK.unlock();
+      }
+    };
+  }
+
+  /**
+   * Determines whether to take a scale action based on the collected lag points.
+   * The current scaling algorithm is simple:
+   * First, compute the proportion of lag points higher than scaleOutThreshold and the proportion lower than
+   * scaleInThreshold, yielding beyondProportion and withinProportion.
+   * Second, compare beyondProportion/withinProportion with triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold.
+   * Note that the scale out action has a higher priority than the scale in action.
+   * Finally, if beyondProportion/withinProportion is at least triggerScaleOutFractionThreshold/triggerScaleInFractionThreshold,
+   * the corresponding scale out/in action is triggered.
+   *
+   * @param lags the lag metrics of the stream (Kafka/Kinesis)
+   * @return the target task count; -1 means skip the scale action.
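+   *         (Illustrative example, numbers not taken from the patch: with the default thresholds, if 8 of 20 collected
+   *         lag points exceed scaleOutThreshold (6000000), then beyondProportion = 0.4 >= triggerScaleOutFractionThreshold
+   *         (0.3), and the method returns min(currentActiveTaskCount + scaleOutStep, min(taskCountMax, partitionCount)).)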
+   */
+  private int computeDesiredTaskCount(List<Long> lags)
+  {
+    log.debug("Computing desired task count for [%s], based on the following lags: [%s]", dataSource, lags);
+    int beyond = 0;
+    int within = 0;
+    int metricsCount = lags.size();
+    for (Long lag : lags) {
+      if (lag >= lagBasedAutoScalerConfig.getScaleOutThreshold()) {
+        beyond++;
+      }
+      if (lag <= lagBasedAutoScalerConfig.getScaleInThreshold()) {
+        within++;
+      }
+    }
+    double beyondProportion = beyond * 1.0 / metricsCount;
+    double withinProportion = within * 1.0 / metricsCount;
+
+    log.debug("Calculated beyondProportion is [%s] and withinProportion is [%s] for dataSource [%s].", beyondProportion,
+        withinProportion, dataSource
+    );
+
+    int currentActiveTaskCount = supervisor.getActiveTaskGroupsCount();
+    int desiredActiveTaskCount;
+
+    if (beyondProportion >= lagBasedAutoScalerConfig.getTriggerScaleOutFractionThreshold()) {
+      // Do scale out
+      int taskCount = currentActiveTaskCount + lagBasedAutoScalerConfig.getScaleOutStep();
+
+      int partitionCount = supervisor.getPartitionCount();
+      if (partitionCount <= 0) {
+        log.warn("Partition count for dataSource [%s] is [%d], skipping scale out action", dataSource, partitionCount);
+        return -1;
+      }
+
+      int actualTaskCountMax = Math.min(lagBasedAutoScalerConfig.getTaskCountMax(), partitionCount);
+      if (currentActiveTaskCount == actualTaskCountMax) {
+        log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
+            dataSource
+        );
+        return -1;
+      } else {
+        desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
+      }
+      return desiredActiveTaskCount;
+    }
+
+    if (withinProportion >= lagBasedAutoScalerConfig.getTriggerScaleInFractionThreshold()) {
+      // Do scale in
+      int taskCount = currentActiveTaskCount - lagBasedAutoScalerConfig.getScaleInStep();
+      if (currentActiveTaskCount == lagBasedAutoScalerConfig.getTaskCountMin()) {
+        log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
+            dataSource
+        );
+        return -1;
+      } else {
+        desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());
+      }
+      return desiredActiveTaskCount;
+    }
+    return -1;
+  }
+
+  public LagBasedAutoScalerConfig getAutoScalerConfig()
+  {
+    return lagBasedAutoScalerConfig;
+  }
+}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
new file mode 100644
index 00000000000..0a4eb0b585e
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; + +import javax.annotation.Nullable; + +public class LagBasedAutoScalerConfig implements AutoScalerConfig +{ + private final long lagCollectionIntervalMillis; + private final long lagCollectionRangeMillis; + private final long scaleActionStartDelayMillis; + private final long scaleActionPeriodMillis; + private final long scaleOutThreshold; + private final long scaleInThreshold; + private final double triggerScaleOutFractionThreshold; + private final double triggerScaleInFractionThreshold; + private int taskCountMax; + private int taskCountMin; + private final int scaleInStep; + private final int scaleOutStep; + private final boolean enableTaskAutoScaler; + private final long minTriggerScaleActionFrequencyMillis; + + @JsonCreator + public LagBasedAutoScalerConfig( + @Nullable @JsonProperty("lagCollectionIntervalMillis") Long lagCollectionIntervalMillis, + @Nullable @JsonProperty("lagCollectionRangeMillis") Long lagCollectionRangeMillis, + @Nullable @JsonProperty("scaleActionStartDelayMillis") Long scaleActionStartDelayMillis, + @Nullable @JsonProperty("scaleActionPeriodMillis") Long scaleActionPeriodMillis, + @Nullable @JsonProperty("scaleOutThreshold") Long scaleOutThreshold, + @Nullable @JsonProperty("scaleInThreshold") Long scaleInThreshold, + @Nullable @JsonProperty("triggerScaleOutFractionThreshold") Double triggerScaleOutFractionThreshold, + @Nullable @JsonProperty("triggerScaleInFractionThreshold") Double triggerScaleInFractionThreshold, + @JsonProperty("taskCountMax") Integer taskCountMax, + @JsonProperty("taskCountMin") Integer taskCountMin, + @Nullable @JsonProperty("scaleInStep") Integer scaleInStep, + @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep, + @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler, + @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis + ) + { + this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false; + this.lagCollectionIntervalMillis = lagCollectionIntervalMillis != null ? lagCollectionIntervalMillis : 30000; + this.lagCollectionRangeMillis = lagCollectionRangeMillis != null ? lagCollectionRangeMillis : 600000; + this.scaleActionStartDelayMillis = scaleActionStartDelayMillis != null ? scaleActionStartDelayMillis : 300000; + this.scaleActionPeriodMillis = scaleActionPeriodMillis != null ? scaleActionPeriodMillis : 60000; + this.scaleOutThreshold = scaleOutThreshold != null ? scaleOutThreshold : 6000000; + this.scaleInThreshold = scaleInThreshold != null ? 
scaleInThreshold : 1000000;
+    this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
+    this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
+
+    // Only validate taskCountMax and taskCountMin when the autoscaler is enabled, so that an empty autoScalerConfig {}
+    // does not throw an exception and simply leaves the autoscaler disabled.
+    // If the autoscaler is disabled, these configs are not used regardless of what is set.
+    if (this.enableTaskAutoScaler) {
+      if (taskCountMax == null || taskCountMin == null) {
+        throw new RuntimeException("taskCountMax or taskCountMin can't be null!");
+      } else if (taskCountMax < taskCountMin) {
+        throw new RuntimeException("taskCountMax can't be lower than taskCountMin!");
+      }
+      this.taskCountMax = taskCountMax;
+      this.taskCountMin = taskCountMin;
+    }
+
+    this.scaleInStep = scaleInStep != null ? scaleInStep : 1;
+    this.scaleOutStep = scaleOutStep != null ? scaleOutStep : 2;
+    this.minTriggerScaleActionFrequencyMillis = minTriggerScaleActionFrequencyMillis
+        != null ? minTriggerScaleActionFrequencyMillis : 600000;
+  }
+
+  @JsonProperty
+  public long getLagCollectionIntervalMillis()
+  {
+    return lagCollectionIntervalMillis;
+  }
+
+  @JsonProperty
+  public long getLagCollectionRangeMillis()
+  {
+    return lagCollectionRangeMillis;
+  }
+
+  @JsonProperty
+  public long getScaleActionStartDelayMillis()
+  {
+    return scaleActionStartDelayMillis;
+  }
+
+  @JsonProperty
+  public long getScaleActionPeriodMillis()
+  {
+    return scaleActionPeriodMillis;
+  }
+
+  @JsonProperty
+  public long getScaleOutThreshold()
+  {
+    return scaleOutThreshold;
+  }
+
+  @JsonProperty
+  public long getScaleInThreshold()
+  {
+    return scaleInThreshold;
+  }
+
+  @JsonProperty
+  public double getTriggerScaleOutFractionThreshold()
+  {
+    return triggerScaleOutFractionThreshold;
+  }
+
+  @JsonProperty
+  public double getTriggerScaleInFractionThreshold()
+  {
+    return triggerScaleInFractionThreshold;
+  }
+
+  @Override
+  @JsonProperty
+  public int getTaskCountMax()
+  {
+    return taskCountMax;
+  }
+
+  @Override
+  @JsonProperty
+  public int getTaskCountMin()
+  {
+    return taskCountMin;
+  }
+
+  @Override
+  public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
+  {
+    return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
+  }
+
+  @JsonProperty
+  public int getScaleInStep()
+  {
+    return scaleInStep;
+  }
+
+  @JsonProperty
+  public int getScaleOutStep()
+  {
+    return scaleOutStep;
+  }
+
+  @Override
+  @JsonProperty
+  public boolean getEnableTaskAutoScaler()
+  {
+    return enableTaskAutoScaler;
+  }
+
+  @Override
+  @JsonProperty
+  public long getMinTriggerScaleActionFrequencyMillis()
+  {
+    return minTriggerScaleActionFrequencyMillis;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "autoScalerConfig{" +
+           "enableTaskAutoScaler=" + enableTaskAutoScaler +
+           ", taskCountMax=" + taskCountMax +
+           ", taskCountMin=" + taskCountMin +
+           ", minTriggerScaleActionFrequencyMillis=" + minTriggerScaleActionFrequencyMillis +
+           ", lagCollectionIntervalMillis=" + lagCollectionIntervalMillis +
+           ", lagCollectionRangeMillis=" + lagCollectionRangeMillis +
+           ", scaleOutThreshold=" + scaleOutThreshold +
+           ", triggerScaleOutFractionThreshold=" + triggerScaleOutFractionThreshold +
+           ", scaleInThreshold=" + scaleInThreshold +
+           ", triggerScaleInFractionThreshold=" + triggerScaleInFractionThreshold +
+           ", scaleActionStartDelayMillis=" +
scaleActionStartDelayMillis + + ", scaleActionPeriodMillis=" + scaleActionPeriodMillis + + ", scaleInStep=" + scaleInStep + + ", scaleOutStep=" + scaleOutStep + + '}'; + } +} diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java new file mode 100644 index 00000000000..9bf41e5b0be --- /dev/null +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/NoopTaskAutoScaler.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; + +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; + +public class NoopTaskAutoScaler implements SupervisorTaskAutoScaler +{ + public NoopTaskAutoScaler() + { + } + + @Override + public void start() + { + //Do nothing + } + + @Override + public void stop() + { + //Do nothing + } + + @Override + public void reset() + { + //Do nothing + } +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java index e603a553e24..3b775ca9491 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/http/security/OverlordSecurityResourceFilterTest.java @@ -32,6 +32,8 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorResource; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; import org.apache.druid.indexing.worker.http.WorkerResource; import org.apache.druid.server.http.security.ResourceFilterTestHelper; import org.apache.druid.server.security.AuthorizerMapper; @@ -126,6 +128,12 @@ public class OverlordSecurityResourceFilterTest extends ResourceFilterTestHelper return null; } + @Override + public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) + { + return new NoopTaskAutoScaler(); + } + @Override public List getDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java 
b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java index 1390e7acc98..c22460ec926 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorResourceTest.java @@ -26,6 +26,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.segment.TestHelper; import org.apache.druid.server.security.Access; @@ -1155,6 +1156,12 @@ public class SupervisorResourceTest extends EasyMockSupport return supervisor; } + @Override + public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor) + { + return null; + } + @Override public List getDataSources() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java new file mode 100644 index 00000000000..51a39fc2552 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamSupervisorSpecTest.java @@ -0,0 +1,950 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; +import org.apache.druid.indexing.overlord.TaskMaster; +import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.Supervisor; +import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIOConfig; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorIngestionSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorReportPayload; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorSpec; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScaler; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.NoopTaskAutoScaler; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.aggregation.CountAggregatorFactory; +import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.incremental.RowIngestionMetersFactory; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.server.metrics.DruidMonitorSchedulerConfig; +import org.easymock.EasyMock; +import org.easymock.EasyMockSupport; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import javax.annotation.Nullable; +import java.io.File; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ScheduledExecutorService; + +public class 
SeekableStreamSupervisorSpecTest extends EasyMockSupport +{ + private SeekableStreamSupervisorIngestionSpec ingestionSchema; + private TaskStorage taskStorage; + private TaskMaster taskMaster; + private IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator; + private ServiceEmitter emitter; + private RowIngestionMetersFactory rowIngestionMetersFactory; + private DataSchema dataSchema; + private SeekableStreamSupervisorTuningConfig seekableStreamSupervisorTuningConfig; + private SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig; + private static final ObjectMapper OBJECT_MAPPER = TestHelper.makeJsonMapper(); + private SeekableStreamIndexTaskClientFactory taskClientFactory; + private static final String STREAM = "stream"; + private static final String DATASOURCE = "testDS"; + private SeekableStreamSupervisorSpec spec; + private SupervisorStateManagerConfig supervisorConfig; + + private SeekableStreamSupervisor supervisor4; + + private SeekableStreamIndexTaskClientFactory indexTaskClientFactory; + private ObjectMapper mapper; + private DruidMonitorSchedulerConfig monitorSchedulerConfig; + private SupervisorStateManagerConfig supervisorStateManagerConfig; + + @Before + public void setUp() + { + ingestionSchema = EasyMock.mock(SeekableStreamSupervisorIngestionSpec.class); + taskStorage = EasyMock.mock(TaskStorage.class); + taskMaster = EasyMock.mock(TaskMaster.class); + indexerMetadataStorageCoordinator = EasyMock.mock(IndexerMetadataStorageCoordinator.class); + emitter = EasyMock.mock(ServiceEmitter.class); + rowIngestionMetersFactory = EasyMock.mock(RowIngestionMetersFactory.class); + dataSchema = EasyMock.mock(DataSchema.class); + seekableStreamSupervisorTuningConfig = EasyMock.mock(SeekableStreamSupervisorTuningConfig.class); + seekableStreamSupervisorIOConfig = EasyMock.mock(SeekableStreamSupervisorIOConfig.class); + taskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class); + spec = EasyMock.mock(SeekableStreamSupervisorSpec.class); + supervisorConfig = new SupervisorStateManagerConfig(); + indexTaskClientFactory = EasyMock.mock(SeekableStreamIndexTaskClientFactory.class); + mapper = new DefaultObjectMapper(); + monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class); + supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class); + supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class); + } + + private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor + { + private BaseTestSeekableStreamSupervisor() + { + super( + "testSupervisorId", + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + taskClientFactory, + OBJECT_MAPPER, + spec, + rowIngestionMetersFactory, + false + ); + } + + @Override + protected String baseTaskName() + { + return "test"; + } + + @Override + protected void updatePartitionLagFromStream() + { + // do nothing + } + + @Nullable + @Override + protected Map getPartitionRecordLag() + { + return null; + } + + @Nullable + @Override + protected Map getPartitionTimeLag() + { + return null; + } + + @Override + protected SeekableStreamIndexTaskIOConfig createTaskIoConfig( + int groupId, + Map startPartitions, + Map endPartitions, + String baseSequenceName, + DateTime minimumMessageTime, + DateTime maximumMessageTime, + Set exclusiveStartSequenceNumberPartitions, + SeekableStreamSupervisorIOConfig ioConfig + ) + { + return new SeekableStreamIndexTaskIOConfig( + groupId, + baseSequenceName, + new SeekableStreamStartSequenceNumbers<>(STREAM, 
startPartitions, exclusiveStartSequenceNumberPartitions), + new SeekableStreamEndSequenceNumbers<>(STREAM, endPartitions), + true, + minimumMessageTime, + maximumMessageTime, + ioConfig.getInputFormat() + ) + { + }; + } + + @Override + protected List> createIndexTasks( + int replicas, + String baseSequenceName, + ObjectMapper sortingMapper, + TreeMap> sequenceOffsets, + SeekableStreamIndexTaskIOConfig taskIoConfig, + SeekableStreamIndexTaskTuningConfig taskTuningConfig, + RowIngestionMetersFactory rowIngestionMetersFactory + ) + { + return null; + } + + @Override + protected int getTaskGroupIdForPartition(String partition) + { + return 0; + } + + @Override + protected boolean checkSourceMetadataMatch(DataSourceMetadata metadata) + { + return true; + } + + @Override + protected boolean doesTaskTypeMatchSupervisor(Task task) + { + return true; + } + + @Override + protected SeekableStreamDataSourceMetadata createDataSourceMetaDataForReset( + String stream, + Map map + ) + { + return null; + } + + @Override + protected OrderedSequenceNumber makeSequenceNumber(String seq, boolean isExclusive) + { + return new OrderedSequenceNumber(seq, isExclusive) + { + @Override + public int compareTo(OrderedSequenceNumber o) + { + return new BigInteger(this.get()).compareTo(new BigInteger(o.get())); + } + }; + } + + @Override + protected Map getRecordLagPerPartition(Map currentOffsets) + { + return null; + } + + @Override + protected Map getTimeLagPerPartition(Map currentOffsets) + { + return null; + } + + @Override + protected RecordSupplier setupRecordSupplier() + { + return recordSupplier; + } + + @Override + protected SeekableStreamSupervisorReportPayload createReportPayload( + int numPartitions, + boolean includeOffsets + ) + { + return new SeekableStreamSupervisorReportPayload( + DATASOURCE, + STREAM, + 1, + 1, + 1L, + null, + null, + null, + null, + null, + null, + false, + true, + null, + null, + null + ) + { + }; + } + + @Override + protected String getNotSetMarker() + { + return "NOT_SET"; + } + + @Override + protected String getEndOfPartitionMarker() + { + return "EOF"; + } + + @Override + protected boolean isEndOfShard(String seqNum) + { + return false; + } + + @Override + protected boolean isShardExpirationMarker(String seqNum) + { + return false; + } + + @Override + protected boolean useExclusiveStartSequenceNumberForNonFirstSequence() + { + return false; + } + } + + private class TestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor + { + private int partitionNumbers; + + public TestSeekableStreamSupervisor(int partitionNumbers) + { + this.partitionNumbers = partitionNumbers; + } + + @Override + protected void scheduleReporting(ScheduledExecutorService reportingExec) + { + // do nothing + } + + @Override + public LagStats computeLagStats() + { + return new LagStats(0, 0, 0); + } + + @Override + public int getPartitionCount() + { + return partitionNumbers; + } + } + + + private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec + { + private SeekableStreamSupervisor supervisor; + private String id; + + public TestSeekableStreamSupervisorSpec(SeekableStreamSupervisorIngestionSpec ingestionSchema, + @Nullable Map context, + Boolean suspended, + TaskStorage taskStorage, + TaskMaster taskMaster, + IndexerMetadataStorageCoordinator indexerMetadataStorageCoordinator, + SeekableStreamIndexTaskClientFactory indexTaskClientFactory, + ObjectMapper mapper, + ServiceEmitter emitter, + DruidMonitorSchedulerConfig monitorSchedulerConfig, + 
RowIngestionMetersFactory rowIngestionMetersFactory, + SupervisorStateManagerConfig supervisorStateManagerConfig, + SeekableStreamSupervisor supervisor, + String id) + { + super( + ingestionSchema, + context, + suspended, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig); + + this.supervisor = supervisor; + this.id = id; + } + + @Override + public List getDataSources() + { + return new ArrayList<>(); + } + + @Override + public String getId() + { + return id; + } + + @Override + public Supervisor createSupervisor() + { + return supervisor; + } + + @Override + public String getType() + { + return null; + } + + @Override + public String getSource() + { + return null; + } + + @Override + protected SeekableStreamSupervisorSpec toggleSuspend(boolean suspend) + { + return null; + } + } + + private static SeekableStreamSupervisorTuningConfig getTuningConfig() + { + return new SeekableStreamSupervisorTuningConfig() + { + @Override + public Integer getWorkerThreads() + { + return 1; + } + + @Override + public Integer getChatThreads() + { + return 1; + } + + @Override + public Long getChatRetries() + { + return 1L; + } + + @Override + public Duration getHttpTimeout() + { + return new Period("PT1M").toStandardDuration(); + } + + @Override + public Duration getShutdownTimeout() + { + return new Period("PT1S").toStandardDuration(); + } + + @Override + public Duration getRepartitionTransitionDuration() + { + return new Period("PT2M").toStandardDuration(); + } + + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + + @Override + public SeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new SeekableStreamIndexTaskTuningConfig( + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ) + { + @Override + public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return null; + } + + @Override + public String toString() + { + return null; + } + }; + } + }; + } + + @Test + public void testAutoScalerConfig() + { + AutoScalerConfig autoScalerConfigEmpty = mapper.convertValue(new HashMap<>(), AutoScalerConfig.class); + Assert.assertTrue(autoScalerConfigEmpty instanceof LagBasedAutoScalerConfig); + Assert.assertFalse(autoScalerConfigEmpty.getEnableTaskAutoScaler()); + + AutoScalerConfig autoScalerConfigNull = mapper.convertValue(null, AutoScalerConfig.class); + Assert.assertNull(autoScalerConfigNull); + + AutoScalerConfig autoScalerConfigDefault = mapper.convertValue(ImmutableMap.of("autoScalerStrategy", "lagBased"), AutoScalerConfig.class); + Assert.assertTrue(autoScalerConfigDefault instanceof LagBasedAutoScalerConfig); + + AutoScalerConfig autoScalerConfigValue = mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1"), AutoScalerConfig.class); + Assert.assertTrue(autoScalerConfigValue instanceof LagBasedAutoScalerConfig); + LagBasedAutoScalerConfig lagBasedAutoScalerConfig = (LagBasedAutoScalerConfig) autoScalerConfigValue; + Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1); + + Exception e = null; + try { + AutoScalerConfig autoScalerError = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true", "taskCountMax", "1", "taskCountMin", "4"), AutoScalerConfig.class); + } + catch 
(RuntimeException ex) { + e = ex; + } + Assert.assertNotNull(e); + + e = null; + try { + // taskCountMax and taskCountMin couldn't be ignored. + AutoScalerConfig autoScalerError2 = mapper.convertValue(ImmutableMap.of("enableTaskAutoScaler", "true"), AutoScalerConfig.class); + } + catch (RuntimeException ex) { + e = ex; + } + Assert.assertNotNull(e); + + + } + + @Test + public void testAutoScalerCreated() + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionRangeMillis", 500); + autoScalerConfig.put("scaleOutThreshold", 5000000); + autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3); + autoScalerConfig.put("scaleInThreshold", 1000000); + autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8); + autoScalerConfig.put("scaleActionStartDelayMillis", 0); + autoScalerConfig.put("scaleActionPeriodMillis", 100); + autoScalerConfig.put("taskCountMax", 8); + autoScalerConfig.put("taskCountMin", 1); + autoScalerConfig.put("scaleInStep", 1); + autoScalerConfig.put("scaleOutStep", 2); + autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + + EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); + EasyMock.replay(supervisor4); + + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1"); + SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4); + Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler); + + EasyMock.reset(seekableStreamSupervisorIOConfig); + autoScalerConfig.put("enableTaskAutoScaler", false); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + SupervisorTaskAutoScaler autoscaler2 = spec.createAutoscaler(supervisor4); + Assert.assertTrue(autoscaler2 instanceof NoopTaskAutoScaler); + + EasyMock.reset(seekableStreamSupervisorIOConfig); + autoScalerConfig.remove("enableTaskAutoScaler"); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + SupervisorTaskAutoScaler autoscaler3 = spec.createAutoscaler(supervisor4); + Assert.assertTrue(autoscaler3 instanceof NoopTaskAutoScaler); + + EasyMock.reset(seekableStreamSupervisorIOConfig); + autoScalerConfig.clear(); + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class)).anyTimes(); + 
EasyMock.replay(seekableStreamSupervisorIOConfig); + Assert.assertTrue(autoScalerConfig.isEmpty()); + SupervisorTaskAutoScaler autoscaler4 = spec.createAutoscaler(supervisor4); + Assert.assertTrue(autoscaler4 instanceof NoopTaskAutoScaler); + + } + + @Test + public void testDefaultAutoScalerConfigCreatedWithDefault() + { + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoscalerConfig()).andReturn(mapper.convertValue(ImmutableMap.of("lagCollectionIntervalMillis", "1", "enableTaskAutoScaler", true, "taskCountMax", "4", "taskCountMin", "1"), AutoScalerConfig.class)).anyTimes(); + EasyMock.replay(seekableStreamSupervisorIOConfig); + + EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes(); + EasyMock.replay(supervisor4); + + TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(ingestionSchema, + null, + false, + taskStorage, + taskMaster, + indexerMetadataStorageCoordinator, + indexTaskClientFactory, + mapper, + emitter, + monitorSchedulerConfig, + rowIngestionMetersFactory, + supervisorStateManagerConfig, + supervisor4, + "id1"); + SupervisorTaskAutoScaler autoscaler = spec.createAutoscaler(supervisor4); + Assert.assertTrue(autoscaler instanceof LagBasedAutoScaler); + LagBasedAutoScaler lagBasedAutoScaler = (LagBasedAutoScaler) autoscaler; + LagBasedAutoScalerConfig lagBasedAutoScalerConfig = lagBasedAutoScaler.getAutoScalerConfig(); + Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionIntervalMillis(), 1); + Assert.assertEquals(lagBasedAutoScalerConfig.getLagCollectionRangeMillis(), 600000); + Assert.assertEquals(lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), 300000); + Assert.assertEquals(lagBasedAutoScalerConfig.getScaleActionPeriodMillis(), 60000); + Assert.assertEquals(lagBasedAutoScalerConfig.getScaleOutThreshold(), 6000000); + Assert.assertEquals(lagBasedAutoScalerConfig.getScaleInThreshold(), 1000000); + Assert.assertEquals(lagBasedAutoScalerConfig.getTaskCountMax(), 4); + Assert.assertEquals(lagBasedAutoScalerConfig.getTaskCountMin(), 1); + Assert.assertEquals(lagBasedAutoScalerConfig.getScaleInStep(), 1); + Assert.assertEquals(lagBasedAutoScalerConfig.getScaleOutStep(), 2); + Assert.assertEquals(lagBasedAutoScalerConfig.getMinTriggerScaleActionFrequencyMillis(), 600000); + } + + @Test + public void testSeekableStreamSupervisorSpecWithScaleOut() throws InterruptedException + { + + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); 
+ EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); + + LagStats lagStats = supervisor.computeLagStats(); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(2), LagBasedAutoScalerConfig.class), spec); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScaleOut); + Thread.sleep(1 * 1000); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountAfterScaleOut); + + autoScaler.reset(); + autoScaler.stop(); + + } + + @Test + public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException + { + + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(2); + + LagStats lagStats = supervisor.computeLagStats(); + + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleOutProperties(3), LagBasedAutoScalerConfig.class), spec); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScaleOut); + Thread.sleep(1 * 1000); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountAfterScaleOut); + + autoScaler.reset(); + autoScaler.stop(); + + } + + @Test + public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException + { + + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + 
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); + LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(supervisor, DATASOURCE, mapper.convertValue(getScaleInProperties(), LagBasedAutoScalerConfig.class), spec); + + // enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin. + Assert.assertEquals(1, (int) supervisor.getIoConfig().getTaskCount()); + supervisor.getIoConfig().setTaskCount(2); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountBeforeScaleOut); + Thread.sleep(1 * 1000); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountAfterScaleOut); + + autoScaler.reset(); + autoScaler.stop(); + } + + @Test + public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException + { + + SeekableStreamSupervisorIOConfig seekableStreamSupervisorIOConfig = new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + 1, + 1, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, null, null + ) {}; + EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes(); + + EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes(); + EasyMock.expect(spec.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes(); + EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes(); + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.replay(spec); + + EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(this.seekableStreamSupervisorIOConfig).anyTimes(); + EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes(); + EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes(); + EasyMock.replay(ingestionSchema); + + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes(); + EasyMock.replay(taskMaster); + + TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3); + NoopTaskAutoScaler autoScaler = new NoopTaskAutoScaler(); + supervisor.start(); + autoScaler.start(); + supervisor.runInternal(); + int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScaleOut); + Thread.sleep(1 * 1000); + int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountAfterScaleOut); + + autoScaler.reset(); + autoScaler.stop(); + } + + private static DataSchema getDataSchema() + { + List dimensions = new ArrayList<>(); + dimensions.add(StringDimensionSchema.create("dim1")); + 
dimensions.add(StringDimensionSchema.create("dim2")); + + return new DataSchema( + DATASOURCE, + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec( + dimensions, + null, + null + ), + new AggregatorFactory[]{new CountAggregatorFactory("rows")}, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.NONE, + ImmutableList.of() + ), + null + ); + } + + private SeekableStreamSupervisorIOConfig getIOConfig(int taskCount, boolean scaleOut) + { + if (scaleOut) { + return new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + 1, + taskCount, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, mapper.convertValue(getScaleOutProperties(2), AutoScalerConfig.class), null + ) {}; + } else { + return new SeekableStreamSupervisorIOConfig( + "stream", + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false), + 1, + taskCount, + new Period("PT1H"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, mapper.convertValue(getScaleInProperties(), AutoScalerConfig.class), null + ) {}; + } + } + + private static Map getScaleOutProperties(int maxTaskCount) + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionRangeMillis", 500); + autoScalerConfig.put("scaleOutThreshold", 0); + autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0); + autoScalerConfig.put("scaleInThreshold", 1000000); + autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8); + autoScalerConfig.put("scaleActionStartDelayMillis", 0); + autoScalerConfig.put("scaleActionPeriodMillis", 100); + autoScalerConfig.put("taskCountMax", maxTaskCount); + autoScalerConfig.put("taskCountMin", 1); + autoScalerConfig.put("scaleInStep", 1); + autoScalerConfig.put("scaleOutStep", 2); + autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + return autoScalerConfig; + } + + private static Map getScaleInProperties() + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionRangeMillis", 500); + autoScalerConfig.put("scaleOutThreshold", 8000000); + autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3); + autoScalerConfig.put("scaleInThreshold", 0); + autoScalerConfig.put("triggerScaleInFractionThreshold", 0.0); + autoScalerConfig.put("scaleActionStartDelayMillis", 0); + autoScalerConfig.put("scaleActionPeriodMillis", 100); + autoScalerConfig.put("taskCountMax", 2); + autoScalerConfig.put("taskCountMin", 1); + autoScalerConfig.put("scaleInStep", 1); + autoScalerConfig.put("scaleOutStep", 2); + autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + return autoScalerConfig; + } + +} diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index 6adf4c8eea8..42e8c557705 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -43,6 +43,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager.BasicState; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask; @@ -58,6 +59,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamException; import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.granularity.Granularities; @@ -82,9 +84,11 @@ import org.junit.Before; import org.junit.Test; import javax.annotation.Nullable; + import java.io.File; import java.math.BigInteger; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -765,6 +769,7 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport new Period("PT30M"), null, null, + null, null ) { @@ -825,12 +830,32 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport false, new Period("PT30M"), null, - null, null + null, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class), null ) { }; } + private static Map getProperties() + { + HashMap autoScalerConfig = new HashMap<>(); + autoScalerConfig.put("enableTaskAutoScaler", true); + autoScalerConfig.put("lagCollectionIntervalMillis", 500); + autoScalerConfig.put("lagCollectionRangeMillis", 500); + autoScalerConfig.put("scaleOutThreshold", 5000000); + autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.3); + autoScalerConfig.put("scaleInThreshold", 1000000); + autoScalerConfig.put("triggerScaleInFractionThreshold", 0.8); + autoScalerConfig.put("scaleActionStartDelayMillis", 0); + autoScalerConfig.put("scaleActionPeriodMillis", 100); + autoScalerConfig.put("taskCountMax", 8); + autoScalerConfig.put("taskCountMin", 1); + autoScalerConfig.put("scaleInStep", 1); + autoScalerConfig.put("scaleOutStep", 2); + autoScalerConfig.put("minTriggerScaleActionFrequencyMillis", 1200000); + return autoScalerConfig; + } + private static SeekableStreamSupervisorTuningConfig getTuningConfig() { return new SeekableStreamSupervisorTuningConfig() @@ -1177,6 +1202,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport { // do nothing } + + @Override + public LagStats computeLagStats() + { + return null; + } } private class TestEmittingTestSeekableStreamSupervisor extends BaseTestSeekableStreamSupervisor @@ -1219,6 +1250,12 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport } } + @Override + public LagStats computeLagStats() + { + return null; + } + @Override protected void 
scheduleReporting(ScheduledExecutorService reportingExec) { diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java index 5c2a18edbac..c38b67ae428 100644 --- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java +++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractStreamIndexingTest.java @@ -74,10 +74,14 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest private static final String QUERIES_FILE = "/stream/queries/stream_index_queries.json"; private static final String SUPERVISOR_SPEC_TEMPLATE_FILE = "supervisor_spec_template.json"; + private static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE = "supervisor_with_autoscaler_spec_template.json"; protected static final String DATA_RESOURCE_ROOT = "/stream/data"; protected static final String SUPERVISOR_SPEC_TEMPLATE_PATH = String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_SPEC_TEMPLATE_FILE); + protected static final String SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH = + String.join("/", DATA_RESOURCE_ROOT, SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_FILE); + protected static final String SERIALIZER_SPEC_DIR = "serializer"; protected static final String INPUT_FORMAT_SPEC_DIR = "input_format"; protected static final String INPUT_ROW_PARSER_SPEC_DIR = "parser"; @@ -294,6 +298,70 @@ public abstract class AbstractStreamIndexingTest extends AbstractIndexerTest } } + protected void doTestIndexDataWithAutoscaler(@Nullable Boolean transactionEnabled) throws Exception + { + final GeneratedTestConfig generatedTestConfig = new GeneratedTestConfig( + INPUT_FORMAT, + getResourceAsString(JSON_INPUT_FORMAT_PATH) + ); + try ( + final Closeable closer = createResourceCloser(generatedTestConfig); + final StreamEventWriter streamEventWriter = createStreamEventWriter(config, transactionEnabled) + ) { + final String taskSpec = generatedTestConfig.getStreamIngestionPropsTransform() + .apply(getResourceAsString(SUPERVISOR_WITH_AUTOSCALER_SPEC_TEMPLATE_PATH)); + LOG.info("supervisorSpec: [%s]\n", taskSpec); + // Start supervisor + generatedTestConfig.setSupervisorId(indexer.submitSupervisor(taskSpec)); + LOG.info("Submitted supervisor"); + // Start generating half of the data + int secondsToGenerateRemaining = TOTAL_NUMBER_OF_SECOND; + int secondsToGenerateFirstRound = TOTAL_NUMBER_OF_SECOND / 2; + secondsToGenerateRemaining = secondsToGenerateRemaining - secondsToGenerateFirstRound; + final StreamGenerator streamGenerator = new WikipediaStreamEventStreamGenerator( + new JsonEventSerializer(jsonMapper), + EVENTS_PER_SECOND, + CYCLE_PADDING_MS + ); + long numWritten = streamGenerator.run( + generatedTestConfig.getStreamName(), + streamEventWriter, + secondsToGenerateFirstRound, + FIRST_EVENT_TIME + ); + // Verify supervisor is healthy before suspension + ITRetryUtil.retryUntil( + () -> SupervisorStateManager.BasicState.RUNNING.equals(indexer.getSupervisorStatus(generatedTestConfig.getSupervisorId())), + true, + 10000, + 30, + "Waiting for supervisor to be healthy" + ); + + // wait for autoScaling task numbers from 1 to 2. 
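+      // (In the supervisor_with_autoscaler_spec_template.json used by this test, scaleOutThreshold is 0 and
+      // triggerScaleOutFractionThreshold is 0.0, so every collected lag point votes for scale out, while taskCountMax
+      // is 2; the supervisor should therefore grow from the initial 1 running task to 2.)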
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
index 2c648ea630b..967ff524b2e 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/parallelized/ITKafkaIndexingServiceNonTransactionalParallelizedTest.java
@@ -52,6 +52,16 @@ public class ITKafkaIndexingServiceNonTransactionalParallelizedTest extends Abst
    doTestIndexDataWithStartStopSupervisor(false);
  }

+  /**
+   * This test can be run concurrently with other tests because it creates, modifies, and tears down a unique
+   * datasource and supervisor that are scoped to this test only.
+   */
+  @Test
+  public void testKafkaIndexDataWithAutoscaler() throws Exception
+  {
+    doTestIndexDataWithAutoscaler(false);
+  }
+
  /**
   * This test can be run concurrently with other tests as it creates/modifies/teardowns a unique datasource
   * and supervisor maintained and scoped within this test only
diff --git a/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json b/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
new file mode 100644
index 00000000000..f2fa82831af
--- /dev/null
+++ b/integration-tests/src/test/resources/stream/data/supervisor_with_autoscaler_spec_template.json
@@ -0,0 +1,73 @@
+{
+  "type": "%%STREAM_TYPE%%",
+  "dataSchema": {
+    "dataSource": "%%DATASOURCE%%",
+    "parser": %%PARSER%%,
+    "timestampSpec": {
+      "column": "timestamp",
+      "format": "auto"
+    },
+    "dimensionsSpec": {
+      "dimensions": ["page", "language", "user", "unpatrolled", "newPage", "robot", "anonymous", "namespace", "continent", "country", "region", "city"],
+      "dimensionExclusions": [],
+      "spatialDimensions": []
+    },
+    "metricsSpec": [
+      {
+        "type": "count",
+        "name": "count"
+      },
+      {
+        "type": "doubleSum",
+        "name": "added",
+        "fieldName": "added"
+      },
+      {
+        "type": "doubleSum",
+        "name": "deleted",
+        "fieldName": "deleted"
+      },
+      {
+        "type": "doubleSum",
+        "name": "delta",
+        "fieldName": "delta"
+      }
+    ],
+    "granularitySpec": {
+      "type": "uniform",
+      "segmentGranularity": "MINUTE",
+      "queryGranularity": "NONE"
+    }
+  },
+  "tuningConfig": {
+    "type": "%%STREAM_TYPE%%",
+    "intermediatePersistPeriod": "PT30S",
+    "maxRowsPerSegment": 5000000,
+    "maxRowsInMemory": 500000
+  },
+  "ioConfig": {
+    "%%TOPIC_KEY%%": "%%TOPIC_VALUE%%",
+    "%%STREAM_PROPERTIES_KEY%%": %%STREAM_PROPERTIES_VALUE%%,
+    "autoScalerConfig": {
+      "enableTaskAutoScaler": true,
+      "lagCollectionIntervalMillis": 500,
+      "lagCollectionRangeMillis": 500,
+      "scaleOutThreshold": 0,
+      "triggerScaleOutFractionThreshold": 0.0,
+      "scaleInThreshold": 1000000,
+      "triggerScaleInFractionThreshold": 0.9,
+      "scaleActionStartDelayMillis": 0,
+      "scaleActionPeriodMillis": 100,
+      "taskCountMax": 2,
+      "taskCountMin": 1,
+      "scaleInStep": 1,
+      "scaleOutStep": 2,
+      "minTriggerScaleActionFrequencyMillis": 600000
+    },
+    "taskCount": 1,
+    "replicas": 1,
+    "taskDuration": "PT30S",
+    "%%USE_EARLIEST_KEY%%": true,
+    "inputFormat" : %%INPUT_FORMAT%%
+  }
+}
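Note how the template sets `scaleOutThreshold` to 0 and `triggerScaleOutFractionThreshold` to 0.0, so essentially every lag check produces a scale-out decision and the test can watch the task count grow from `taskCountMin` (1) to `taskCountMax` (2) quickly. The sketch below shows the kind of threshold-and-fraction rule those settings describe; it is illustrative only and is not Druid's `lagBased` implementation, whose details differ:

```java
// Simplified decision rule over collected lag samples, following the meaning of the
// lagBased settings above. Illustration only; not the real LagBasedAutoScaler.
import java.util.Arrays;
import java.util.List;

public class LagBasedDecisionSketch
{
  static int decideTaskCountChange(
      List<Long> lagSamples,
      long scaleOutThreshold,
      double triggerScaleOutFractionThreshold,
      long scaleInThreshold,
      double triggerScaleInFractionThreshold,
      int scaleOutStep,
      int scaleInStep
  )
  {
    if (lagSamples.isEmpty()) {
      return 0;
    }
    double fractionAboveOut = lagSamples.stream().filter(lag -> lag > scaleOutThreshold).count() / (double) lagSamples.size();
    double fractionBelowIn = lagSamples.stream().filter(lag -> lag < scaleInThreshold).count() / (double) lagSamples.size();
    if (fractionAboveOut >= triggerScaleOutFractionThreshold) {
      return scaleOutStep;    // add tasks; the caller caps the result at taskCountMax
    }
    if (fractionBelowIn >= triggerScaleInFractionThreshold) {
      return -scaleInStep;    // remove tasks; the caller floors the result at taskCountMin
    }
    return 0;                 // keep the current task count
  }

  public static void main(String[] args)
  {
    // With scaleOutThreshold = 0 and triggerScaleOutFractionThreshold = 0.0 (as in the template),
    // the scale-out branch fires on virtually every check, which is what the integration test relies on.
    System.out.println(decideTaskCountChange(Arrays.asList(10L, 0L, 3L), 0, 0.0, 1000000, 0.9, 2, 1));
  }
}
```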
"triggerScaleOutFractionThreshold": 0.0, + "scaleInThreshold": 1000000, + "triggerScaleInFractionThreshold": 0.9, + "scaleActionStartDelayMillis": 0, + "scaleActionPeriodMillis": 100, + "taskCountMax": 2, + "taskCountMin": 1, + "scaleInStep": 1, + "scaleOutStep": 2, + "minTriggerScaleActionFrequencyMillis": 600000 + }, + "taskCount": 1, + "replicas": 1, + "taskDuration": "PT30S", + "%%USE_EARLIEST_KEY%%": true, + "inputFormat" : %%INPUT_FORMAT%% + } +} diff --git a/pom.xml b/pom.xml index 24f699331c8..157aec2acad 100644 --- a/pom.xml +++ b/pom.xml @@ -957,6 +957,11 @@ jna 4.5.1 + + org.apache.commons + commons-collections4 + 4.2 + io.dropwizard.metrics metrics-core diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java index a2aa29f9c25..b19aeaa2881 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/NoopSupervisorSpec.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.annotations.VisibleForTesting; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import javax.annotation.Nullable; import java.util.ArrayList; @@ -154,6 +155,18 @@ public class NoopSupervisorSpec implements SupervisorSpec { } + + @Override + public LagStats computeLagStats() + { + return new LagStats(0, 0, 0); + } + + @Override + public int getActiveTaskGroupsCount() + { + return -1; + } }; } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 83c661a12ed..66d11399bd2 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor; import com.google.common.collect.ImmutableMap; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import javax.annotation.Nullable; import java.util.Map; @@ -64,4 +65,12 @@ public interface Supervisor * @param checkpointMetadata metadata for the sequence to currently checkpoint */ void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata); + + /** + * Computes maxLag, totalLag and avgLag + * Only supports Kafka ingestion so far. 
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
index 041156ce425..9b44cd08dd1 100644
--- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorSpec.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.overlord.supervisor;

import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;

import java.util.List;

@@ -40,6 +41,11 @@ public interface SupervisorSpec
   */
  Supervisor createSupervisor();

+  default SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+  {
+    return null;
+  }
+
  List<String> getDataSources();

  default SupervisorSpec createSuspendedSpec()
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
new file mode 100644
index 00000000000..7b6e5fd0bab
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+public class LagStats
+{
+  private final long maxLag;
+  private final long totalLag;
+  private final long avgLag;
+
+  public LagStats(long maxLag, long totalLag, long avgLag)
+  {
+    this.maxLag = maxLag;
+    this.totalLag = totalLag;
+    this.avgLag = avgLag;
+  }
+
+  public long getMaxLag()
+  {
+    return maxLag;
+  }
+
+  public long getTotalLag()
+  {
+    return totalLag;
+  }
+
+  public long getAvgLag()
+  {
+    return avgLag;
+  }
+}
diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
new file mode 100644
index 00000000000..c921e2740b8
--- /dev/null
+++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/SupervisorTaskAutoScaler.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.overlord.supervisor.autoscaler;
+
+public interface SupervisorTaskAutoScaler
+{
+  void start();
+  void stop();
+  void reset();
+}
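`SupervisorTaskAutoScaler` is only a start/stop/reset lifecycle. One plausible shape for an implementation is a scheduled loop that samples lag and periodically evaluates a scale decision, roughly as sketched below; the intervals and method bodies are placeholders, not Druid's actual autoscaler:

```java
// Illustrative lifecycle sketch: start() schedules a periodic check, stop() shuts the
// executor down, reset() clears collected state. Not the real Druid implementation.
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class PeriodicAutoScalerSketch
{
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final List<Long> collectedLags = new CopyOnWriteArrayList<>();

  public void start()
  {
    // Collect a lag sample and evaluate the scale decision on a fixed cadence.
    exec.scheduleAtFixedRate(this::checkScale, 1, 30, TimeUnit.SECONDS);
  }

  public void stop()
  {
    exec.shutdownNow();
  }

  public void reset()
  {
    collectedLags.clear();
  }

  private void checkScale()
  {
    collectedLags.add(sampleLag());
    // ... evaluate thresholds over collectedLags and request a task-count change ...
  }

  private long sampleLag()
  {
    return 0L; // placeholder for something like supervisor.computeLagStats().getTotalLag()
  }

  public static void main(String[] args) throws InterruptedException
  {
    PeriodicAutoScalerSketch scaler = new PeriodicAutoScalerSketch();
    scaler.start();
    Thread.sleep(1000);
    scaler.stop();
  }
}
```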
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing;
+
+import org.apache.druid.indexing.overlord.supervisor.NoopSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.Supervisor;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.Callable;
+
+public class NoopSupervisorSpecTest
+{
+  @Test
+  public void testNoopSupervisorSpecWithAutoscaler()
+  {
+    Exception e = null;
+    try {
+      NoopSupervisorSpec noopSupervisorSpec = new NoopSupervisorSpec(null, Collections.singletonList("datasource1"));
+      Supervisor supervisor = noopSupervisorSpec.createSupervisor();
+      SupervisorTaskAutoScaler autoscaler = noopSupervisorSpec.createAutoscaler(supervisor);
+      Assert.assertNull(autoscaler);
+      Callable<Integer> noop = new Callable<Integer>() {
+        @Override
+        public Integer call()
+        {
+          return -1;
+        }
+      };
+
+      int count = supervisor.getActiveTaskGroupsCount();
+      Assert.assertEquals(count, -1);
+
+      LagStats lagStats = supervisor.computeLagStats();
+      long totalLag = lagStats.getTotalLag();
+      long avgLag = lagStats.getAvgLag();
+      long maxLag = lagStats.getMaxLag();
+      Assert.assertEquals(totalLag, 0);
+      Assert.assertEquals(avgLag, 0);
+      Assert.assertEquals(maxLag, 0);
+    }
+    catch (Exception ex) {
+      e = ex;
+    }
+    Assert.assertNull(e);
+  }
+}
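Wait — the test above leans on the fact that `createAutoscaler` is a `default` method returning null, so existing `SupervisorSpec` implementations compile unchanged and callers must treat a null scaler as "no autoscaling". The stand-alone illustration below shows that default-method pattern with hypothetical `Spec`/`Scaler` types rather than the real Druid interfaces:

```java
// Illustration of the optional-capability default-method pattern used by
// SupervisorSpec.createAutoscaler. All names here are hypothetical stand-ins.
public class DefaultMethodSketch
{
  interface Scaler
  {
    void start();
  }

  interface Spec
  {
    // New optional capability: a null return means "no autoscaler for this spec".
    default Scaler createScaler()
    {
      return null;
    }
  }

  static class LegacySpec implements Spec
  {
    // No override needed; inherits the null default.
  }

  static class ScalingSpec implements Spec
  {
    @Override
    public Scaler createScaler()
    {
      return () -> System.out.println("scaler started");
    }
  }

  public static void main(String[] args)
  {
    Spec legacy = new LegacySpec();
    Spec scaling = new ScalingSpec();
    System.out.println(legacy.createScaler());   // null -> callers must guard before start()
    Scaler scaler = scaling.createScaler();
    if (scaler != null) {
      scaler.start();
    }
  }
}
```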
diff --git a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
index 93322209c8a..5c40757f61a 100644
--- a/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/SQLMetadataSupervisorManagerTest.java
@@ -25,6 +25,7 @@ import com.google.common.collect.ImmutableMap;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.VersionedSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.junit.After;
@@ -185,6 +186,12 @@ public class SQLMetadataSupervisorManagerTest
      throw new UnsupportedOperationException();
    }

+    @Override
+    public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+    {
+      return null;
+    }
+
    @Override
    public List<String> getDataSources()
    {
diff --git a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
index e935a413f04..ffacfa26b8b 100644
--- a/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
+++ b/server/src/test/java/org/apache/druid/metadata/TestSupervisorSpec.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;

import java.util.List;
import java.util.Objects;
@@ -52,6 +53,12 @@ public class TestSupervisorSpec implements SupervisorSpec
    return null;
  }

+  @Override
+  public SupervisorTaskAutoScaler createAutoscaler(Supervisor supervisor)
+  {
+    return null;
+  }
+
  @Override
  public List<String> getDataSources()
  {