diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java index 243c41e4b5d..731ddcaa136 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManager.java @@ -21,7 +21,9 @@ package org.apache.druid.indexing.overlord.supervisor; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; import com.google.inject.Inject; +import org.apache.druid.common.guava.FutureUtils; import org.apache.druid.error.DruidException; import org.apache.druid.indexing.common.TaskLockType; import org.apache.druid.indexing.common.task.Tasks; @@ -39,10 +41,12 @@ import org.apache.druid.query.QueryContexts; import org.apache.druid.segment.incremental.ParseExceptionReport; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; /** * Manages the creation and lifetime of {@link Supervisor}. @@ -212,11 +216,12 @@ public class SupervisorManager public void stop() { Preconditions.checkState(started, "SupervisorManager not started"); - + List> stopFutures = new ArrayList<>(); synchronized (lock) { + log.info("Stopping [%d] supervisors", supervisors.keySet().size()); for (String id : supervisors.keySet()) { try { - supervisors.get(id).lhs.stop(false); + stopFutures.add(supervisors.get(id).lhs.stopAsync()); SupervisorTaskAutoScaler autoscaler = autoscalers.get(id); if (autoscaler != null) { autoscaler.stop(); @@ -226,6 +231,18 @@ public class SupervisorManager log.warn(e, "Caught exception while stopping supervisor [%s]", id); } } + log.info("Waiting for [%d] supervisors to shutdown", stopFutures.size()); + try { + FutureUtils.coalesce(stopFutures).get(); + } + catch (Exception e) { + log.warn( + e, + "Stopped [%d] out of [%d] supervisors. Remaining supervisors will be killed.", + stopFutures.stream().filter(Future::isDone).count(), + stopFutures.size() + ); + } supervisors.clear(); autoscalers.clear(); started = false; diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java index 43b3ba895bb..28e114fa220 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java @@ -34,6 +34,7 @@ import com.google.common.collect.Sets; import com.google.common.util.concurrent.AsyncFunction; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.ListeningScheduledExecutorService; import com.google.common.util.concurrent.MoreExecutors; import it.unimi.dsi.fastutil.ints.Int2ObjectLinkedOpenHashMap; @@ -1103,6 +1104,19 @@ public abstract class SeekableStreamSupervisor stopAsync() + { + ListeningExecutorService shutdownExec = MoreExecutors.listeningDecorator( + Execs.singleThreaded("supervisor-shutdown-" + StringUtils.encodeForFormat(supervisorId) + "--%d") + ); + return shutdownExec.submit(() -> { + stop(false); + shutdownExec.shutdown(); + return null; + }); + } + @Override public void reset(@Nullable final DataSourceMetadata dataSourceMetadata) { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java index 81153f238a4..add6d4473de 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorManagerTest.java @@ -23,6 +23,7 @@ import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.error.DruidException; import org.apache.druid.error.DruidExceptionMatcher; import org.apache.druid.indexing.common.TaskLockType; @@ -130,7 +131,9 @@ public class SupervisorManagerTest extends EasyMockSupport verifyAll(); resetAll(); - supervisor3.stop(false); + SettableFuture stopFuture = SettableFuture.create(); + stopFuture.set(null); + EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture); replayAll(); manager.stop(); @@ -361,7 +364,7 @@ public class SupervisorManagerTest extends EasyMockSupport EasyMock.expect(metadataSupervisorManager.getLatest()).andReturn(existingSpecs); supervisor1.start(); - supervisor1.stop(false); + supervisor1.stopAsync(); EasyMock.expectLastCall().andThrow(new RuntimeException("RTE")); replayAll(); @@ -511,7 +514,9 @@ public class SupervisorManagerTest extends EasyMockSupport // mock manager shutdown to ensure supervisor 3 stops resetAll(); - supervisor3.stop(false); + SettableFuture stopFuture = SettableFuture.create(); + stopFuture.set(null); + EasyMock.expect(supervisor3.stopAsync()).andReturn(stopFuture); replayAll(); manager.stop(); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java index e5b26c33706..b1600d3ad54 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java @@ -1002,6 +1002,28 @@ public class SeekableStreamSupervisorStateTest extends EasyMockSupport verifyAll(); } + @Test + public void testStopGracefully() throws Exception + { + EasyMock.expect(spec.isSuspended()).andReturn(false).anyTimes(); + EasyMock.expect(recordSupplier.getPartitionIds(STREAM)).andReturn(ImmutableSet.of(SHARD_ID)).anyTimes(); + EasyMock.expect(taskStorage.getActiveTasksByDatasource(DATASOURCE)).andReturn(ImmutableList.of()).anyTimes(); + EasyMock.expect(taskQueue.add(EasyMock.anyObject())).andReturn(true).anyTimes(); + + taskRunner.unregisterListener("testSupervisorId"); + indexTaskClient.close(); + recordSupplier.close(); + + replayAll(); + SeekableStreamSupervisor supervisor = new TestSeekableStreamSupervisor(); + + supervisor.start(); + supervisor.runInternal(); + ListenableFuture stopFuture = supervisor.stopAsync(); + stopFuture.get(); + verifyAll(); + } + @Test public void testStoppingGracefully() { diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index 63c05be93ca..c0106f648d0 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -21,6 +21,8 @@ package org.apache.druid.indexing.overlord.supervisor; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import org.apache.druid.indexing.overlord.DataSourceMetadata; import org.apache.druid.segment.incremental.ParseExceptionReport; @@ -44,6 +46,22 @@ public interface Supervisor */ void stop(boolean stopGracefully); + /** + * Starts non-graceful shutdown of the supervisor and returns a future that completes when shutdown is complete. + */ + default ListenableFuture stopAsync() + { + SettableFuture stopFuture = SettableFuture.create(); + try { + stop(false); + stopFuture.set(null); + } + catch (Exception e) { + stopFuture.setException(e); + } + return stopFuture; + } + SupervisorReport getStatus(); SupervisorStateManager.State getState(); diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java index 287a13c3436..0912bca8f8b 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/StreamSupervisorTest.java @@ -26,6 +26,7 @@ import org.junit.Assert; import org.junit.Test; import javax.annotation.Nullable; +import java.util.concurrent.Future; public class StreamSupervisorTest { @@ -100,4 +101,74 @@ public class StreamSupervisorTest ex.getMessage() ); } + + @Test + public void testDefaultStopAsync() + { + // Create an instance of stream supervisor without overriding stopAsync(). + final StreamSupervisor streamSupervisor = new StreamSupervisor() + { + private SupervisorStateManager.State state = SupervisorStateManager.BasicState.RUNNING; + + @Override + public void start() + { + + } + + @Override + public void stop(boolean stopGracefully) + { + state = SupervisorStateManager.BasicState.STOPPING; + } + + @Override + public SupervisorReport getStatus() + { + return null; + } + + @Override + public SupervisorStateManager.State getState() + { + return state; + } + + @Override + public void reset(@Nullable DataSourceMetadata dataSourceMetadata) + { + + } + + @Override + public void resetOffsets(DataSourceMetadata resetDataSourceMetadata) + { + + } + + @Override + public void checkpoint(int taskGroupId, DataSourceMetadata checkpointMetadata) + { + + } + + @Override + public LagStats computeLagStats() + { + return null; + } + + @Override + public int getActiveTaskGroupsCount() + { + return 0; + } + }; + + Future stopAsyncFuture = streamSupervisor.stopAsync(); + Assert.assertTrue(stopAsyncFuture.isDone()); + + // stop should be called by stopAsync + Assert.assertEquals(SupervisorStateManager.BasicState.STOPPING, streamSupervisor.getState()); + } }