Consider max lag for kinesis while autoscaling (#16284)

* Consider max lag for kinesis while autoscaling

* add test for coverage

* test folder
This commit is contained in:
Adithya Chakilam 2024-04-17 04:35:05 -05:00 committed by GitHub
parent ccc1ffb032
commit 34237bc112
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 100 additions and 2 deletions

View File

@ -39,6 +39,7 @@ import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
import org.apache.druid.indexing.overlord.TaskMaster;
import org.apache.druid.indexing.overlord.TaskStorage;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.indexing.seekablestream.SeekableStreamDataSourceMetadata;
import org.apache.druid.indexing.seekablestream.SeekableStreamEndSequenceNumbers;
@ -427,6 +428,12 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
);
}
@Override
public LagMetric getLagMetricForAutoScaler()
{
return LagMetric.MAX;
}
private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
Set<String> terminatedPartitionIds,

View File

@ -159,8 +159,8 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
if (lagStats == null) {
lagMetricsQueue.offer(0L);
} else {
long totalLags = lagStats.getTotalLag();
lagMetricsQueue.offer(totalLags > 0 ? totalLags : 0L);
long lag = lagStats.get(supervisor.getLagMetricForAutoScaler());
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
}
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
} else {

View File

@ -23,6 +23,7 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@ -93,6 +94,14 @@ public interface Supervisor
*/
LagStats computeLagStats();
/**
* Used by AutoScaler to either scale by max/total/avg.
*/
default LagMetric getLagMetricForAutoScaler()
{
return LagMetric.TOTAL;
}
int getActiveTaskGroupsCount();
/**

View File

@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
public enum LagMetric
{
TOTAL,
MAX,
AVERAGE;
}

View File

@ -46,4 +46,17 @@ public class LagStats
{
return avgLag;
}
public long get(LagMetric metric)
{
switch (metric) {
case AVERAGE:
return avgLag;
case TOTAL:
return totalLag;
case MAX:
return maxLag;
}
throw new IllegalStateException("Unknown Metric");
}
}

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.overlord.supervisor;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagMetric;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
import org.junit.Assert;
import org.junit.Test;
public class LagStatsTest
{
@Test
public void lagStatsByMetric()
{
int max = 1;
int avg = 2;
int total = 3;
LagStats lag = new LagStats(max, total, avg);
Assert.assertEquals(max, lag.get(LagMetric.MAX));
Assert.assertEquals(total, lag.get(LagMetric.TOTAL));
Assert.assertEquals(avg, lag.get(LagMetric.AVERAGE));
}
}