mirror of https://github.com/apache/druid.git
Skip streaming auto-scaling action if supervisor is idle (#14773)
* Skip streaming auto-scaling action if supervisor is idle * Update indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com> --------- Co-authored-by: Abhishek Radhakrishnan <abhishek.rb19@gmail.com>
This commit is contained in:
parent
9c124f2cde
commit
a8eaa1e4ed
|
@ -443,6 +443,13 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
|
|||
);
|
||||
return;
|
||||
}
|
||||
if (SupervisorStateManager.BasicState.IDLE == getState()) {
|
||||
log.info(
|
||||
"Skipping DynamicAllocationTasksNotice execution because [%s] supervisor is idle",
|
||||
dataSource
|
||||
);
|
||||
return;
|
||||
}
|
||||
log.debug("PendingCompletionTaskGroups is [%s] for dataSource [%s]", pendingCompletionTaskGroups,
|
||||
dataSource
|
||||
);
|
||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
|
|||
import org.apache.druid.indexing.overlord.TaskMaster;
|
||||
import org.apache.druid.indexing.overlord.TaskStorage;
|
||||
import org.apache.druid.indexing.overlord.supervisor.Supervisor;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManager;
|
||||
import org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfig;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
|
||||
import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
|
||||
|
@ -358,6 +359,22 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
|
|||
}
|
||||
}
|
||||
|
||||
private class StateOverrideTestSeekableStreamSupervisor extends TestSeekableStreamSupervisor
|
||||
{
|
||||
private SupervisorStateManager.State state;
|
||||
|
||||
public StateOverrideTestSeekableStreamSupervisor(SupervisorStateManager.State state, int partitionNumbers)
|
||||
{
|
||||
super(partitionNumbers);
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public SupervisorStateManager.State getState()
|
||||
{
|
||||
return state;
|
||||
}
|
||||
}
|
||||
|
||||
private static class TestSeekableStreamSupervisorSpec extends SeekableStreamSupervisorSpec
|
||||
{
|
||||
|
@ -756,6 +773,54 @@ public class SeekableStreamSupervisorSpecTest extends EasyMockSupport
|
|||
autoScaler.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekableStreamSupervisorSpecWithNoScalingOnIdleSupervisor() throws InterruptedException
|
||||
{
|
||||
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
|
||||
|
||||
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
|
||||
EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig(1, true)).anyTimes();
|
||||
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
|
||||
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
|
||||
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
|
||||
EasyMock.replay(spec);
|
||||
|
||||
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(seekableStreamSupervisorIOConfig).anyTimes();
|
||||
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
|
||||
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(seekableStreamSupervisorTuningConfig).anyTimes();
|
||||
EasyMock.replay(ingestionSchema);
|
||||
|
||||
EasyMock.expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
|
||||
EasyMock.expect(taskMaster.getSupervisorManager()).andReturn(Optional.absent()).anyTimes();
|
||||
EasyMock.replay(taskMaster);
|
||||
|
||||
TestSeekableStreamSupervisor supervisor = new StateOverrideTestSeekableStreamSupervisor(
|
||||
SupervisorStateManager.BasicState.IDLE,
|
||||
3
|
||||
);
|
||||
|
||||
LagBasedAutoScaler autoScaler = new LagBasedAutoScaler(
|
||||
supervisor,
|
||||
DATASOURCE,
|
||||
mapper.convertValue(
|
||||
getScaleOutProperties(2),
|
||||
LagBasedAutoScalerConfig.class
|
||||
),
|
||||
spec
|
||||
);
|
||||
supervisor.start();
|
||||
autoScaler.start();
|
||||
supervisor.runInternal();
|
||||
int taskCountBeforeScaleOut = supervisor.getIoConfig().getTaskCount();
|
||||
Assert.assertEquals(1, taskCountBeforeScaleOut);
|
||||
Thread.sleep(1000);
|
||||
int taskCountAfterScaleOut = supervisor.getIoConfig().getTaskCount();
|
||||
Assert.assertEquals(1, taskCountAfterScaleOut);
|
||||
|
||||
autoScaler.reset();
|
||||
autoScaler.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSeekableStreamSupervisorSpecWithScaleOutSmallPartitionNumber() throws InterruptedException
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue