fix issue with jetty graceful shutdown of data servers when druid.serverview.type=http (#13499)

* fix issue with http server inventory view blocking data node http server shutdown with long polling

* adjust

* fix test inspections
This commit is contained in:
Clint Wylie 2022-12-06 15:52:44 -08:00 committed by GitHub
parent 136322d13b
commit cf472162a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 69 additions and 1 deletions

View File

@ -54,7 +54,7 @@ public class AnnouncerModule implements Module
JsonConfigProvider.bind(binder, "druid.announcer", BatchDataSegmentAnnouncerConfig.class);
JsonConfigProvider.bind(binder, "druid.announcer", DataSegmentAnnouncerProvider.class);
binder.bind(DataSegmentAnnouncer.class).toProvider(DataSegmentAnnouncerProvider.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(LazySingleton.class);
binder.bind(BatchDataSegmentAnnouncer.class).in(ManageLifecycleAnnouncements.class);
if (isZkEnabled) {
binder.bind(DataSegmentServerAnnouncer.class).to(CuratorDataSegmentServerAnnouncer.class).in(LazySingleton.class);

View File

@ -36,6 +36,7 @@ import org.apache.druid.curator.announcement.Announcer;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import org.apache.druid.server.initialization.ZkPathsConfig;
@ -129,6 +130,13 @@ public class BatchDataSegmentAnnouncer implements DataSegmentAnnouncer
this(server, config, zkPaths, () -> announcer, jsonMapper, ZkEnablementConfig.ENABLED);
}
@LifecycleStop
public void stop()
{
changes.stop();
}
@Override
public void announceSegment(DataSegment segment) throws IOException
{

View File

@ -32,6 +32,7 @@ import org.apache.druid.utils.CircularBuffer;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@ -43,6 +44,7 @@ import java.util.concurrent.ExecutorService;
*
* Clients call {@link #getRequestsSince} to get updates since given counter.
*/
public class ChangeRequestHistory<T>
{
private static int MAX_SIZE = 1000;
@ -74,11 +76,24 @@ public class ChangeRequestHistory<T>
this.singleThreadedExecutor = Execs.singleThreaded("SegmentChangeRequestHistory");
}
public void stop()
{
singleThreadedExecutor.shutdownNow();
final LinkedHashSet<CustomSettableFuture<?>> futures = new LinkedHashSet<>(waitingFutures.keySet());
waitingFutures.clear();
for (CustomSettableFuture<?> theFuture : futures) {
theFuture.setException(new IllegalStateException("Server is shutting down."));
}
}
/**
* Add batch of segment changes update.
*/
public synchronized void addChangeRequests(List<T> requests)
{
if (singleThreadedExecutor.isShutdown()) {
return;
}
for (T request : requests) {
changes.add(new Holder<>(request, getLastCounter().inc()));
}
@ -108,6 +123,10 @@ public class ChangeRequestHistory<T>
public synchronized ListenableFuture<ChangeRequestsSnapshot<T>> getRequestsSince(final Counter counter)
{
final CustomSettableFuture<T> future = new CustomSettableFuture<>(waitingFutures);
if (singleThreadedExecutor.isShutdown()) {
future.setException(new IllegalStateException("Server is shutting down."));
return future;
}
if (counter.counter < 0) {
future.setException(new IAE("counter[%s] must be >= 0", counter));

View File

@ -25,6 +25,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -171,4 +172,44 @@ public class ChangeRequestHistoryTest
Assert.assertEquals(1, snapshot.getCounter().getCounter());
Assert.assertEquals(1, snapshot.getRequests().size());
}
@Test
public void testStop()
{
final ChangeRequestHistory<DataSegmentChangeRequest> history = new ChangeRequestHistory();
ListenableFuture<ChangeRequestsSnapshot<DataSegmentChangeRequest>> future = history.getRequestsSince(
ChangeRequestHistory.Counter.ZERO
);
Assert.assertEquals(1, history.waitingFutures.size());
final AtomicBoolean callbackExcecuted = new AtomicBoolean(false);
Futures.addCallback(
future,
new FutureCallback<ChangeRequestsSnapshot<DataSegmentChangeRequest>>()
{
@Override
public void onSuccess(ChangeRequestsSnapshot result)
{
callbackExcecuted.set(true);
}
@Override
public void onFailure(Throwable t)
{
callbackExcecuted.set(true);
}
}
);
history.stop();
// any new change requests should be ignored, there should be no waiting futures, and open futures should be resolved
history.addChangeRequest(new SegmentChangeRequestNoop());
Assert.assertEquals(0, history.waitingFutures.size());
Assert.assertTrue(callbackExcecuted.get());
Assert.assertTrue(future.isDone());
Throwable thrown = Assert.assertThrows(ExecutionException.class, future::get);
Assert.assertEquals("java.lang.IllegalStateException: Server is shutting down.", thrown.getMessage());
}
}