mirror of https://github.com/apache/druid.git
Add better messages around LookupCoordinatorManager failures (#3027)
* Add better messages around LookupCoordinatorManager failures * Catches #3026 * A few more little tests * Add more forceful shutdown
This commit is contained in:
parent
e2653a8cf4
commit
847501a939
|
@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
|
|||
import com.fasterxml.jackson.core.type.TypeReference;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Function;
|
||||
import com.google.common.base.Predicates;
|
||||
import com.google.common.base.Throwables;
|
||||
|
@ -31,8 +32,10 @@ import com.google.common.collect.ImmutableList;
|
|||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.google.common.net.HostAndPort;
|
||||
import com.google.common.util.concurrent.FutureCallback;
|
||||
import com.google.common.util.concurrent.Futures;
|
||||
import com.google.common.util.concurrent.ListenableFuture;
|
||||
import com.google.common.util.concurrent.ListenableScheduledFuture;
|
||||
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
|
||||
import com.google.common.util.concurrent.MoreExecutors;
|
||||
import com.google.inject.Inject;
|
||||
|
@ -71,6 +74,7 @@ import java.util.Collection;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -121,6 +125,8 @@ public class LookupCoordinatorManager
|
|||
private AtomicReference<Map<String, Map<String, Map<String, Object>>>> lookupMapConfigRef;
|
||||
private volatile Map<String, Map<String, Map<String, Object>>> prior_update = ImmutableMap.of();
|
||||
private volatile boolean started = false;
|
||||
private volatile ListenableScheduledFuture<?> backgroundManagerFuture = null;
|
||||
private final CountDownLatch backgroundManagerExitedLatch = new CountDownLatch(1);
|
||||
|
||||
|
||||
@Inject
|
||||
|
@ -535,7 +541,7 @@ public class LookupCoordinatorManager
|
|||
},
|
||||
null
|
||||
);
|
||||
executorService.scheduleWithFixedDelay(
|
||||
final ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture = executorService.scheduleWithFixedDelay(
|
||||
new Runnable()
|
||||
{
|
||||
@Override
|
||||
|
@ -588,6 +594,27 @@ public class LookupCoordinatorManager
|
|||
lookupCoordinatorManagerConfig.getPeriod(),
|
||||
TimeUnit.MILLISECONDS
|
||||
);
|
||||
Futures.addCallback(backgroundManagerFuture, new FutureCallback<Object>()
|
||||
{
|
||||
@Override
|
||||
public void onSuccess(@Nullable Object result)
|
||||
{
|
||||
backgroundManagerExitedLatch.countDown();
|
||||
LOG.debug("Exited background lookup manager");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t)
|
||||
{
|
||||
backgroundManagerExitedLatch.countDown();
|
||||
if (backgroundManagerFuture.isCancelled()) {
|
||||
LOG.info("Background lookup manager exited");
|
||||
LOG.trace(t, "Background lookup manager exited with throwable");
|
||||
} else {
|
||||
LOG.makeAlert(t, "Background lookup manager exited with error!").emit();
|
||||
}
|
||||
}
|
||||
});
|
||||
started = true;
|
||||
LOG.debug("Started");
|
||||
}
|
||||
|
@ -603,6 +630,11 @@ public class LookupCoordinatorManager
|
|||
}
|
||||
started = false;
|
||||
executorService.shutdownNow();
|
||||
final ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture;
|
||||
this.backgroundManagerFuture = null;
|
||||
if (backgroundManagerFuture != null && !backgroundManagerFuture.cancel(true)) {
|
||||
LOG.warn("Background lookup manager thread could not be cancelled");
|
||||
}
|
||||
// NOTE: we can't un-watch the configuration key
|
||||
LOG.debug("Stopped");
|
||||
}
|
||||
|
@ -627,4 +659,17 @@ public class LookupCoordinatorManager
|
|||
{
|
||||
return statusCode == 404;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean backgroundManagerIsRunning()
|
||||
{
|
||||
ListenableScheduledFuture backgroundManagerFuture = this.backgroundManagerFuture;
|
||||
return backgroundManagerFuture != null && !backgroundManagerFuture.isDone();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
boolean waitForBackgroundTermination(long timeout) throws InterruptedException
|
||||
{
|
||||
return backgroundManagerExitedLatch.await(timeout, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1190,11 +1190,18 @@ public class LookupCoordinatorManagerTest
|
|||
discoverer,
|
||||
mapper,
|
||||
configManager,
|
||||
lookupCoordinatorManagerConfig
|
||||
new LookupCoordinatorManagerConfig(){
|
||||
@Override
|
||||
public long getPeriod(){
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
);
|
||||
manager.start();
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertNull(manager.getKnownLookups());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
EasyMock.verify(configManager);
|
||||
}
|
||||
|
||||
|
@ -1219,8 +1226,12 @@ public class LookupCoordinatorManagerTest
|
|||
lookupCoordinatorManagerConfig
|
||||
);
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
manager.stop();
|
||||
manager.stop();
|
||||
Assert.assertTrue(manager.waitForBackgroundTermination(10));
|
||||
Assert.assertFalse(manager.backgroundManagerIsRunning());
|
||||
EasyMock.verify(configManager);
|
||||
}
|
||||
|
||||
|
@ -1245,6 +1256,8 @@ public class LookupCoordinatorManagerTest
|
|||
lookupCoordinatorManagerConfig
|
||||
);
|
||||
manager.start();
|
||||
Assert.assertTrue(manager.backgroundManagerIsRunning());
|
||||
Assert.assertFalse(manager.waitForBackgroundTermination(10));
|
||||
manager.stop();
|
||||
expectedException.expect(new BaseMatcher<Throwable>()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue