misc http segment discovery fixes (#4618)

* Use ConcurrentHashMap to store segment servers or else getInventory() would need to clone the values list

* introduce unstableTimeout for segment servers

* address review comment

* add HttpServerInventoryViewConfigTest
This commit is contained in:
Himanshu 2017-08-02 16:11:26 -05:00 committed by Slim
parent 729e44d767
commit 6d60ef67ce
3 changed files with 150 additions and 61 deletions

View File

@ -64,9 +64,9 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@ -96,7 +96,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private volatile Predicate<Pair<DruidServerMetadata, DataSegment>> finalPredicate;
// For each queryable server, a name -> DruidServerHolder entry is kept
private final Map<String, DruidServerHolder> servers = new HashMap<>();
private final ConcurrentHashMap<String, DruidServerHolder> servers = new ConcurrentHashMap<>();
private volatile ExecutorService executor;
@ -159,13 +159,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
while (!Thread.interrupted() && lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)) {
try {
String name = queue.take();
synchronized (servers) {
DruidServerHolder holder = servers.get(name);
if (holder != null) {
holder.updateSegmentsListAsync();
}
DruidServerHolder holder = servers.get(queue.take());
if (holder != null) {
holder.updateSegmentsListAsync();
}
}
catch (InterruptedException ex) {
@ -274,31 +270,26 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
@Override
public DruidServer getInventoryValue(String containerKey)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(containerKey);
if (holder != null) {
return holder.druidServer;
}
DruidServerHolder holder = servers.get(containerKey);
if (holder != null) {
return holder.druidServer;
}
return null;
}
@Override
public Iterable<DruidServer> getInventory()
{
synchronized (servers) {
return Iterables.transform(
servers.values(), new Function<DruidServerHolder, DruidServer>()
return Iterables.transform(
servers.values(), new Function<DruidServerHolder, DruidServer>()
{
@Override
public DruidServer apply(DruidServerHolder input)
{
@Override
public DruidServer apply(DruidServerHolder input)
{
return input.druidServer;
}
return input.druidServer;
}
);
}
}
);
}
private void runSegmentCallbacks(
@ -369,24 +360,17 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private DruidServer serverAddedOrUpdated(DruidServer server)
{
DruidServerHolder curr;
DruidServerHolder newHolder;
synchronized (servers) {
curr = servers.get(server.getName());
newHolder = curr == null ? new DruidServerHolder(server) : curr.updatedHolder(server);
servers.put(server.getName(), newHolder);
}
DruidServerHolder newHolder = servers.compute(
server.getName(),
(k, v) -> v == null ? new DruidServerHolder(server) : v.updatedHolder(server)
);
newHolder.updateSegmentsListAsync();
return newHolder.druidServer;
}
private void serverRemoved(DruidServer server)
{
synchronized (servers) {
servers.remove(server.getName());
}
servers.remove(server.getName());
}
public DruidServer serverUpdated(DruidServer oldServer, DruidServer newServer)
@ -403,14 +387,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
@Override
public boolean isSegmentLoadedByServer(String serverKey, DataSegment segment)
{
synchronized (servers) {
DruidServerHolder holder = servers.get(serverKey);
if (holder != null) {
return holder.druidServer.getSegment(segment.getIdentifier()) != null;
} else {
return false;
}
}
DruidServerHolder holder = servers.get(serverKey);
return holder != null && holder.druidServer.getSegment(segment.getIdentifier()) != null;
}
private class DruidServerHolder
@ -431,6 +409,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
private final CountDownLatch initializationLatch = new CountDownLatch(1);
private volatile boolean isUnstable = false;
private volatile long unstableStartTime = -1;
DruidServerHolder(DruidServer druidServer)
{
this(druidServer, null);
@ -588,6 +569,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
initializationLatch.countDown();
isUnstable = false;
}
catch (Exception ex) {
log.error(ex, "error processing segment list response from server [%s]", druidServer.getName());
@ -601,21 +583,26 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
public void onFailure(Throwable t)
{
try {
if (t != null) {
log.error(
t,
"failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]",
druidServer.getName(),
responseHandler.status,
responseHandler.description
);
String logMsg = StringUtils.nonStrictFormat(
"failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]",
druidServer.getName(),
responseHandler.status,
responseHandler.description
);
if (hasUnstabilityTimeoutPassed()) {
if (t != null) {
log.error(t, logMsg);
} else {
log.error(logMsg);
}
} else {
log.error(
"failed to fetch segment list from server [%s]. Return code [%s], Reason: [%s]",
druidServer.getName(),
responseHandler.status,
responseHandler.description
);
log.info("Temporary Failure. %s", logMsg);
if (t != null) {
log.debug(t, logMsg);
} else {
log.debug(logMsg);
}
}
// sleep for a bit so that retry does not happen immediately.
@ -638,7 +625,17 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
}
catch (Throwable th) {
queue.add(druidServer.getName());
log.makeAlert(th, "Fatal error while fetching segment list from server [%s].", druidServer.getName()).emit();
String logMsg = StringUtils.nonStrictFormat(
"Fatal error while fetching segment list from server [%s].", druidServer.getName()
);
if (hasUnstabilityTimeoutPassed()) {
log.makeAlert(th, logMsg).emit();
} else {
log.info("Temporary Failure. %s", logMsg);
log.debug(th, logMsg);
}
// sleep for a bit so that retry does not happen immediately.
try {
@ -651,6 +648,20 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer
throw Throwables.propagate(th);
}
}
private boolean hasUnstabilityTimeoutPassed()
{
if (isUnstable && (System.currentTimeMillis() - unstableStartTime) > config.getServerUnstabilityTimeout()) {
return true;
}
if (!isUnstable) {
isUnstable = true;
unstableStartTime = System.currentTimeMillis();
}
return false;
}
}
private static class BytesAccumulatingResponseHandler extends InputStreamResponseHandler

View File

@ -24,25 +24,39 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.joda.time.Period;
import java.util.concurrent.TimeUnit;
/**
*/
public class HttpServerInventoryViewConfig
{
// HTTP request timeout
@JsonProperty
private final long serverTimeout;
// Requests to server may fail when it is shutsdown abruptly and there is a lag in coordinator
// discovering its disappearance. So, failure would be logged only after the acceptable
// unstableTimeout has passed.
@JsonProperty
private final long serverUnstabilityTimeout;
@JsonProperty
private final int numThreads;
@JsonCreator
public HttpServerInventoryViewConfig(
@JsonProperty("serverTimeout") Period serverTimeout,
@JsonProperty("serverUnstabilityTimeout") Period serverUnstabilityTimeout,
@JsonProperty("numThreads") Integer numThreads
)
{
this.serverTimeout = serverTimeout != null
? serverTimeout.toStandardDuration().getMillis()
: 4*60*1000; //4 mins
: TimeUnit.MINUTES.toMillis(4);
this.serverUnstabilityTimeout = serverUnstabilityTimeout != null
? serverUnstabilityTimeout.toStandardDuration().getMillis()
: TimeUnit.MINUTES.toMillis(1);
this.numThreads = numThreads != null ? numThreads.intValue() : 5;
@ -55,6 +69,11 @@ public class HttpServerInventoryViewConfig
return serverTimeout;
}
public long getServerUnstabilityTimeout()
{
return serverUnstabilityTimeout;
}
public int getNumThreads()
{
return numThreads;

View File

@ -0,0 +1,59 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.client;
import io.druid.segment.TestHelper;
import org.junit.Assert;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
/**
*/
public class HttpServerInventoryViewConfigTest
{
@Test
public void testDeserializationWithDefaults() throws Exception
{
String json = "{}";
HttpServerInventoryViewConfig config = TestHelper.getJsonMapper().readValue(json, HttpServerInventoryViewConfig.class);
Assert.assertEquals(TimeUnit.MINUTES.toMillis(4), config.getServerTimeout());
Assert.assertEquals(TimeUnit.MINUTES.toMillis(1), config.getServerUnstabilityTimeout());
Assert.assertEquals(5, config.getNumThreads());
}
@Test
public void testDeserializationWithNonDefaults() throws Exception
{
String json = "{\n"
+ " \"serverTimeout\": \"PT2M\",\n"
+ " \"serverUnstabilityTimeout\": \"PT3M\",\n"
+ " \"numThreads\": 7\n"
+ "}";
HttpServerInventoryViewConfig config = TestHelper.getJsonMapper().readValue(json, HttpServerInventoryViewConfig.class);
Assert.assertEquals(TimeUnit.MINUTES.toMillis(2), config.getServerTimeout());
Assert.assertEquals(TimeUnit.MINUTES.toMillis(3), config.getServerUnstabilityTimeout());
Assert.assertEquals(7, config.getNumThreads());
}
}