diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java index 24d919918f4..fe851a183a2 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java @@ -358,6 +358,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(2, taskCountAfterScale); + autoscaler.stop(); } @Test @@ -435,6 +436,7 @@ public class KinesisSupervisorTest extends EasyMockSupport Thread.sleep(1 * 1000); int taskCountAfterScale = supervisor.getIoConfig().getTaskCount(); Assert.assertEquals(1, taskCountAfterScale); + autoscaler.stop(); } @Test diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 1da5b4fbb9c..86c4ba385bd 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -422,13 +422,19 @@ public abstract class SeekableStreamSupervisor scaleAction; + Callable computeDesiredTaskCount; ServiceEmitter emitter; + Runnable onSuccessfulScale; private static final String TYPE = "dynamic_allocation_tasks_notice"; - DynamicAllocationTasksNotice(Callable scaleAction, ServiceEmitter emitter) + DynamicAllocationTasksNotice( + Callable computeDesiredTaskCount, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - this.scaleAction = scaleAction; + this.computeDesiredTaskCount = computeDesiredTaskCount; + this.onSuccessfulScale = onSuccessfulScale; this.emitter = emitter; } @@ -470,7 +476,7 @@ public abstract class SeekableStreamSupervisor scaleAction, ServiceEmitter emitter) + public Runnable buildDynamicAllocationTask( + Callable scaleAction, + Runnable onSuccessfulScale, + ServiceEmitter emitter + ) { - return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter)); + return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, onSuccessfulScale, emitter)); } private Runnable buildRunTask() diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index 22e36841199..648d8a655e9 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -86,10 +86,6 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler int desiredTaskCount = -1; try { desiredTaskCount = computeDesiredTaskCount(new ArrayList<>(lagMetricsQueue)); - - if (desiredTaskCount != -1) { - lagMetricsQueue.clear(); - } } catch (Exception ex) { log.warn(ex, "Exception while computing desired task count for [%s]", dataSource); @@ -100,6 +96,19 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler return desiredTaskCount; }; + Runnable onSuccessfulScale = () -> { + LOCK.lock(); + try { + lagMetricsQueue.clear(); + } + catch (Exception ex) { + log.warn(ex, "Exception while clearing lags for [%s]", dataSource); + } + finally { + LOCK.unlock(); + } + }; + lagComputationExec.scheduleAtFixedRate( computeAndCollectLag(), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis(), // wait for tasks to start up @@ -107,7 +116,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler TimeUnit.MILLISECONDS ); allocationExec.scheduleAtFixedRate( - supervisor.buildDynamicAllocationTask(scaleAction, emitter), + supervisor.buildDynamicAllocationTask(scaleAction, onSuccessfulScale, emitter), lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig .getLagCollectionRangeMillis(), lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),