diff --git a/docs/development/extensions-core/kafka-ingestion.md b/docs/development/extensions-core/kafka-ingestion.md index 6fa2262286d..dcebce0e3fb 100644 --- a/docs/development/extensions-core/kafka-ingestion.md +++ b/docs/development/extensions-core/kafka-ingestion.md @@ -147,7 +147,7 @@ Where the file `supervisor-spec.json` contains a Kafka supervisor spec: |`lateMessageRejectionStartDateTime`|ISO8601 DateTime|Configure tasks to reject messages with timestamps earlier than this date time; for example if this is set to `2016-01-01T11:00Z` and the supervisor creates a task at *2016-01-01T12:00Z*, Druid drops messages with timestamps earlier than *2016-01-01T11:00Z*. This can prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline).|no (default == none)| |`lateMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps earlier than this period before the task was created; for example if this is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps earlier than *2016-01-01T11:00Z* will be dropped. This may help prevent concurrency issues if your data stream has late messages and you have multiple pipelines that need to operate on the same segments (e.g. a realtime and a nightly batch ingestion pipeline). Please note that only one of `lateMessageRejectionPeriod` or `lateMessageRejectionStartDateTime` can be specified.|no (default == none)| |`earlyMessageRejectionPeriod`|ISO8601 Period|Configure tasks to reject messages with timestamps later than this period after the task reached its taskDuration; for example if this is set to `PT1H`, the taskDuration is set to `PT1H` and the supervisor creates a task at *2016-01-01T12:00Z*, messages with timestamps later than *2016-01-01T14:00Z* will be dropped. **Note:** Tasks sometimes run past their task duration, for example, in cases of supervisor failover. Setting earlyMessageRejectionPeriod too low may cause messages to be dropped unexpectedly whenever a task runs past its originally configured task duration.|no (default == none)| -|`autoScalerConfig`|Object|`autoScalerConfig` to specify how to auto scale the number of Kafka ingest tasks. ONLY supported for Kafka indexing as of now. See [Tasks Autoscaler Properties](#Task Autoscaler Properties) for details.|no (default == null)| +|`autoScalerConfig`|Object|Defines auto scaling behavior for Kafka ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)| ### Task Autoscaler Properties diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md index fcc4f647fc1..8066d6b96e4 100644 --- a/docs/development/extensions-core/kinesis-ingestion.md +++ b/docs/development/extensions-core/kinesis-ingestion.md @@ -146,6 +146,116 @@ Where the file `supervisor-spec.json` contains a Kinesis supervisor spec: |`awsAssumedRoleArn`|String|The AWS assumed role to use for additional permissions.|no| |`awsExternalId`|String|The AWS external id to use for additional permissions.|no| |`deaggregate`|Boolean|Whether to use the de-aggregate function of the KCL. See below for details.|no| +|`autoScalerConfig`|Object|Defines auto scaling behavior for Kinesis ingest tasks. See [Tasks Autoscaler Properties](#Task Autoscaler Properties).|no (default == null)| + +### Task Autoscaler Properties + +> Note that Task AutoScaler is currently designated as experimental. + +| Property | Description | Required | +| ------------- | ------------- | ------------- | +| `enableTaskAutoScaler` | Enable or disable the auto scaler. When false or or absent Druid disables the `autoScaler` even when `autoScalerConfig` is not null| no (default == false) | +| `taskCountMax` | Maximum number of Kinesis ingestion tasks. Must be greater than or equal to `taskCountMin`. If greater than `{numKinesisShards}`, the maximum number of reading tasks is `{numKinesisShards}` and `taskCountMax` is ignored. | yes | +| `taskCountMin` | Minimum number of Kinesis ingestion tasks. When you enable the auto scaler, Druid ignores the value of taskCount in `IOConfig` and uses`taskCountMin` for the initial number of tasks to launch.| yes | +| `minTriggerScaleActionFrequencyMillis` | Minimum time interval between two scale actions | no (default == 600000) | +| `autoScalerStrategy` | The algorithm of `autoScaler`. ONLY `lagBased` is supported for now. See [Lag Based AutoScaler Strategy Related Properties](#Lag Based AutoScaler Strategy Related Properties) for details.| no (default == `lagBased`) | + +### Lag Based AutoScaler Strategy Related Properties + +The Kinesis indexing service reports lag metrics measured in time milliseconds rather than message count which is used by Kafka. + +| Property | Description | Required | +| ------------- | ------------- | ------------- | +| `lagCollectionIntervalMillis` | Period of lag points collection. | no (default == 30000) | +| `lagCollectionRangeMillis` | The total time window of lag collection, Use with `lagCollectionIntervalMillis`,it means that in the recent `lagCollectionRangeMillis`, collect lag metric points every `lagCollectionIntervalMillis`. | no (default == 600000) | +| `scaleOutThreshold` | The Threshold of scale out action | no (default == 6000000) | +| `triggerScaleOutFractionThreshold` | If `triggerScaleOutFractionThreshold` percent of lag points are higher than `scaleOutThreshold`, then do scale out action. | no (default == 0.3) | +| `scaleInThreshold` | The Threshold of scale in action | no (default == 1000000) | +| `triggerScaleInFractionThreshold` | If `triggerScaleInFractionThreshold` percent of lag points are lower than `scaleOutThreshold`, then do scale in action. | no (default == 0.9) | +| `scaleActionStartDelayMillis` | Number of milliseconds to delay after the supervisor starts before the first scale logic check. | no (default == 300000) | +| `scaleActionPeriodMillis` | Frequency in milliseconds to check if a scale action is triggered | no (default == 60000) | +| `scaleInStep` | Number of tasks to reduce at a time when scaling down | no (default == 1) | +| `scaleOutStep` | Number of tasks to add at a time when scaling out | no (default == 2) | + +The following example demonstrates a supervisor spec with `lagBased` autoScaler enabled: +```json +{ + "type": "kinesis", + "dataSchema": { + "dataSource": "metrics-kinesis", + "timestampSpec": { + "column": "timestamp", + "format": "auto" + }, + "dimensionsSpec": { + "dimensions": [], + "dimensionExclusions": [ + "timestamp", + "value" + ] + }, + "metricsSpec": [ + { + "name": "count", + "type": "count" + }, + { + "name": "value_sum", + "fieldName": "value", + "type": "doubleSum" + }, + { + "name": "value_min", + "fieldName": "value", + "type": "doubleMin" + }, + { + "name": "value_max", + "fieldName": "value", + "type": "doubleMax" + } + ], + "granularitySpec": { + "type": "uniform", + "segmentGranularity": "HOUR", + "queryGranularity": "NONE" + } + }, + "ioConfig": { + "stream": "metrics", + "autoScalerConfig": { + "enableTaskAutoScaler": true, + "taskCountMax": 6, + "taskCountMin": 2, + "minTriggerScaleActionFrequencyMillis": 600000, + "autoScalerStrategy": "lagBased", + "lagCollectionIntervalMillis": 30000, + "lagCollectionRangeMillis": 600000, + "scaleOutThreshold": 600000, + "triggerScaleOutFractionThreshold": 0.3, + "scaleInThreshold": 100000, + "triggerScaleInFractionThreshold": 0.9, + "scaleActionStartDelayMillis": 300000, + "scaleActionPeriodMillis": 60000, + "scaleInStep": 1, + "scaleOutStep": 2 + }, + "inputFormat": { + "type": "json" + }, + "endpoint": "kinesis.us-east-1.amazonaws.com", + "taskCount": 1, + "replicas": 1, + "taskDuration": "PT1H", + "recordsPerFetch": 2000, + "fetchDelayMillis": 1000 + }, + "tuningConfig": { + "type": "kinesis", + "maxRowsPerSegment": 5000000 + } +} +``` #### Specifying data format diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 92defd23150..e33f458a833 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -379,11 +379,19 @@ public class KinesisSupervisor extends SeekableStreamSupervisor partitionTimeLags = getPartitionTimeLag(); + + if (partitionTimeLags == null) { + return new LagStats(0, 0, 0); + } + + return computeLags(partitionTimeLags); } @Override diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java index b43cece0e52..41ae8767b2e 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorIOConfig.java @@ -93,12 +93,6 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig lateMessageRejectionStartDateTime ); - // for now dynamic Allocation Tasks is not supported here - // throw UnsupportedOperationException in case someone sets this on a kinesis supervisor spec. - if (autoScalerConfig != null) { - throw new UnsupportedOperationException("Tasks auto scaler for kinesis is not supported yet. Please remove autoScalerConfig or set it to null!"); - } - this.endpoint = endpoint != null ? endpoint : (region != null ? region.getEndpoint() : KinesisRegion.US_EAST_1.getEndpoint()); @@ -157,6 +151,7 @@ public class KinesisSupervisorIOConfig extends SeekableStreamSupervisorIOConfig ", endpoint='" + endpoint + '\'' + ", replicas=" + getReplicas() + ", taskCount=" + getTaskCount() + + ", autoScalerConfig=" + getAutoscalerConfig() + ", taskDuration=" + getTaskDuration() + ", startDelay=" + getStartDelay() + ", period=" + getPeriod() + diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 2a806c8d333..d1ba49b366c 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -58,6 +58,7 @@ import org.apache.druid.indexing.overlord.TaskStorage; import org.apache.druid.indexing.overlord.supervisor.SupervisorReport; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager; import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig; @@ -67,6 +68,7 @@ import org.apache.druid.indexing.seekablestream.common.StreamPartition; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager; import org.apache.druid.indexing.seekablestream.supervisor.TaskReportData; import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig; +import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; @@ -283,6 +285,158 @@ public class KinesisSupervisorTest extends EasyMockSupport ); } + @Test + public void testNoInitialStateWithAutoScaleOut() throws Exception + { + HashMap autoScalerConfigMap = new HashMap<>(); + autoScalerConfigMap.put("enableTaskAutoScaler", true); + autoScalerConfigMap.put("lagCollectionIntervalMillis", 500); + autoScalerConfigMap.put("lagCollectionRangeMillis", 500); + autoScalerConfigMap.put("scaleOutThreshold", 0); + autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.0); + autoScalerConfigMap.put("scaleInThreshold", 1000000); + autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.8); + autoScalerConfigMap.put("scaleActionStartDelayMillis", 0); + autoScalerConfigMap.put("scaleActionPeriodMillis", 100); + autoScalerConfigMap.put("taskCountMax", 2); + autoScalerConfigMap.put("taskCountMin", 1); + autoScalerConfigMap.put("scaleInStep", 1); + autoScalerConfigMap.put("scaleOutStep", 2); + autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000); + + AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class); + supervisor = getTestableSupervisor( + 1, + 1, + true, + "PT1H", + null, + null, + false, + null, + null, + autoScalerConfig + ); + KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); + SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor); + + supervisorRecordSupplier.assign(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) + .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) + .anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getAssignment()) + .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) + .anyTimes(); + supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); + supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); + EasyMock.expectLastCall().anyTimes(); + + Capture captured = Capture.newInstance(); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.of(taskRunner)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)).andReturn( + new KinesisDataSourceMetadata( + null + ) + ).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true); + taskRunner.registerListener(EasyMock.anyObject(TaskRunnerListener.class), EasyMock.anyObject(Executor.class)); + replayAll(); + + supervisor.start(); + int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountBeforeScale); + autoscaler.start(); + + supervisor.runInternal(); + verifyAll(); + Thread.sleep(1 * 1000); + int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountAfterScale); + } + + @Test + public void testNoInitialStateWithAutoScaleIn() throws Exception + { + HashMap autoScalerConfigMap = new HashMap<>(); + autoScalerConfigMap.put("enableTaskAutoScaler", true); + autoScalerConfigMap.put("lagCollectionIntervalMillis", 500); + autoScalerConfigMap.put("lagCollectionRangeMillis", 500); + autoScalerConfigMap.put("scaleOutThreshold", 1000000); + autoScalerConfigMap.put("triggerScaleOutFractionThreshold", 0.8); + autoScalerConfigMap.put("scaleInThreshold", 0); + autoScalerConfigMap.put("triggerScaleInFractionThreshold", 0.0); + autoScalerConfigMap.put("scaleActionStartDelayMillis", 0); + autoScalerConfigMap.put("scaleActionPeriodMillis", 100); + autoScalerConfigMap.put("taskCountMax", 2); + autoScalerConfigMap.put("taskCountMin", 1); + autoScalerConfigMap.put("scaleInStep", 1); + autoScalerConfigMap.put("scaleOutStep", 2); + autoScalerConfigMap.put("minTriggerScaleActionFrequencyMillis", 1200000); + + AutoScalerConfig autoScalerConfig = OBJECT_MAPPER.convertValue(autoScalerConfigMap, AutoScalerConfig.class); + supervisor = getTestableSupervisor( + 1, + 2, + true, + "PT1H", + null, + null, + false, + null, + null, + autoScalerConfig + ); + + KinesisSupervisorSpec kinesisSupervisorSpec = supervisor.getKinesisSupervisorSpec(); + SupervisorTaskAutoScaler autoscaler = kinesisSupervisorSpec.createAutoscaler(supervisor); + + supervisorRecordSupplier.assign(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getPartitionIds(STREAM)) + .andReturn(ImmutableSet.of(SHARD_ID1, SHARD_ID0)) + .anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getAssignment()) + .andReturn(ImmutableSet.of(SHARD1_PARTITION, SHARD0_PARTITION)) + .anyTimes(); + supervisorRecordSupplier.seekToLatest(EasyMock.anyObject()); + EasyMock.expectLastCall().anyTimes(); + EasyMock.expect(supervisorRecordSupplier.getEarliestSequenceNumber(EasyMock.anyObject())).andReturn("0").anyTimes(); + supervisorRecordSupplier.seek(EasyMock.anyObject(), EasyMock.anyString()); + EasyMock.expectLastCall().anyTimes(); + + Capture captured = Capture.newInstance(CaptureType.ALL); + EasyMock.expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock + .expect(indexerMetadataStorageCoordinator.retrieveDataSourceMetadata(DATASOURCE)) + .andReturn(new KinesisDataSourceMetadata(null)) + .anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.capture(captured))).andReturn(true).times(2); + replayAll(); + + int taskCountInit = supervisor.getIoConfig().getTaskCount(); + // when enable autoScaler the init taskCount will be equal to taskCountMin + Assert.assertEquals(1, taskCountInit); + supervisor.getIoConfig().setTaskCount(2); + + supervisor.start(); + int taskCountBeforeScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(2, taskCountBeforeScale); + autoscaler.start(); + + supervisor.runInternal(); + verifyAll(); + Thread.sleep(1 * 1000); + int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); + Assert.assertEquals(1, taskCountAfterScale); + } + @Test public void testRecordSupplier() { @@ -347,69 +501,64 @@ public class KinesisSupervisorTest extends EasyMockSupport } @Test - public void testKinesisIOConfig() + public void testKinesisIOConfigInitAndAutoscalerConfigCreation() { - Exception e = null; - try { - KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - null, - false - ); - AutoScalerConfig autoScalerConfig = kinesisSupervisorIOConfig.getAutoscalerConfig(); - Assert.assertNull(autoScalerConfig); - } - catch (Exception ex) { - e = ex; - } - Assert.assertNull(e); + // create KinesisSupervisorIOConfig with autoScalerConfig null + KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithNullAutoScalerConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + null, + false + ); - try { - KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( - STREAM, - INPUT_FORMAT, - "awsEndpoint", - null, - 1, - 1, - new Period("PT30M"), - new Period("P1D"), - new Period("PT30S"), - false, - new Period("PT30M"), - null, - null, - null, - 100, - 1000, - null, - null, - OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), - false - ); - } - catch (Exception ex) { - e = ex; - } - Assert.assertNotNull(e); - Assert.assertTrue(e instanceof UnsupportedOperationException); + AutoScalerConfig autoscalerConfigNull = kinesisSupervisorIOConfigWithNullAutoScalerConfig.getAutoscalerConfig(); + Assert.assertNull(autoscalerConfigNull); + + // create KinesisSupervisorIOConfig with autoScalerConfig Empty + KinesisSupervisorIOConfig kinesisSupervisorIOConfigWithEmptyAutoScalerConfig = new KinesisSupervisorIOConfig( + STREAM, + INPUT_FORMAT, + "awsEndpoint", + null, + 1, + 1, + new Period("PT30M"), + new Period("P1D"), + new Period("PT30S"), + false, + new Period("PT30M"), + null, + null, + null, + 100, + 1000, + null, + null, + OBJECT_MAPPER.convertValue(new HashMap<>(), AutoScalerConfig.class), + false + ); + + AutoScalerConfig autoscalerConfig = kinesisSupervisorIOConfigWithEmptyAutoScalerConfig.getAutoscalerConfig(); + Assert.assertNotNull(autoscalerConfig); + Assert.assertTrue(autoscalerConfig instanceof LagBasedAutoScalerConfig); + Assert.assertFalse(autoscalerConfig.getEnableTaskAutoScaler()); + Assert.assertTrue(autoscalerConfig.toString().contains("autoScalerConfig")); } @Test @@ -4895,6 +5044,7 @@ public class KinesisSupervisorTest extends EasyMockSupport earlyMessageRejectionPeriod, false, null, + null, null ); } @@ -4908,7 +5058,8 @@ public class KinesisSupervisorTest extends EasyMockSupport Period earlyMessageRejectionPeriod, boolean suspended, Integer recordsPerFetch, - Integer fetchDelayMillis + Integer fetchDelayMillis, + AutoScalerConfig autoScalerConfig ) { KinesisSupervisorIOConfig kinesisSupervisorIOConfig = new KinesisSupervisorIOConfig( @@ -4930,7 +5081,7 @@ public class KinesisSupervisorTest extends EasyMockSupport fetchDelayMillis, null, null, - null, + autoScalerConfig, false ); @@ -5303,6 +5454,8 @@ public class KinesisSupervisorTest extends EasyMockSupport private class TestableKinesisSupervisor extends KinesisSupervisor { + private KinesisSupervisorSpec spec; + TestableKinesisSupervisor( TaskStorage taskStorage, TaskMaster taskMaster, @@ -5323,6 +5476,12 @@ public class KinesisSupervisorTest extends EasyMockSupport rowIngestionMetersFactory, null ); + this.spec = spec; + } + + protected KinesisSupervisorSpec getKinesisSupervisorSpec() + { + return spec; } @Override diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 66d11399bd2..2074900e78a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -68,7 +68,6 @@ public interface Supervisor /** * Computes maxLag, totalLag and avgLag - * Only supports Kafka ingestion so far. */ LagStats computeLagStats();