Parallelize supervisor stop logic to make it run faster (#17535)

- Add new method `Supervisor.stopAsync`
- Implement `SeekableStreamSupervisor.stopAsync()` to use a shutdown executor
- Call `stopAsync` from `SupervisorManager`
This commit is contained in:
George Shiqi Wu 2024-12-17 22:49:24 -05:00 committed by GitHub
parent a44ab109d5
commit 9ff11731c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 152 additions and 5 deletions

View File

@ -21,7 +21,9 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.inject.Inject;
import org.apache.druid.common.guava.FutureUtils;
import org.apache.druid.error.DruidException;
import org.apache.druid.indexing.common.TaskLockType;
import org.apache.druid.indexing.common.task.Tasks;
@ -39,10 +41,12 @@ import org.apache.druid.query.QueryContexts;
import org.apache.druid.segment.incremental.ParseExceptionReport;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
/**
* Manages the creation and lifetime of {@link Supervisor}.
@ -212,11 +216,12 @@ public class SupervisorManager
public void stop()
{
Preconditions.checkState(started, "SupervisorManager not started");
List<ListenableFuture<Void>> stopFutures = new ArrayList<>();
synchronized (lock) {
log.info("Stopping [%d] supervisors", supervisors.keySet().size());
for (String id : supervisors.keySet()) {
try {
supervisors.get(id).lhs.stop(false);
stopFutures.add(supervisors.get(id).lhs.stopAsync());
SupervisorTaskAutoScaler autoscaler = autoscalers.get(id);
if (autoscaler != null) {
autoscaler.stop();
@ -226,6 +231,18 @@ public class SupervisorManager
log.warn(e, "Caught exception while stopping supervisor [%s]", id);
}
}
log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size());
try {
FutureUtils.coalesce(stopFutures).get();
}
catch (Exception e) {
log.warn(
e,
"Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.",
stopFutures.stream().filter(Future::isDone).count(),
stopFutures.size()
);
}
supervisors.clear();
autoscalers.clear();
started = false;

View File

@ -34,6 +34,7 @@ import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AsyncFunction;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap;
@ -1103,6 +1104,19 @@ public abstract class SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
}
}
/**
 * Starts a non-graceful stop of this supervisor on a one-off executor obtained from
 * {@code Execs.singleThreaded} and returns a future tracking it.
 *
 * @return a future that completes with {@code null} once {@code stop(false)} has returned, or fails with whatever
 *         {@code stop(false)} threw
 */
@Override
public ListenableFuture<Void> stopAsync()
{
  final ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator(
      Execs.singleThreaded("supervisor-shutdown-" + StringUtils.encodeForFormat(supervisorId) + "--%d")
  );
  return shutdownExec.submit(() -> {
    try {
      stop(false);
      return null;
    }
    finally {
      // Always release the one-off executor, even when stop() throws; otherwise its worker thread would linger
      // because nothing else holds a reference that could shut it down.
      shutdownExec.shutdown();
    }
  });
}
@Override
public void reset(@Nullable final DataSourceMetadata dataSourceMetadata)
{

View File

@ -23,6 +23,7 @@ import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexing.common.TaskLockType;
@ -130,7 +131,9 @@ public class SupervisorManagerTest extends EasyMockSupport
verifyAll();
resetAll();
supervisor3.stop(false);
SettableFuture<Void> stopFuture = SettableFuture.create();
stopFuture.set(null);
EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture);
replayAll();
manager.stop();
@ -361,7 +364,7 @@ public class SupervisorManagerTest extends EasyMockSupport
EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs);
supervisor1.start();
supervisor1.stop(false);
supervisor1.stopAsync();
EasyMock.expectLastCall().andThrow(new RuntimeException("RTE"));
replayAll();
@ -511,7 +514,9 @@ public class SupervisorManagerTest extends EasyMockSupport
// mock manager shutdown to ensure supervisor 3 stops
resetAll();
supervisor3.stop(false);
SettableFuture<Void> stopFuture = SettableFuture.create();
stopFuture.set(null);
EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture);
replayAll();
manager.stop();

View File

@ -1002,6 +1002,28 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport
verifyAll();
}
/**
 * Starts a supervisor, runs one internal iteration, then calls {@code stopAsync()} and waits on the returned
 * future before verifying the recorded mock expectations.
 *
 * NOTE(review): the method name says "gracefully", but the call exercised here is {@code stopAsync()} - confirm the
 * name reflects the intended behavior.
 */
@Test
public void testStopGracefully() throws Exception
{
// Stubbed calls that are allowed any number of times.
EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes();
EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes();
EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes();
EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes();
// Calls made before replayAll(): presumably these are mocks and the calls record expectations that verifyAll()
// checks at the end of the test - confirm against the fixture setup.
taskRunner.unregisterListener("testSupervisorId");
indexTaskClient.close();
recordSupplier.close();
replayAll();
SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor();
supervisor.start();
supervisor.runInternal();
// get() blocks until the asynchronous stop has finished and rethrows its failure, if any.
ListenableFuture<Void> stopFuture = supervisor.stopAsync();
stopFuture.get();
verifyAll();
}
@Test
public void testStoppingGracefully()
{

View File

@ -21,6 +21,8 @@ package org.apache.druid.indexing.overlord.supervisor;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import org.apache.druid.indexing.overlord.DataSourceMetadata;
import org.apache.druid.segment.incremental.ParseExceptionReport;
@ -44,6 +46,22 @@ public interface Supervisor
*/
void stop(boolean stopGracefully);
/**
 * Performs a non-graceful stop by calling {@code stop(false)} directly on the calling thread and reports the outcome
 * through the returned future, which is already complete when this method returns.
 *
 * @return a future holding {@code null} if {@code stop(false)} returned normally, or failed with the
 *         {@link Exception} it threw
 */
default ListenableFuture<Void> stopAsync()
{
  final SettableFuture<Void> result = SettableFuture.create();
  try {
    stop(false);
    result.set(null);
  }
  catch (Exception failure) {
    // Surface the failure through the future instead of propagating it to the caller.
    result.setException(failure);
  }
  return result;
}
SupervisorReport getStatus();
SupervisorStateManager.State getState();

View File

@ -26,6 +26,7 @@ import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.util.concurrent.Future;
public class StreamSupervisorTest
{
@ -100,4 +101,74 @@ public class StreamSupervisorTest
ex.getMessage()
);
}
/**
 * Checks the inherited {@code stopAsync()}: the returned future is done on return and {@code stop} has been invoked.
 */
@Test
public void testDefaultStopAsync()
{
  // Anonymous supervisor that deliberately leaves stopAsync() un-overridden so the inherited version is used.
  final StreamSupervisor supervisor = new StreamSupervisor()
  {
    private SupervisorStateManager.State currentState = SupervisorStateManager.BasicState.RUNNING;

    // Lifecycle: stop() flips the state so the test can observe that it was called.
    @Override
    public void start()
    {
    }

    @Override
    public void stop(boolean stopGracefully)
    {
      currentState = SupervisorStateManager.BasicState.STOPPING;
    }

    @Override
    public SupervisorStateManager.State getState()
    {
      return currentState;
    }

    // Remaining members are inert stubs.
    @Override
    public SupervisorReport getStatus()
    {
      return null;
    }

    @Override
    public LagStats computeLagStats()
    {
      return null;
    }

    @Override
    public int getActiveTaskGroupsCount()
    {
      return 0;
    }

    @Override
    public void reset(@Nullable DataSourceMetadata dataSourceMetadata)
    {
    }

    @Override
    public void resetOffsets(DataSourceMetadata resetDataSourceMetadata)
    {
    }

    @Override
    public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata)
    {
    }
  };

  final Future<Void> stopFuture = supervisor.stopAsync();
  Assert.assertTrue(stopFuture.isDone());

  // The state only changes inside stop(), so STOPPING proves stopAsync() delegated to it.
  Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, supervisor.getState());
}
}