diff --git a/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java b/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java index 7b0e019283b..f282ead3a5b 100644 --- a/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java +++ b/java-util/src/main/java/io/druid/java/util/common/RetryUtils.java @@ -81,7 +81,7 @@ public class RetryUtils return retry(f, shouldRetry, 0, maxTries); } - private static void awaitNextRetry(Throwable e, final int nTry, final boolean quiet) throws InterruptedException + private static void awaitNextRetry(final Throwable e, final int nTry, final boolean quiet) throws InterruptedException { final long sleepMillis = nextRetrySleepMillis(nTry); @@ -95,7 +95,7 @@ public class RetryUtils Thread.sleep(sleepMillis); } - public static long nextRetrySleepMillis(final int nTry) throws InterruptedException + public static long nextRetrySleepMillis(final int nTry) { final long baseSleepMillis = 1000; final long maxSleepMillis = 60000; diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java index d19ca3bb776..14d53d09704 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -31,7 +31,6 @@ import com.google.common.net.HostAndPort; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.inject.Inject; import com.metamx.emitter.EmittingLogger; import com.metamx.http.client.HttpClient; @@ -50,6 +49,7 @@ import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; import io.druid.java.util.common.RetryUtils; import io.druid.java.util.common.StringUtils; +import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.server.coordination.DataSegmentChangeRequest; @@ -75,9 +75,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -103,7 +102,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer // For each queryable server, a name -> DruidServerHolder entry is kept private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); - private volatile ExecutorService executor; + private volatile ScheduledExecutorService executor; // the work queue, all items in this are sequentially processed by main thread setup in start() // used to call inventoryInitialized on all SegmentCallbacks and @@ -145,9 +144,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer log.info("Starting HttpServerInventoryView."); try { - executor = Executors.newFixedThreadPool( + executor = ScheduledExecutors.fixed( config.getNumThreads(), - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("HttpServerInventoryView-%s").build() + "HttpServerInventoryView-%s" ); executor.execute( @@ -433,10 +432,23 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private volatile long unstableStartTime = -1; private volatile int consecutiveFailedAttemptCount = 0; + private final Runnable addToQueueRunnable; + DruidServerHolder(DruidServer druidServer) { this.druidServer = druidServer; this.serverHostAndPort = HostAndPort.fromString(druidServer.getHost()); + + this.addToQueueRunnable = () -> { + queue.add( + () -> { + DruidServerHolder holder = servers.get(druidServer.getName()); + if (holder != null) { + holder.updateSegmentsListAsync(); + } + } + ); + }; } //wait for first fetch of segment listing from server. @@ -571,10 +583,21 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer consecutiveFailedAttemptCount = 0; } catch (Exception ex) { - log.error(ex, "error processing segment list response from server [%s]", druidServer.getName()); + String logMsg = StringUtils.nonStrictFormat( + "Error processing segment list response from server [%s]. Reason [%s]", + druidServer.getName(), + ex.getMessage() + ); + + if (incrementFailedAttemptAndCheckUnstabilityTimeout()) { + log.error(ex, logMsg); + } else { + log.info("Temporary Failure. %s", logMsg); + log.debug(ex, logMsg); + } } finally { - addNextSyncToWorkQueue(druidServer.getName()); + addNextSyncToWorkQueue(); } } @@ -603,17 +626,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer log.debug(logMsg); } } - - // sleep for a bit so that retry does not happen immediately. - try { - Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } } finally { - addNextSyncToWorkQueue(druidServer.getName()); + addNextSyncToWorkQueue(); } } }, @@ -632,17 +647,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer log.info("Temporary Failure. %s", logMsg); log.debug(th, logMsg); } - - // sleep for a bit so that retry does not happen immediately. - try { - Thread.sleep(RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount)); - } - catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } } finally { - addNextSyncToWorkQueue(druidServer.getName()); + addNextSyncToWorkQueue(); } } } @@ -696,16 +703,28 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } } - private void addNextSyncToWorkQueue(final String serverId) + private void addNextSyncToWorkQueue() { - queue.add( - () -> { - DruidServerHolder holder = servers.get(serverId); - if (holder != null) { - holder.updateSegmentsListAsync(); - } - } - ); + if (consecutiveFailedAttemptCount > 0) { + try { + long sleepMillis = RetryUtils.nextRetrySleepMillis(consecutiveFailedAttemptCount); + log.info("Scheduling next syncup in [%d] millis from server [%s].", sleepMillis, druidServer.getName()); + executor.schedule( + addToQueueRunnable, + sleepMillis, + TimeUnit.MILLISECONDS + ); + } + catch (Exception ex) { + log.makeAlert( + ex, + "WTF! Couldn't schedule next sync. Server[%s] is not being synced any more, restarting Druid process on that server might fix the issue.", + druidServer.getName() + ).emit(); + } + } else { + addToQueueRunnable.run(); + } } private boolean incrementFailedAttemptAndCheckUnstabilityTimeout()