From e96881158326c8c21c25810d932548fd087a656c Mon Sep 17 00:00:00 2001 From: Himanshu Date: Tue, 13 Mar 2018 00:13:51 -0500 Subject: [PATCH] HttpServerInventoryView: fixed startup wait time and more informative logging (#5336) --- .../druid/client/HttpServerInventoryView.java | 59 +++++++++++++++---- .../coordination/ChangeRequestHttpSyncer.java | 36 +++++++---- 2 files changed, 71 insertions(+), 24 deletions(-) diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java index 1c3274c338f..3c84795619e 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -28,8 +28,6 @@ import com.google.common.base.Predicates; import com.google.common.collect.Maps; import com.google.common.net.HostAndPort; import com.google.inject.Inject; -import io.druid.java.util.emitter.EmittingLogger; -import io.druid.java.util.http.client.HttpClient; import io.druid.concurrent.LifecycleLock; import io.druid.discovery.DataNodeService; import io.druid.discovery.DiscoveryDruidNode; @@ -40,22 +38,27 @@ import io.druid.guice.annotations.Smile; import io.druid.java.util.common.IAE; import io.druid.java.util.common.ISE; import io.druid.java.util.common.Pair; +import io.druid.java.util.common.RE; import io.druid.java.util.common.concurrent.ScheduledExecutors; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; +import io.druid.java.util.emitter.EmittingLogger; +import io.druid.java.util.http.client.HttpClient; +import io.druid.server.coordination.ChangeRequestHttpSyncer; import io.druid.server.coordination.ChangeRequestsSnapshot; import io.druid.server.coordination.DataSegmentChangeRequest; import io.druid.server.coordination.DruidServerMetadata; import io.druid.server.coordination.SegmentChangeRequestDrop; import io.druid.server.coordination.SegmentChangeRequestLoad; -import io.druid.server.coordination.ChangeRequestHttpSyncer; import io.druid.timeline.DataSegment; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -328,8 +331,42 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer //segmentViewInitialized on all registered segment callbacks. private void serverInventoryInitialized() { + long start = System.currentTimeMillis(); + long serverSyncWaitTimeout = config.getServerTimeout() + 2 * ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS; + + List uninitializedServers = new ArrayList<>(); for (DruidServerHolder server : servers.values()) { - server.awaitInitialization(); + if (!server.isSyncedSuccessfullyAtleastOnce()) { + uninitializedServers.add(server); + } + } + + while (!uninitializedServers.isEmpty() && ((System.currentTimeMillis() - start) < serverSyncWaitTimeout)) { + try { + Thread.sleep(5000); + } + catch (InterruptedException ex) { + throw new RE(ex, "Interrupted while waiting for queryable server initial successful sync."); + } + + log.info("Checking whether all servers have been synced at least once yet...."); + Iterator iter = uninitializedServers.iterator(); + while (iter.hasNext()) { + if (iter.next().isSyncedSuccessfullyAtleastOnce()) { + iter.remove(); + } + } + } + + if (uninitializedServers.isEmpty()) { + log.info("All servers have been synced successfully at least once."); + } else { + for (DruidServerHolder server : uninitializedServers) { + log.warn( + "Server[%s] might not yet be synced successfully. We will continue to retry that in the background.", + server.druidServer.getName() + ); + } } inventoryInitializationLatch.countDown(); @@ -495,17 +532,17 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer syncer.stop(); } - //best effort wait for first fetch of segment listing from server. - void awaitInitialization() + boolean isSyncedSuccessfullyAtleastOnce() { try { - if (!syncer.awaitInitialization(syncer.getServerHttpTimeout())) { - log.warn("Await initialization timed out for server [%s].", druidServer.getName()); - } + return syncer.awaitInitialization(1); } catch (InterruptedException ex) { - log.warn("Await initialization interrupted while waiting on server [%s].", druidServer.getName()); - Thread.currentThread().interrupt(); + throw new RE( + ex, + "Interrupted while waiting for queryable server[%s] initial successful sync.", + druidServer.getName() + ); } } diff --git a/server/src/main/java/io/druid/server/coordination/ChangeRequestHttpSyncer.java b/server/src/main/java/io/druid/server/coordination/ChangeRequestHttpSyncer.java index 6d8f37d91e8..1770d8b7842 100644 --- a/server/src/main/java/io/druid/server/coordination/ChangeRequestHttpSyncer.java +++ b/server/src/main/java/io/druid/server/coordination/ChangeRequestHttpSyncer.java @@ -26,17 +26,17 @@ import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; +import io.druid.concurrent.LifecycleLock; +import io.druid.java.util.common.ISE; +import io.druid.java.util.common.RE; +import io.druid.java.util.common.RetryUtils; +import io.druid.java.util.common.StringUtils; import io.druid.java.util.emitter.EmittingLogger; import io.druid.java.util.http.client.HttpClient; import io.druid.java.util.http.client.Request; import io.druid.java.util.http.client.io.AppendableByteArrayInputStream; import io.druid.java.util.http.client.response.ClientResponse; import io.druid.java.util.http.client.response.InputStreamResponseHandler; -import io.druid.concurrent.LifecycleLock; -import io.druid.java.util.common.ISE; -import io.druid.java.util.common.RE; -import io.druid.java.util.common.RetryUtils; -import io.druid.java.util.common.StringUtils; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -60,6 +60,8 @@ public class ChangeRequestHttpSyncer { private static final EmittingLogger log = new EmittingLogger(ChangeRequestHttpSyncer.class); + public static final long HTTP_TIMEOUT_EXTRA_MS = 5000; + private static final long MAX_RETRY_BACKOFF = TimeUnit.MINUTES.toMillis(2); private final ObjectMapper smileMapper; @@ -87,8 +89,8 @@ public class ChangeRequestHttpSyncer private ChangeRequestHistory.Counter counter = null; private long unstableStartTime = -1; private int consecutiveFailedAttemptCount = 0; - private long lastSuccessfulSyncTime = System.currentTimeMillis(); - private long lastSyncTime = System.currentTimeMillis(); + private long lastSuccessfulSyncTime = 0; + private long lastSyncTime = 0; public ChangeRequestHttpSyncer( ObjectMapper smileMapper, @@ -110,7 +112,7 @@ public class ChangeRequestHttpSyncer this.responseTypeReferences = responseTypeReferences; this.serverTimeoutMS = serverTimeoutMS; this.serverUnstabilityTimeout = serverUnstabilityTimeout; - this.serverHttpTimeout = serverTimeoutMS + 5000; + this.serverHttpTimeout = serverTimeoutMS + HTTP_TIMEOUT_EXTRA_MS; this.listener = listener; this.logIdentity = StringUtils.format("%s_%s", baseServerURL, System.currentTimeMillis()); } @@ -154,10 +156,10 @@ public class ChangeRequestHttpSyncer { long currTime = System.currentTimeMillis(); - return ImmutableMap.of("notSyncedForSecs", (currTime - lastSyncTime) / 1000, - "notSuccessfullySyncedFor", (currTime - lastSuccessfulSyncTime) / 1000, + return ImmutableMap.of("notSyncedForSecs", lastSyncTime == 0 ? "Never Synced" : (currTime - lastSyncTime) / 1000, + "notSuccessfullySyncedFor", lastSuccessfulSyncTime == 0 ? "Never Successfully Synced" : (currTime - lastSuccessfulSyncTime) / 1000, "consecutiveFailedAttemptCount", consecutiveFailedAttemptCount, - "started", startStopLock.isStarted() + "syncScheduled", startStopLock.isStarted() ); } @@ -269,8 +271,16 @@ public class ChangeRequestHttpSyncer counter = changes.getCounter(); - initializationLatch.countDown(); - consecutiveFailedAttemptCount = 0; + if (!initializationLatch.await(1, TimeUnit.MILLISECONDS)) { + initializationLatch.countDown(); + log.info("[%s] synced successfully for the first time.", logIdentity); + } + + if (consecutiveFailedAttemptCount > 0) { + consecutiveFailedAttemptCount = 0; + log.info("[%s] synced successfully.", logIdentity); + } + lastSuccessfulSyncTime = System.currentTimeMillis(); } catch (Exception ex) {