diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryView.java b/server/src/main/java/io/druid/client/HttpServerInventoryView.java index 9419f55d0fb..7f4d4f60e20 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryView.java @@ -64,9 +64,9 @@ import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.InputStream; import java.net.URL; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -96,7 +96,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private volatile Predicate> finalPredicate; // For each queryable server, a name -> DruidServerHolder entry is kept - private final Map servers = new HashMap<>(); + private final ConcurrentHashMap servers = new ConcurrentHashMap<>(); private volatile ExecutorService executor; @@ -159,13 +159,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) { try { - String name = queue.take(); - - synchronized (servers) { - DruidServerHolder holder = servers.get(name); - if (holder != null) { - holder.updateSegmentsListAsync(); - } + DruidServerHolder holder = servers.get(queue.take()); + if (holder != null) { + holder.updateSegmentsListAsync(); } } catch (InterruptedException ex) { @@ -274,31 +270,26 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer @Override public DruidServer getInventoryValue(String containerKey) { - synchronized (servers) { - DruidServerHolder holder = servers.get(containerKey); - if (holder != null) { - return holder.druidServer; - } + DruidServerHolder holder = 
servers.get(containerKey); + if (holder != null) { + return holder.druidServer; } - return null; } @Override public Iterable getInventory() { - synchronized (servers) { - return Iterables.transform( - servers.values(), new Function() + return Iterables.transform( + servers.values(), new Function() + { + @Override + public DruidServer apply(DruidServerHolder input) { - @Override - public DruidServer apply(DruidServerHolder input) - { - return input.druidServer; - } + return input.druidServer; } - ); - } + } + ); } private void runSegmentCallbacks( @@ -369,24 +360,17 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private DruidServer serverAddedOrUpdated(DruidServer server) { - DruidServerHolder curr; - DruidServerHolder newHolder; - synchronized (servers) { - curr = servers.get(server.getName()); - newHolder = curr == null ? new DruidServerHolder(server) : curr.updatedHolder(server); - servers.put(server.getName(), newHolder); - } - + DruidServerHolder newHolder = servers.compute( + server.getName(), + (k, v) -> v == null ? 
new DruidServerHolder(server) : v.updatedHolder(server) + ); newHolder.updateSegmentsListAsync(); - return newHolder.druidServer; } private void serverRemoved(DruidServer server) { - synchronized (servers) { - servers.remove(server.getName()); - } + servers.remove(server.getName()); } public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer) @@ -403,14 +387,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer @Override public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment) { - synchronized (servers) { - DruidServerHolder holder = servers.get(serverKey); - if (holder != null) { - return holder.druidServer.getSegment(segment.getIdentifier()) != null; - } else { - return false; - } - } + DruidServerHolder holder = servers.get(serverKey); + return holder != null && holder.druidServer.getSegment(segment.getIdentifier()) != null; } private class DruidServerHolder @@ -431,6 +409,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer private final CountDownLatch initializationLatch = new CountDownLatch(1); + private volatile boolean isUnstable = false; + private volatile long unstableStartTime = -1; + DruidServerHolder(DruidServer druidServer) { this(druidServer, null); @@ -588,6 +569,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } initializationLatch.countDown(); + isUnstable = false; } catch (Exception ex) { log.error(ex, "error processing segment list response from server [%s]", druidServer.getName()); @@ -601,21 +583,26 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer public void onFailure(Throwable t) { try { - if (t != null) { - log.error( - t, - "failed to fetch segment list from server [%s]. 
Return code [%s], Reason: [%s]", - druidServer.getName(), - responseHandler.status, - responseHandler.description - ); + String logMsg = StringUtils.nonStrictFormat( + "failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]", + druidServer.getName(), + responseHandler.status, + responseHandler.description + ); + + if (hasUnstabilityTimeoutPassed()) { + if (t != null) { + log.error(t, logMsg); + } else { + log.error(logMsg); + } } else { - log.error( - "failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]", - druidServer.getName(), - responseHandler.status, - responseHandler.description - ); + log.info("Temporary Failure. %s", logMsg); + if (t != null) { + log.debug(t, logMsg); + } else { + log.debug(logMsg); + } } // sleep for a bit so that retry does not happen immediately. @@ -638,7 +625,17 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } catch (Throwable th) { queue.add(druidServer.getName()); - log.makeAlert(th, "Fatal error while fetching segment list from server [%s].", druidServer.getName()).emit(); + + String logMsg = StringUtils.nonStrictFormat( + "Fatal error while fetching segment list from server [%s].", druidServer.getName() + ); + + if (hasUnstabilityTimeoutPassed()) { + log.makeAlert(th, logMsg).emit(); + } else { + log.info("Temporary Failure. %s", logMsg); + log.debug(th, logMsg); + } // sleep for a bit so that retry does not happen immediately. 
try { @@ -651,6 +648,20 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer throw Throwables.propagate(th); } } + + private boolean hasUnstabilityTimeoutPassed() + { + if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) { + return true; + } + + if (!isUnstable) { + isUnstable = true; + unstableStartTime = System.currentTimeMillis(); + } + + return false; + } } private static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler diff --git a/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java b/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java index ecddbaa5e52..678c09e5764 100644 --- a/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java +++ b/server/src/main/java/io/druid/client/HttpServerInventoryViewConfig.java @@ -24,25 +24,39 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.joda.time.Period; +import java.util.concurrent.TimeUnit; + /** */ public class HttpServerInventoryViewConfig { + // HTTP request timeout @JsonProperty private final long serverTimeout; + // Requests to a server may fail when it is shut down abruptly and there is a lag in the coordinator + // discovering its disappearance. So, failures are logged at error level only after the acceptable + // serverUnstabilityTimeout has passed. + @JsonProperty + private final long serverUnstabilityTimeout; + @JsonProperty private final int numThreads; @JsonCreator public HttpServerInventoryViewConfig( @JsonProperty("serverTimeout") Period serverTimeout, + @JsonProperty("serverUnstabilityTimeout") Period serverUnstabilityTimeout, @JsonProperty("numThreads") Integer numThreads ) { this.serverTimeout = serverTimeout != null ? serverTimeout.toStandardDuration().getMillis() - : 4*60*1000; //4 mins + : TimeUnit.MINUTES.toMillis(4); + + this.serverUnstabilityTimeout = serverUnstabilityTimeout != null + ? 
serverUnstabilityTimeout.toStandardDuration().getMillis() + : TimeUnit.MINUTES.toMillis(1); this.numThreads = numThreads != null ? numThreads.intValue() : 5; @@ -55,6 +69,11 @@ public class HttpServerInventoryViewConfig return serverTimeout; } + public long getServerUnstabilityTimeout() + { + return serverUnstabilityTimeout; + } + public int getNumThreads() { return numThreads; diff --git a/server/src/test/java/io/druid/client/HttpServerInventoryViewConfigTest.java b/server/src/test/java/io/druid/client/HttpServerInventoryViewConfigTest.java new file mode 100644 index 00000000000..51b1c457ae2 --- /dev/null +++ b/server/src/test/java/io/druid/client/HttpServerInventoryViewConfigTest.java @@ -0,0 +1,59 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package io.druid.client; + +import io.druid.segment.TestHelper; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.TimeUnit; + +/** + */ +public class HttpServerInventoryViewConfigTest +{ + @Test + public void testDeserializationWithDefaults() throws Exception + { + String json = "{}"; + + HttpServerInventoryViewConfig config = TestHelper.getJsonMapper().readValue(json, HttpServerInventoryViewConfig.class); + + Assert.assertEquals(TimeUnit.MINUTES.toMillis(4), config.getServerTimeout()); + Assert.assertEquals(TimeUnit.MINUTES.toMillis(1), config.getServerUnstabilityTimeout()); + Assert.assertEquals(5, config.getNumThreads()); + } + + @Test + public void testDeserializationWithNonDefaults() throws Exception + { + String json = "{\n" + + " \"serverTimeout\": \"PT2M\",\n" + + " \"serverUnstabilityTimeout\": \"PT3M\",\n" + + " \"numThreads\": 7\n" + + "}"; + + HttpServerInventoryViewConfig config = TestHelper.getJsonMapper().readValue(json, HttpServerInventoryViewConfig.class); + + Assert.assertEquals(TimeUnit.MINUTES.toMillis(2), config.getServerTimeout()); + Assert.assertEquals(TimeUnit.MINUTES.toMillis(3), config.getServerUnstabilityTimeout()); + Assert.assertEquals(7, config.getNumThreads()); + } +}