Visibility into LagBased AutoScaler desired task count (#16199)

* Visibility into skipped scale notices

* comments

* change to emit always instead of just skips

* fix failing test

* comments

* Add couple more tests
This commit is contained in:
Adithya Chakilam 2024-03-27 12:08:00 -05:00 committed by GitHub
parent cf9a3bdc14
commit a65b2d4f41
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 210 additions and 22 deletions

View File

@ -258,6 +258,7 @@ batch ingestion emit the following metrics. These metrics are deltas for each em
|`ingest/notices/time`|Milliseconds taken to process a notice by the supervisor.|`dataSource`, `tags`| < 1s |
|`ingest/pause/time`|Milliseconds spent by a task in a paused state without ingesting.|`dataSource`, `taskId`, `tags`| < 10 seconds|
|`ingest/handoff/time`|Total number of milliseconds taken to handoff a set of segments.|`dataSource`, `taskId`, `taskType`, `groupId`, `tags`|Depends on the coordinator cycle time.|
|`task/autoScaler/requiredCount`|Count of required tasks based on the calculations of `lagBased` auto scaler.|`dataSource`, `stream`, `scalingSkipReason`|Depends on auto scaler config.|
If the JVM does not support CPU time measurement for the current thread, `ingest/merge/cpu` and `ingest/persists/cpu` will be 0.

View File

@ -149,6 +149,8 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
implements Supervisor
{
public static final String CHECKPOINTS_CTX_KEY = "checkpoints";
public static final String AUTOSCALER_SKIP_REASON_DIMENSION = "scalingSkipReason";
public static final String AUTOSCALER_REQUIRED_TASKS_METRIC = "task/autoScaler/requiredCount";
private static final long MINIMUM_GET_OFFSET_PERIOD_MILLIS = 5000;
private static final long INITIAL_GET_OFFSET_DELAY_MILLIS = 15000;
@ -403,11 +405,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private class DynamicAllocationTasksNotice implements Notice
{
Callable<Integer> scaleAction;
ServiceEmitter emitter;
private static final String TYPE = "dynamic_allocation_tasks_notice";
DynamicAllocationTasksNotice(Callable<Integer> scaleAction)
DynamicAllocationTasksNotice(Callable<Integer> scaleAction, ServiceEmitter emitter)
{
this.scaleAction = scaleAction;
this.emitter = emitter;
}
/**
@ -448,17 +452,35 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
return;
}
}
final Integer desiredTaskCount = scaleAction.call();
ServiceMetricEvent.Builder event = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, getIoConfig().getStream());
if (nowTime - dynamicTriggerLastRunTime < autoScalerConfig.getMinTriggerScaleActionFrequencyMillis()) {
log.info(
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it!",
"DynamicAllocationTasksNotice submitted again in [%d] millis, minTriggerDynamicFrequency is [%s] for dataSource [%s], skipping it! desired task count is [%s], active task count is [%s]",
nowTime - dynamicTriggerLastRunTime,
autoScalerConfig.getMinTriggerScaleActionFrequencyMillis(),
dataSource
dataSource,
desiredTaskCount,
getActiveTaskGroupsCount()
);
if (desiredTaskCount > 0) {
emitter.emit(event.setDimension(
AUTOSCALER_SKIP_REASON_DIMENSION,
"minTriggerScaleActionFrequencyMillis not elapsed yet"
)
.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
return;
}
final Integer desriedTaskCount = scaleAction.call();
boolean allocationSuccess = changeTaskCount(desriedTaskCount);
if (desiredTaskCount > 0) {
emitter.emit(event.setMetric(AUTOSCALER_REQUIRED_TASKS_METRIC, desiredTaskCount));
}
boolean allocationSuccess = changeTaskCount(desiredTaskCount);
if (allocationSuccess) {
dynamicTriggerLastRunTime = nowTime;
}
@ -1208,9 +1230,9 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction)
public Runnable buildDynamicAllocationTask(Callable<Integer> scaleAction, ServiceEmitter emitter)
{
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction));
return () -> addNotice(new DynamicAllocationTasksNotice(scaleAction, emitter));
}
private Runnable buildRunTask()

View File

@ -167,7 +167,7 @@ public abstract class SeekableStreamSupervisorSpec implements SupervisorSpec
{
AutoScalerConfig autoScalerConfig = ingestionSchema.getIOConfig().getAutoScalerConfig();
if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler() && supervisor instanceof SeekableStreamSupervisor) {
return autoScalerConfig.createAutoScaler(supervisor, this);
return autoScalerConfig.createAutoScaler(supervisor, this, emitter);
}
return new NoopTaskAutoScaler();
}

View File

@ -26,6 +26,7 @@ import org.apache.druid.guice.annotations.UnstableApi;
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@UnstableApi
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "autoScalerStrategy", defaultImpl = LagBasedAutoScalerConfig.class)
@ -38,6 +39,6 @@ public interface AutoScalerConfig
long getMinTriggerScaleActionFrequencyMillis();
int getTaskCountMax();
int getTaskCountMin();
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec);
SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter);
}

View File

@ -27,6 +27,9 @@ import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervi
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.query.DruidMetrics;
import java.util.ArrayList;
import java.util.List;
@ -45,11 +48,17 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
private final SupervisorSpec spec;
private final SeekableStreamSupervisor supervisor;
private final LagBasedAutoScalerConfig lagBasedAutoScalerConfig;
private final ServiceEmitter emitter;
private final ServiceMetricEvent.Builder metricBuilder;
private static final ReentrantLock LOCK = new ReentrantLock(true);
public LagBasedAutoScaler(SeekableStreamSupervisor supervisor, String dataSource,
LagBasedAutoScalerConfig autoScalerConfig, SupervisorSpec spec
public LagBasedAutoScaler(
SeekableStreamSupervisor supervisor,
String dataSource,
LagBasedAutoScalerConfig autoScalerConfig,
SupervisorSpec spec,
ServiceEmitter emitter
)
{
this.lagBasedAutoScalerConfig = autoScalerConfig;
@ -62,6 +71,10 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
this.lagComputationExec = Execs.scheduledSingleThreaded(StringUtils.encodeForFormat(supervisorId) + "-Computation-%d");
this.spec = spec;
this.supervisor = supervisor;
this.emitter = emitter;
metricBuilder = ServiceMetricEvent.builder()
.setDimension(DruidMetrics.DATASOURCE, dataSource)
.setDimension(DruidMetrics.STREAM, this.supervisor.getIoConfig().getStream());
}
@Override
@ -93,7 +106,7 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
TimeUnit.MILLISECONDS
);
allocationExec.scheduleAtFixedRate(
supervisor.buildDynamicAllocationTask(scaleAction),
supervisor.buildDynamicAllocationTask(scaleAction, emitter),
lagBasedAutoScalerConfig.getScaleActionStartDelayMillis() + lagBasedAutoScalerConfig
.getLagCollectionRangeMillis(),
lagBasedAutoScalerConfig.getScaleActionPeriodMillis(),
@ -214,6 +227,12 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
log.warn("CurrentActiveTaskCount reached task count Max limit, skipping scale out action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at max task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.min(taskCount, actualTaskCountMax);
@ -228,6 +247,12 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler
log.warn("CurrentActiveTaskCount reached task count Min limit, skipping scale in action for dataSource [%s].",
dataSource
);
emitter.emit(metricBuilder
.setDimension(
SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION,
"Already at min task count"
)
.setMetric(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC, taskCount));
return -1;
} else {
desiredActiveTaskCount = Math.max(taskCount, lagBasedAutoScalerConfig.getTaskCountMin());

View File

@ -25,6 +25,7 @@ import org.apache.druid.indexing.overlord.supervisor.Supervisor;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
@ -154,9 +155,9 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig
}
@Override
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec)
public SupervisorTaskAutoScaler createAutoScaler(Supervisor supervisor, SupervisorSpec spec, ServiceEmitter emitter)
{
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec);
return new LagBasedAutoScaler((SeekableStreamSupervisor) supervisor, spec.getId(), this, spec, emitter);
}
@JsonProperty

View File

@ -57,6 +57,8 @@ import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.segment.TestHelper;
@ -129,6 +131,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
monitorSchedulerConfig = EasyMock.mock(DruidMonitorSchedulerConfig.class);
supervisorStateManagerConfig = EasyMock.mock(SupervisorStateManagerConfig.class);
supervisor4 = EasyMock.mock(SeekableStreamSupervisor.class);
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(null).anyTimes();
}
private abstract class BaseTestSeekableStreamSupervisor extends SeekableStreamSupervisor<String, String, ByteEntity>
@ -618,9 +622,11 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.expect(seekableStreamSupervisorIOConfig.getAutoScalerConfig())
.andReturn(mapper.convertValue(autoScalerConfig, AutoScalerConfig.class))
.anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.replay(supervisor4);
TestSeekableStreamSupervisorSpec spec = new TestSeekableStreamSupervisorSpec(
@ -691,8 +697,10 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
"1"
), AutoScalerConfig.class))
.anyTimes();
EasyMock.expect(seekableStreamSupervisorIOConfig.getStream()).andReturn("stream").anyTimes();
EasyMock.replay(seekableStreamSupervisorIOConfig);
EasyMock.expect(supervisor4.getIoConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(supervisor4.getActiveTaskGroupsCount()).andReturn(0).anyTimes();
EasyMock.replay(supervisor4);
@ -750,16 +758,19 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(3);
StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10);
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
getScaleOutProperties(10),
LagBasedAutoScalerConfig.class
),
spec
spec,
dynamicActionEmitter
);
supervisor.start();
autoScaler.start();
@ -769,6 +780,72 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
Thread.sleep(1000);
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
Assert.assertEquals(2, taskCountAfterScaleOut);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents()
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
.anyMatch("minTriggerScaleActionFrequencyMillis not elapsed yet"::equals));
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleOutAlreadyAtMax() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10)
{
@Override
public int getActiveTaskGroupsCount()
{
return 2;
}
};
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
),
spec,
dynamicActionEmitter
);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
Thread.sleep(1000);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents()
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
.anyMatch("Already at max task count"::equals));
autoScaler.reset();
autoScaler.stop();
@ -807,7 +884,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
getScaleOutProperties(2),
LagBasedAutoScalerConfig.class
),
spec
spec,
emitter
);
supervisor.start();
autoScaler.start();
@ -851,7 +929,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
getScaleOutProperties(3),
LagBasedAutoScalerConfig.class
),
spec
spec,
emitter
);
supervisor.start();
autoScaler.start();
@ -870,7 +949,6 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
public void testSeekableStreamSupervisorSpecWithScaleIn() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(2, false)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
@ -895,7 +973,8 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
getScaleInProperties(),
LagBasedAutoScalerConfig.class
),
spec
spec,
emitter
);
// enable autoscaler so that taskcount config will be ignored and init value of taskCount will use taskCountMin.
@ -914,6 +993,65 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleInAlreadyAtMin() throws InterruptedException
{
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.replay(spec);
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
EasyMock.replay(ingestionSchema);
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
EasyMock.replay(taskMaster);
StubServiceEmitter dynamicActionEmitter = new StubServiceEmitter();
TestSeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(10)
{
@Override
public int getActiveTaskGroupsCount()
{
return 1;
}
};
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
supervisor,
DATASOURCE,
mapper.convertValue(
getScaleInProperties(),
LagBasedAutoScalerConfig.class
),
spec,
dynamicActionEmitter
);
supervisor.start();
autoScaler.start();
supervisor.runInternal();
Thread.sleep(1000);
Assert.assertTrue(
dynamicActionEmitter
.getMetricEvents()
.get(SeekableStreamSupervisor.AUTOSCALER_REQUIRED_TASKS_METRIC)
.stream()
.map(metric -> metric.getUserDims().get(SeekableStreamSupervisor.AUTOSCALER_SKIP_REASON_DIMENSION))
.filter(Objects::nonNull)
.anyMatch("Already at min task count"::equals));
autoScaler.reset();
autoScaler.stop();
}
@Test
public void testSeekableStreamSupervisorSpecWithScaleDisable() throws InterruptedException
{
@ -1185,7 +1323,7 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
{
HashMap<String, Object> autoScalerConfig = new HashMap<>();
autoScalerConfig.put("enableTaskAutoScaler", true);
autoScalerConfig.put("lagCollectionIntervalMillis", 500);
autoScalerConfig.put("lagCollectionIntervalMillis", 50);
autoScalerConfig.put("lagCollectionRangeMillis", 500);
autoScalerConfig.put("scaleOutThreshold", 0);
autoScalerConfig.put("triggerScaleOutFractionThreshold", 0.0);