From e834e4929088dafe63e6b35b04dd3a842f9fa51a Mon Sep 17 00:00:00 2001 From: Adithya Chakilam <35785271+adithyachakilam@users.noreply.github.com> Date: Thu, 17 Oct 2024 13:05:16 -0500 Subject: [PATCH] supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions (#17356) * supervisor/autoscaler: Fix clearing of collected lags on skipped scale actions * comments * supervisor/autoscaler: Skip scaling when partitions are less than minTaskCount (#17335) * Fix pip installation after ubuntu upgrade (#17358) * fix tests --------- Co-authored-by: Pranav --- .../supervisor/KinesisSupervisorTest.java | 2 ++ .../supervisor/SeekableStreamSupervisor.java | 23 ++++++++++++++----- .../autoscaler/LagBasedAutoScaler.java | 19 +++++++++++---- 3 files changed, 33 insertions(+), 11 deletions(-) diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 24d919918f4..fe851a183a2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -358,6 +358,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); + autoscaler.stop(); } @Test @@ -435,6 +436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScale); + autoscaler.stop(); } @Test diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1da5b4fbb9c..86c4ba385bd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -422,13 +422,19 @@ public abstract class SeekableStreamSupervisor scaleAction; + Callable computeDesiredTaskCount; ServiceEmitter emitter; + Runnable onSuccessfulScale; private static final String TYPE = "dynamic_allocation_tasks_notice"; - DynamicAllocationTasksNotice(Callable scaleAction, ServiceEmitter emitter) + DynamicAllocationTasksNotice( + Callable computeDesiredTaskCount, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - this.scaleAction = scaleAction; + this.computeDesiredTaskCount = computeDesiredTaskCount; + this.onSuccessfulScale = onSuccessfulScale; this.emitter = emitter; } @@ -470,7 +476,7 @@ public abstract class SeekableStreamSupervisor scaleAction, ServiceEmitter emitter) + public Runnable buildDynamicAllocationTask( + Callable scaleAction, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); + return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter)); } private Runnable buildRunTask() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 22e36841199..648d8a655e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ 
-86,10 +86,6 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler int desiredTaskCount = -1; try { desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue)); - - if (desiredTaskCount != -1) { - lagMetricsQueue.clear(); - } } catch (Exception ex) { log.warn(ex, "Exception while computing desired task count for [%s]", dataSource); @@ -100,6 +96,19 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler return desiredTaskCount; }; + Runnable onSuccessfulScale = () -> { + LOCK.lock(); + try { + lagMetricsQueue.clear(); + } + catch (Exception ex) { + log.warn(ex, "Exception while clearing lags for [%s]", dataSource); + } + finally { + LOCK.unlock(); + } + }; + lagComputationExec.scheduleAtFixedRate( computeAndCollectLag(), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up @@ -107,7 +116,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler TimeUnit.MILLISECONDS ); allocationExec.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction, emitter), + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig .getLagCollectionRangeMillis(), lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),