don't hold thread while waiting after failure from server (#4795)

This commit is contained in:
Himanshu 2017-09-14 17:19:25 -05:00 committed by Parag Jain
parent b61248fdb1
commit d37be5e6e9
2 changed files with 56 additions and 37 deletions

View File

@ -81,7 +81,7 @@ public class RetryUtils
return retry(f, shouldRetry, 0, maxTries);
}
private static void awaitNextRetry(Throwable e, final int nTry, final boolean quiet) throws InterruptedException
private static void awaitNextRetry(final Throwable e, final int nTry, final boolean quiet) throws InterruptedException
{
final long sleepMillis = nextRetrySleepMillis(nTry);
@ -95,7 +95,7 @@ public class RetryUtils
Thread.sleep(sleepMillis);
}
public static long nextRetrySleepMillis(final int nTry) throws InterruptedException
public static long nextRetrySleepMillis(final int nTry)
{
final long baseSleepMillis = 1000;
final long maxSleepMillis = 60000;

View File

@ -31,7 +31,6 @@ import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.emitter.EmittingLogger;
import com.metamx.http.client.HttpClient;
@ -50,6 +49,7 @@ import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.server.coordination.DataSegmentChangeRequest;
@ -75,9 +75,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -103,7 +102,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
// For each queryable server, a name -> DruidServerHolder entry is kept
private final ConcurrentHashMap<String, DruidServerHolder> servers = new ConcurrentHashMap<>();
private volatile ExecutorService executor;
private volatile ScheduledExecutorService executor;
// the work queue, all items in this are sequentially processed by main thread setup in start()
// used to call inventoryInitialized on all SegmentCallbacks and
@ -145,9 +144,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.info("Starting HttpServerInventoryView.");
try {
executor = Executors.newFixedThreadPool(
executor = ScheduledExecutors.fixed(
config.getNumThreads(),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HttpServerInventoryView-%s").build()
"HttpServerInventoryView-%s"
);
executor.execute(
@ -433,10 +432,23 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private volatile long unstableStartTime = -1;
private volatile int consecutiveFailedAttemptCount = 0;
private final Runnable addToQueueRunnable;
DruidServerHolder(DruidServer druidServer)
{
this.druidServer = druidServer;
this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost());
this.addToQueueRunnable = () -> {
queue.add(
() -> {
DruidServerHolder holder = servers.get(druidServer.getName());
if (holder != null) {
holder.updateSegmentsListAsync();
}
}
);
};
}
//wait for first fetch of segment listing from server.
@ -571,10 +583,21 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
consecutiveFailedAttemptCount = 0;
}
catch (Exception ex) {
log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
String logMsg = StringUtils.nonStrictFormat(
"Error processing segment list response from server [%s]. Reason [%s]",
druidServer.getName(),
ex.getMessage()
);
if (incrementFailedAttemptAndCheckUnstabilityTimeout()) {
log.error(ex, logMsg);
} else {
log.info("Temporary Failure. %s", logMsg);
log.debug(ex, logMsg);
}
}
finally {
addNextSyncToWorkQueue(druidServer.getName());
addNextSyncToWorkQueue();
}
}
@ -603,17 +626,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.debug(logMsg);
}
}
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount));
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
finally {
addNextSyncToWorkQueue(druidServer.getName());
addNextSyncToWorkQueue();
}
}
},
@ -632,17 +647,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
log.info("Temporary Failure. %s", logMsg);
log.debug(th, logMsg);
}
// sleep for a bit so that retry does not happen immediately.
try {
Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount));
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
finally {
addNextSyncToWorkQueue(druidServer.getName());
addNextSyncToWorkQueue();
}
}
}
@ -696,16 +703,28 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
}
private void addNextSyncToWorkQueue(final String serverId)
private void addNextSyncToWorkQueue()
{
queue.add(
() -> {
DruidServerHolder holder = servers.get(serverId);
if (holder != null) {
holder.updateSegmentsListAsync();
}
}
);
if (consecutiveFailedAttemptCount > 0) {
try {
long sleepMillis = RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount);
log.info("Scheduling next syncup in [%d] millis from server [%s].", sleepMillis, druidServer.getName());
executor.schedule(
addToQueueRunnable,
sleepMillis,
TimeUnit.MILLISECONDS
);
}
catch (Exception ex) {
log.makeAlert(
ex,
"WTF! Couldn't schedule next sync. Server[%s] is not being synced any more, restarting Druid process on that server might fix the issue.",
druidServer.getName()
).emit();
}
} else {
addToQueueRunnable.run();
}
}
private boolean incrementFailedAttemptAndCheckUnstabilityTimeout()