From 34237bc11272513a969c74ece39ce4bb0d28635c Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Wed, 17 Apr 2024 04:35:05 -0500 Subject: [PATCH] Consider max lag for kinesis while autoscaling (#16284) * Consider max lag for kinesis while autoscaling * add test for coverage * test folder --- .../kinesis/supervisor/KinesisSupervisor.java | 7 ++++ .../autoscaler/LagBasedAutoScaler.java | 4 +- .../overlord/supervisor/Supervisor.java | 9 ++++ .../supervisor/autoscaler/LagMetric.java | 27 ++++++++++++ .../supervisor/autoscaler/LagStats.java | 13 ++++++ .../overlord/supervisor/LagStatsTest.java | 42 +++++++++++++++++++ 6 files changed, 100 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java create mode 100644 server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index a142f414762..e1a7656f23c 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskStorage; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata; import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers; @@ -427,6 +428,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor createDataSourceMetadataWithClosedOrExpiredPartitions( SeekableStreamDataSourceMetadata currentMetadata, Set terminatedPartitionIds, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 8235f53e33f..7813725733b 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -159,8 +159,8 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler if (lagStats == null) { lagMetricsQueue.offer(0L); } else { - long totalLags = lagStats.getTotalLag(); - lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L); + long lag = lagStats.get(supervisor.getLagMetricForAutoScaler()); + lagMetricsQueue.offer(lag > 0 ? lag : 0L); } log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 9d940bc55b6..8befa2adae3 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -93,6 +94,14 @@ public interface Supervisor */ LagStats computeLagStats(); + /** + * Used by AutoScaler to either scale by max/total/avg. + */ + default LagMetric getLagMetricForAutoScaler() + { + return LagMetric.TOTAL; + } + int getActiveTaskGroupsCount(); /** diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java new file mode 100644 index 00000000000..d3f00b5c2c8 --- /dev/null +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagMetric.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor.autoscaler; + +public enum LagMetric +{ + TOTAL, + MAX, + AVERAGE; +} diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java index 7b6e5fd0bab..c7a6dfc6132 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java @@ -46,4 +46,17 @@ public class LagStats { return avgLag; } + + public long get(LagMetric metric) + { + switch (metric) { + case AVERAGE: + return avgLag; + case TOTAL: + return totalLag; + case MAX: + return maxLag; + } + throw new IllegalStateException("Unknown Metric"); + } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java new file mode 100644 index 00000000000..b51882538b9 --- /dev/null +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.overlord.supervisor; + +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; +import org.junit.Assert; +import org.junit.Test; + +public class LagStatsTest +{ + + @Test + public void lagStatsByMetric() + { + int max = 1; + int avg = 2; + int total = 3; + LagStats lag = new LagStats(max, total, avg); + + Assert.assertEquals(max, lag.get(LagMetric.MAX)); + Assert.assertEquals(total, lag.get(LagMetric.TOTAL)); + Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE)); + } +}