HttpServerInventoryView: fixed startup wait time and more informative logging (#5336)

This commit is contained in:
Himanshu 2018-03-13 00:13:51 -05:00 committed by Gian Merlino
parent 6b158abe3f
commit e968811583
2 changed files with 71 additions and 24 deletions

View File

@ -28,8 +28,6 @@ import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import com.google.common.net.HostAndPort;
import com.google.inject.Inject;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.concurrent.LifecycleLock;
import io.druid.discovery.DataNodeService;
import io.druid.discovery.DiscoveryDruidNode;
@ -40,22 +38,27 @@ import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.IAE;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.server.coordination.ChangeRequestHttpSyncer;
import io.druid.server.coordination.ChangeRequestsSnapshot;
import io.druid.server.coordination.DataSegmentChangeRequest;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.SegmentChangeRequestDrop;
import io.druid.server.coordination.SegmentChangeRequestLoad;
import io.druid.server.coordination.ChangeRequestHttpSyncer;
import io.druid.timeline.DataSegment;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -328,8 +331,42 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
//segmentViewInitialized on all registered segment callbacks.
private void serverInventoryInitialized()
{
long start = System.currentTimeMillis();
long serverSyncWaitTimeout = config.getServerTimeout() + 2 * ChangeRequestHttpSyncer.HTTP_TIMEOUT_EXTRA_MS;
List<DruidServerHolder> uninitializedServers = new ArrayList<>();
for (DruidServerHolder server : servers.values()) {
server.awaitInitialization();
if (!server.isSyncedSuccessfullyAtleastOnce()) {
uninitializedServers.add(server);
}
}
while (!uninitializedServers.isEmpty() && ((System.currentTimeMillis() - start) < serverSyncWaitTimeout)) {
try {
Thread.sleep(5000);
}
catch (InterruptedException ex) {
throw new RE(ex, "Interrupted while waiting for queryable server initial successful sync.");
}
log.info("Checking whether all servers have been synced at least once yet....");
Iterator<DruidServerHolder> iter = uninitializedServers.iterator();
while (iter.hasNext()) {
if (iter.next().isSyncedSuccessfullyAtleastOnce()) {
iter.remove();
}
}
}
if (uninitializedServers.isEmpty()) {
log.info("All servers have been synced successfully at least once.");
} else {
for (DruidServerHolder server : uninitializedServers) {
log.warn(
"Server[%s] might not yet be synced successfully. We will continue to retry that in the background.",
server.druidServer.getName()
);
}
}
inventoryInitializationLatch.countDown();
@ -495,17 +532,17 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
syncer.stop();
}
//best effort wait for first fetch of segment listing from server.
void awaitInitialization()
boolean isSyncedSuccessfullyAtleastOnce()
{
try {
if (!syncer.awaitInitialization(syncer.getServerHttpTimeout())) {
log.warn("Await initialization timed out for server [%s].", druidServer.getName());
}
return syncer.awaitInitialization(1);
}
catch (InterruptedException ex) {
log.warn("Await initialization interrupted while waiting on server [%s].", druidServer.getName());
Thread.currentThread().interrupt();
throw new RE(
ex,
"Interrupted while waiting for queryable server[%s] initial successful sync.",
druidServer.getName()
);
}
}

View File

@ -26,17 +26,17 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import io.druid.concurrent.LifecycleLock;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.http.client.HttpClient;
import io.druid.java.util.http.client.Request;
import io.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import io.druid.java.util.http.client.response.ClientResponse;
import io.druid.java.util.http.client.response.InputStreamResponseHandler;
import io.druid.concurrent.LifecycleLock;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.RetryUtils;
import io.druid.java.util.common.StringUtils;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -60,6 +60,8 @@ public class ChangeRequestHttpSyncer<T>
{
private static final EmittingLogger log = new EmittingLogger(ChangeRequestHttpSyncer.class);
public static final long HTTP_TIMEOUT_EXTRA_MS = 5000;
private static final long MAX_RETRY_BACKOFF = TimeUnit.MINUTES.toMillis(2);
private final ObjectMapper smileMapper;
@ -87,8 +89,8 @@ public class ChangeRequestHttpSyncer<T>
private ChangeRequestHistory.Counter counter = null;
private long unstableStartTime = -1;
private int consecutiveFailedAttemptCount = 0;
private long lastSuccessfulSyncTime = System.currentTimeMillis();
private long lastSyncTime = System.currentTimeMillis();
private long lastSuccessfulSyncTime = 0;
private long lastSyncTime = 0;
public ChangeRequestHttpSyncer(
ObjectMapper smileMapper,
@ -110,7 +112,7 @@ public class ChangeRequestHttpSyncer<T>
this.responseTypeReferences = responseTypeReferences;
this.serverTimeoutMS = serverTimeoutMS;
this.serverUnstabilityTimeout = serverUnstabilityTimeout;
this.serverHttpTimeout = serverTimeoutMS + 5000;
this.serverHttpTimeout = serverTimeoutMS + HTTP_TIMEOUT_EXTRA_MS;
this.listener = listener;
this.logIdentity = StringUtils.format("%s_%s", baseServerURL, System.currentTimeMillis());
}
@ -154,10 +156,10 @@ public class ChangeRequestHttpSyncer<T>
{
long currTime = System.currentTimeMillis();
return ImmutableMap.of("notSyncedForSecs", (currTime - lastSyncTime) / 1000,
"notSuccessfullySyncedFor", (currTime - lastSuccessfulSyncTime) / 1000,
return ImmutableMap.of("notSyncedForSecs", lastSyncTime == 0 ? "Never Synced" : (currTime - lastSyncTime) / 1000,
"notSuccessfullySyncedFor", lastSuccessfulSyncTime == 0 ? "Never Successfully Synced" : (currTime - lastSuccessfulSyncTime) / 1000,
"consecutiveFailedAttemptCount", consecutiveFailedAttemptCount,
"started", startStopLock.isStarted()
"syncScheduled", startStopLock.isStarted()
);
}
@ -269,8 +271,16 @@ public class ChangeRequestHttpSyncer<T>
counter = changes.getCounter();
initializationLatch.countDown();
consecutiveFailedAttemptCount = 0;
if (!initializationLatch.await(1, TimeUnit.MILLISECONDS)) {
initializationLatch.countDown();
log.info("[%s] synced successfully for the first time.", logIdentity);
}
if (consecutiveFailedAttemptCount > 0) {
consecutiveFailedAttemptCount = 0;
log.info("[%s] synced successfully.", logIdentity);
}
lastSuccessfulSyncTime = System.currentTimeMillis();
}
catch (Exception ex) {