diff --git a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java index 97a46854223..6b0c96641a6 100644 --- a/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java +++ b/server/src/main/java/org/apache/druid/guice/AnnouncerModule.java @@ -54,7 +54,7 @@ public class AnnouncerModule implements Module JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class); JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class); binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class); - binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class); + binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleAnnouncements.class); if (isZkEnabled) { binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class); diff --git a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java index aa03ec6f459..3d766be0983 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java +++ b/server/src/main/java/org/apache/druid/server/coordination/BatchDataSegmentAnnouncer.java @@ -36,6 +36,7 @@ import org.apache.druid.curator.announcement.Announcer; import org.apache.druid.java.util.common.DateTimes; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig; import org.apache.druid.server.initialization.ZkPathsConfig; @@ -129,6 +130,13 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer this(server, config, zkPaths, () -> announcer, jsonMapper, ZkEnablementConfig.ENABLED); } + @LifecycleStop + public void stop() + { + changes.stop(); + } + + @Override public void announceSegment(DataSegment segment) throws IOException { diff --git a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java index 65c8dbd6247..a0c45b4a335 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java +++ b/server/src/main/java/org/apache/druid/server/coordination/ChangeRequestHistory.java @@ -32,6 +32,7 @@ import org.apache.druid.utils.CircularBuffer; import java.util.ArrayList; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService; * * Clients call {@link #getRequestsSince} to get updates since given counter. */ + public class ChangeRequestHistory { private static int MAX_SIZE = 1000; @@ -74,11 +76,24 @@ public class ChangeRequestHistory this.singleThreadedExecutor = Execs.singleThreaded("SegmentChangeRequestHistory"); } + public void stop() + { + singleThreadedExecutor.shutdownNow(); + final LinkedHashSet> futures = new LinkedHashSet<>(waitingFutures.keySet()); + waitingFutures.clear(); + for (CustomSettableFuture theFuture : futures) { + theFuture.setException(new IllegalStateException("Server is shutting down.")); + } + } + /** * Add batch of segment changes update. */ public synchronized void addChangeRequests(List requests) { + if (singleThreadedExecutor.isShutdown()) { + return; + } for (T request : requests) { changes.add(new Holder<>(request, getLastCounter().inc())); } @@ -108,6 +123,10 @@ public class ChangeRequestHistory public synchronized ListenableFuture> getRequestsSince(final Counter counter) { final CustomSettableFuture future = new CustomSettableFuture<>(waitingFutures); + if (singleThreadedExecutor.isShutdown()) { + future.setException(new IllegalStateException("Server is shutting down.")); + return future; + } if (counter.counter < 0) { future.setException(new IAE("counter[%s] must be >= 0", counter)); diff --git a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java index 82a67b038ed..ecba53a8a03 100644 --- a/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java +++ b/server/src/test/java/org/apache/druid/server/coordination/ChangeRequestHistoryTest.java @@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture; import org.junit.Assert; import org.junit.Test; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -171,4 +172,44 @@ public class ChangeRequestHistoryTest Assert.assertEquals(1, snapshot.getCounter().getCounter()); Assert.assertEquals(1, snapshot.getRequests().size()); } + + @Test + public void testStop() + { + final ChangeRequestHistory history = new ChangeRequestHistory(); + + ListenableFuture> future = history.getRequestsSince( + ChangeRequestHistory.Counter.ZERO + ); + Assert.assertEquals(1, history.waitingFutures.size()); + + final AtomicBoolean callbackExcecuted = new AtomicBoolean(false); + Futures.addCallback( + future, + new FutureCallback>() + { + @Override + public void onSuccess(ChangeRequestsSnapshot result) + { + callbackExcecuted.set(true); + } + + @Override + public void onFailure(Throwable t) + { + callbackExcecuted.set(true); + } + } + ); + + history.stop(); + // any new change requests should be ignored, there should be no waiting futures, and open futures should be resolved + history.addChangeRequest(new SegmentChangeRequestNoop()); + Assert.assertEquals(0, history.waitingFutures.size()); + Assert.assertTrue(callbackExcecuted.get()); + Assert.assertTrue(future.isDone()); + + Throwable thrown = Assert.assertThrows(ExecutionException.class, future::get); + Assert.assertEquals("java.lang.IllegalStateException: Server is shutting down.", thrown.getMessage()); + } }