diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index a142f414762..e1a7656f23c 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; @@ -427,6 +428,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor createDataSourceMetadataWithClosedOrExpiredPartitions( SeekableStreamDataSourceMetadata currentMetadata, Set terminatedPartitionIds, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 8235f53e33f..7813725733b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -159,8 +159,8 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler if (lagStats == null) { lagMetricsQueue.offer(0L); } else { - long totalLags = lagStats.getTotalLag(); - lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L); + long lag = lagStats.get(supervisor.getLagMetricForAutoScaler()); + lagMetricsQueue.offer(lag > 0 ? lag : 0L); } log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 9d940bc55b6..8befa2adae3 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -93,6 +94,14 @@ public interface Supervisor */ LagStats computeLagStats(); + /** + * Used by AutoScaler to either scale by max/total/avg. + */ + default LagMetric getLagMetricForAutoScaler() + { + return LagMetric.TOTAL; + } + int getActiveTaskGroupsCount(); /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java new file mode 100644 index 00000000000..d3f00b5c2c8 --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor.autoscaler; + +public enum LagMetric +{ + TOTAL, + MAX, + AVERAGE; +} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java index 7b6e5fd0bab..c7a6dfc6132 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java @@ -46,4 +46,17 @@ public class LagStats { return avgLag; } + + public long get(LagMetric metric) + { + switch (metric) { + case AVERAGE: + return avgLag; + case TOTAL: + return totalLag; + case MAX: + return maxLag; + } + throw new IllegalStateException("Unknown Metric"); + } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java new file mode 100644 index 00000000000..b51882538b9 --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.junit.Assert; +import org.junit.Test; + +public class LagStatsTest +{ + + @Test + public void lagStatsByMetric() + { + int max = 1; + int avg = 2; + int total = 3; + LagStats lag = new LagStats(max, total, avg); + + Assert.assertEquals(max, lag.get(LagMetric.MAX)); + Assert.assertEquals(total, lag.get(LagMetric.TOTAL)); + Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE)); + } +}