supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions (#17356)

* superviosr/autoscaler: Fix clearing of collected lags on skipped scale actions

* comments

* supervisor/autoscaler: Skip scaling when partitions are less than minTaskCount (#17335)

* Fix pip installation after ubuntu upgrade (#17358)

* fix tests

---------

Co-authored-by: Pranav <pranavbhole@gmail.com>
This commit is contained in:
Adithya Chakilam 2024-10-17 13:05:16 -05:00 committed by GitHub
parent d1b81f312a
commit e834e49290
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 11 deletions

View File

@ -358,6 +358,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScale);
autoscaler.stop();
}
@Test
@ -435,6 +436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport
Thread.sleep(1 * 1000);
int taskCountAfterScale = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(1, taskCountAfterScale);
autoscaler.stop();
}
@Test

View File

@ -422,13 +422,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
// change taskCount without resubmitting.
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
Callable<Integer> computeDesiredTaskCount;
ServiceEmitter emitter;
Runnable onSuccessfulScale;
private static final String TYPE = "dynamic_allocation_tasks_notice";
DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter emitter)
DynamicAllocationTasksNotice(
Callable<Integer> computeDesiredTaskCount,
Runnable onSuccessfulScale,
ServiceEmitter emitter
)
{
this.scaleAction = scaleAction;
this.computeDesiredTaskCount = computeDesiredTaskCount;
this.onSuccessfulScale = onSuccessfulScale;
this.emitter = emitter;
}
@ -470,7 +476,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
}
final Integer desiredTaskCount = scaleAction.call();
final Integer desiredTaskCount = computeDesiredTaskCount.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
@ -500,6 +506,7 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
onSuccessfulScale.run();
dynamicTriggerLastRunTime = nowTime;
}
}
@ -1260,9 +1267,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, ServiceEmitter emitter)
public Runnable buildDynamicAllocationTask(
Callable<Integer> scaleAction,
Runnable onSuccessfulScale,
ServiceEmitter emitter
)
{
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter));
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter));
}
private Runnable buildRunTask()

View File

@ -86,10 +86,6 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
int desiredTaskCount = -1;
try {
desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue));
if (desiredTaskCount != -1) {
lagMetricsQueue.clear();
}
}
catch (Exception ex) {
log.warn(ex, "Exception while computing desired task count for [%s]", dataSource);
@ -100,6 +96,19 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
return desiredTaskCount;
};
Runnable onSuccessfulScale = () -> {
LOCK.lock();
try {
lagMetricsQueue.clear();
}
catch (Exception ex) {
log.warn(ex, "Exception while clearing lags for [%s]", dataSource);
}
finally {
LOCK.unlock();
}
};
lagComputationExec.scheduleAtFixedRate(
computeAndCollectLag(),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up
@ -107,7 +116,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction, emitter),
supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),