mirror of https://github.com/apache/druid.git
Add config lagAggregate to LagBasedAutoScalerConfig (#16334)
Changes: - Add new config `lagAggregate` to `LagBasedAutoScalerConfig` - Add field `aggregateForScaling` to `LagStats` - Use the new field/config to determine which aggregate to use to compute lag - Remove method `Supervisor.computeLagForAutoScaler()`
This commit is contained in:
parent
89ec0da5c5
commit
f8015eb02a
|
@ -96,6 +96,7 @@ The following table outlines the configuration properties related to the `lagBas
|
|||
|`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000|
|
||||
|`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
|
||||
|`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
|
||||
|`lagAggregate`|The aggregate function used to compute the lag metric for scaling decisions. Possible values are `MAX`, `SUM` and `AVERAGE`. |No|`SUM`|
|
||||
|
||||
The following example shows a supervisor spec with `lagBased` autoscaler:
|
||||
|
||||
|
|
|
@ -427,13 +427,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String,
|
|||
);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long computeLagForAutoScaler()
|
||||
{
|
||||
LagStats lagStats = computeLagStats();
|
||||
return lagStats == null ? 0L : lagStats.getMaxLag();
|
||||
}
|
||||
|
||||
private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions(
|
||||
SeekableStreamDataSourceMetadata<String, String> currentMetadata,
|
||||
Set<String> terminatedPartitionIds,
|
||||
|
|
|
@ -21,6 +21,8 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
|
|||
|
||||
import org.apache.commons.collections4.queue.CircularFifoQueue;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
|
||||
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
|
||||
import org.apache.druid.java.util.common.StringUtils;
|
||||
|
@ -154,8 +156,17 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
|
|||
LOCK.lock();
|
||||
try {
|
||||
if (!spec.isSuspended()) {
|
||||
long lag = supervisor.computeLagForAutoScaler();
|
||||
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
|
||||
LagStats lagStats = supervisor.computeLagStats();
|
||||
|
||||
if (lagStats != null) {
|
||||
AggregateFunction aggregate = lagBasedAutoScalerConfig.getLagAggregate() == null ?
|
||||
lagStats.getAggregateForScaling() :
|
||||
lagBasedAutoScalerConfig.getLagAggregate();
|
||||
long lag = lagStats.getMetric(aggregate);
|
||||
lagMetricsQueue.offer(lag > 0 ? lag : 0L);
|
||||
} else {
|
||||
lagMetricsQueue.offer(0L);
|
||||
}
|
||||
log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue);
|
||||
} else {
|
||||
log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource);
|
||||
|
|
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
|
|||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
|
||||
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
|
||||
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
|
||||
|
@ -45,6 +46,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
|
|||
private final int scaleOutStep;
|
||||
private final boolean enableTaskAutoScaler;
|
||||
private final long minTriggerScaleActionFrequencyMillis;
|
||||
private final AggregateFunction lagAggregate;
|
||||
|
||||
@JsonCreator
|
||||
public LagBasedAutoScalerConfig(
|
||||
|
@ -61,7 +63,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
|
|||
@Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
|
||||
@Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
|
||||
@Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler,
|
||||
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis
|
||||
@Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis,
|
||||
@Nullable @JsonProperty("lagAggregate") AggregateFunction lagAggregate
|
||||
)
|
||||
{
|
||||
this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false;
|
||||
|
@ -73,6 +76,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
|
|||
this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000;
|
||||
this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3;
|
||||
this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9;
|
||||
this.lagAggregate = lagAggregate;
|
||||
|
||||
// Only do taskCountMax and taskCountMin check when autoscaler is enabled. So that users left autoConfig empty{} will not throw any exception and autoscaler is disabled.
|
||||
// If autoscaler is disabled, no matter what configs are set, they are not used.
|
||||
|
@ -186,6 +190,13 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
|
|||
return minTriggerScaleActionFrequencyMillis;
|
||||
}
|
||||
|
||||
@JsonProperty
|
||||
@Nullable
|
||||
public AggregateFunction getLagAggregate()
|
||||
{
|
||||
return lagAggregate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
@ -204,6 +215,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
|
|||
", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
|
||||
", scaleInStep=" + scaleInStep +
|
||||
", scaleOutStep=" + scaleOutStep +
|
||||
", lagAggregate=" + lagAggregate +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
|
|
|
@ -92,14 +92,5 @@ public interface Supervisor
|
|||
*/
|
||||
LagStats computeLagStats();
|
||||
|
||||
/**
|
||||
* Used by AutoScaler to make scaling decisions.
|
||||
*/
|
||||
default long computeLagForAutoScaler()
|
||||
{
|
||||
LagStats lagStats = computeLagStats();
|
||||
return lagStats == null ? 0L : lagStats.getTotalLag();
|
||||
}
|
||||
|
||||
int getActiveTaskGroupsCount();
|
||||
}
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.apache.druid.indexing.overlord.supervisor.autoscaler;
|
||||
|
||||
public enum AggregateFunction
|
||||
{
|
||||
MAX,
|
||||
SUM,
|
||||
AVERAGE
|
||||
}
|
|
@ -24,12 +24,19 @@ public class LagStats
|
|||
private final long maxLag;
|
||||
private final long totalLag;
|
||||
private final long avgLag;
|
||||
private final AggregateFunction aggregateForScaling;
|
||||
|
||||
public LagStats(long maxLag, long totalLag, long avgLag)
|
||||
{
|
||||
this(maxLag, totalLag, avgLag, AggregateFunction.SUM);
|
||||
}
|
||||
|
||||
public LagStats(long maxLag, long totalLag, long avgLag, AggregateFunction aggregateForScaling)
|
||||
{
|
||||
this.maxLag = maxLag;
|
||||
this.totalLag = totalLag;
|
||||
this.avgLag = avgLag;
|
||||
this.aggregateForScaling = aggregateForScaling == null ? AggregateFunction.SUM : aggregateForScaling;
|
||||
}
|
||||
|
||||
public long getMaxLag()
|
||||
|
@ -46,4 +53,26 @@ public class LagStats
|
|||
{
|
||||
return avgLag;
|
||||
}
|
||||
|
||||
/**
|
||||
* The preferred scaling metric that supervisor may specify to be used.
|
||||
* This could be overrided by the autscaler.
|
||||
*/
|
||||
public AggregateFunction getAggregateForScaling()
|
||||
{
|
||||
return aggregateForScaling;
|
||||
}
|
||||
|
||||
public long getMetric(AggregateFunction metric)
|
||||
{
|
||||
switch (metric) {
|
||||
case MAX:
|
||||
return getMaxLag();
|
||||
case SUM:
|
||||
return getTotalLag();
|
||||
case AVERAGE:
|
||||
return getAvgLag();
|
||||
}
|
||||
throw new IllegalStateException("Unknown scale metric");
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,22 +19,21 @@
|
|||
|
||||
package org.apache.druid.indexing.overlord.supervisor;
|
||||
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class SupervisorTest
|
||||
public class LagStatsTest
|
||||
{
|
||||
@Test
|
||||
public void testAutoScalerLagComputation()
|
||||
{
|
||||
Supervisor supervisor = Mockito.spy(Supervisor.class);
|
||||
LagStats lagStats = new LagStats(1, 2, 3);
|
||||
|
||||
Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3));
|
||||
Assert.assertEquals(2, supervisor.computeLagForAutoScaler());
|
||||
|
||||
Mockito.when(supervisor.computeLagStats()).thenReturn(null);
|
||||
Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
|
||||
Assert.assertEquals(1, lagStats.getMetric(AggregateFunction.MAX));
|
||||
Assert.assertEquals(2, lagStats.getMetric(AggregateFunction.SUM));
|
||||
Assert.assertEquals(3, lagStats.getMetric(AggregateFunction.AVERAGE));
|
||||
Assert.assertEquals(AggregateFunction.SUM, lagStats.getAggregateForScaling());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue