DruidLeaderClient: Throw IOException on retryable errors. (#4913)

* DruidLeaderClient: Throw IOException on retryable errors.

Fixes #4911.

* Adjustments.
This commit is contained in:
Gian Merlino 2017-10-06 13:12:09 -07:00 committed by Himanshu
parent 535c034c06
commit 797b54d283
3 changed files with 85 additions and 29 deletions

View File

@ -77,17 +77,10 @@ public class RemoteTaskActionClient implements TaskActionClient
log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction); log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction);
try { fullResponseHolder = druidLeaderClient.go(
fullResponseHolder = druidLeaderClient.go( druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action")
druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action") .setContent(MediaType.APPLICATION_JSON, dataToSend)
.setContent(MediaType.APPLICATION_JSON, dataToSend) );
);
}
catch (Exception e) {
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
throw Throwables.propagate(e);
}
if (fullResponseHolder.getStatus().getCode() / 100 == 2) { if (fullResponseHolder.getStatus().getCode() / 100 == 2) {
final Map<String, Object> responseDict = jsonMapper.readValue( final Map<String, Object> responseDict = jsonMapper.readValue(
@ -120,6 +113,9 @@ public class RemoteTaskActionClient implements TaskActionClient
} }
} }
} }
catch (InterruptedException e) {
throw new RuntimeException(e);
}
} }
} }

View File

@ -21,6 +21,7 @@ package io.druid.discovery;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.metamx.http.client.HttpClient; import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request; import com.metamx.http.client.Request;
import com.metamx.http.client.response.FullResponseHandler; import com.metamx.http.client.response.FullResponseHandler;
@ -28,14 +29,19 @@ import com.metamx.http.client.response.FullResponseHolder;
import io.druid.client.selector.Server; import io.druid.client.selector.Server;
import io.druid.concurrent.LifecycleLock; import io.druid.concurrent.LifecycleLock;
import io.druid.curator.discovery.ServerDiscoverySelector; import io.druid.curator.discovery.ServerDiscoverySelector;
import io.druid.java.util.common.IOE;
import io.druid.java.util.common.ISE; import io.druid.java.util.common.ISE;
import io.druid.java.util.common.RE;
import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStart;
import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.lifecycle.LifecycleStop;
import io.druid.java.util.common.logger.Logger; import io.druid.java.util.common.logger.Logger;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus; import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.MalformedURLException; import java.net.MalformedURLException;
import java.net.URL; import java.net.URL;
import java.util.Iterator; import java.util.Iterator;
@ -47,15 +53,15 @@ import java.util.concurrent.atomic.AtomicReference;
* This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected
* via Guice with annotations @Coordinator or @IndexingService . * via Guice with annotations @Coordinator or @IndexingService .
* Usage: * Usage:
* Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath)
* request.setXXX(..) * request.setXXX(..)
* FullResponseHolder responseHolder = druidLeaderClient.go(request) * FullResponseHolder responseHolder = druidLeaderClient.go(request)
*/ */
public class DruidLeaderClient public class DruidLeaderClient
{ {
private final Logger log = new Logger(DruidLeaderClient.class); private final Logger log = new Logger(DruidLeaderClient.class);
private static final int MAX_RETRIES = 3; private static final int MAX_RETRIES = 5;
private final HttpClient httpClient; private final HttpClient httpClient;
private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider;
@ -112,13 +118,19 @@ public class DruidLeaderClient
log.info("Stopped."); log.info("Stopped.");
} }
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws MalformedURLException /**
* Make a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException
{ {
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath))); return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath)));
} }
public FullResponseHolder go(Request request) throws InterruptedException /**
* Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located.
*/
public FullResponseHolder go(Request request) throws IOException, InterruptedException
{ {
Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS));
for (int counter = 0; counter < MAX_RETRIES; counter++) { for (int counter = 0; counter < MAX_RETRIES; counter++) {
@ -126,9 +138,17 @@ public class DruidLeaderClient
final FullResponseHolder fullResponseHolder; final FullResponseHolder fullResponseHolder;
try { try {
fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get(); try {
fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get();
}
catch (ExecutionException e) {
// Unwrap IOExceptions and ChannelExceptions, re-throw others
Throwables.propagateIfInstanceOf(e.getCause(), IOException.class);
Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class);
throw new RE(e, "HTTP request to[%s] failed", request.getUrl());
}
} }
catch (ExecutionException ex) { catch (IOException | ChannelException ex) {
// can happen if the node is stopped. // can happen if the node is stopped.
log.info("Request[%s] failed with msg [%s].", request.getUrl(), ex.getMessage()); log.info("Request[%s] failed with msg [%s].", request.getUrl(), ex.getMessage());
log.debug(ex, "Request[%s] failed.", request.getUrl()); log.debug(ex, "Request[%s] failed.", request.getUrl());
@ -142,16 +162,18 @@ public class DruidLeaderClient
} else { } else {
request = withUrl( request = withUrl(
request, request,
new URL(StringUtils.format("%s%s?%s", new URL(StringUtils.format(
getCurrentKnownLeader(false), "%s%s?%s",
request.getUrl().getPath(), getCurrentKnownLeader(false),
request.getUrl().getQuery() request.getUrl().getPath(),
request.getUrl().getQuery()
)) ))
); );
} }
continue; continue;
} }
catch (MalformedURLException e) { catch (MalformedURLException e) {
// Not an IOException; this is our own fault.
throw new ISE( throw new ISE(
e, e,
"failed to build url with path[%] and query string [%s].", "failed to build url with path[%] and query string [%s].",
@ -164,7 +186,7 @@ public class DruidLeaderClient
if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(fullResponseHolder.getResponse().getStatus())) { if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(fullResponseHolder.getResponse().getStatus())) {
String redirectUrlStr = fullResponseHolder.getResponse().headers().get("Location"); String redirectUrlStr = fullResponseHolder.getResponse().headers().get("Location");
if (redirectUrlStr == null) { if (redirectUrlStr == null) {
throw new ISE("No redirect location is found in response from url[%s].", request.getUrl()); throw new IOE("No redirect location is found in response from url[%s].", request.getUrl());
} }
log.info("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); log.info("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr);
@ -174,7 +196,7 @@ public class DruidLeaderClient
redirectUrl = new URL(redirectUrlStr); redirectUrl = new URL(redirectUrlStr);
} }
catch (MalformedURLException ex) { catch (MalformedURLException ex) {
throw new ISE( throw new IOE(
ex, ex,
"Malformed redirect location is found in response from url[%s], new location[%s].", "Malformed redirect location is found in response from url[%s], new location[%s].",
request.getUrl(), request.getUrl(),
@ -196,7 +218,7 @@ public class DruidLeaderClient
} }
} }
throw new ISE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl()); throw new IOE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl());
} }
public String findCurrentLeader() public String findCurrentLeader()
@ -221,14 +243,21 @@ public class DruidLeaderClient
} }
} }
private String getCurrentKnownLeader(final boolean cached) private String getCurrentKnownLeader(final boolean cached) throws IOException
{ {
return currentKnownLeader.accumulateAndGet( final String leader = currentKnownLeader.accumulateAndGet(
null, null,
(current, given) -> current == null || !cached ? pickOneHost() : current (current, given) -> current == null || !cached ? pickOneHost() : current
); );
if (leader == null) {
throw new IOE("No known server");
} else {
return leader;
}
} }
@Nullable
private String pickOneHost() private String pickOneHost()
{ {
Server server = serverDiscoverySelector.pick(); Server server = serverDiscoverySelector.pick();
@ -251,7 +280,7 @@ public class DruidLeaderClient
); );
} }
throw new ISE("Couldn't find any servers."); return null;
} }
private Request withUrl(Request old, URL url) private Request withUrl(Request old, URL url)

View File

@ -52,7 +52,9 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder; import org.eclipse.jetty.servlet.ServletHolder;
import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpMethod;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.ExpectedException;
import javax.ws.rs.GET; import javax.ws.rs.GET;
import javax.ws.rs.POST; import javax.ws.rs.POST;
@ -60,12 +62,16 @@ import javax.ws.rs.Path;
import javax.ws.rs.Produces; import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.io.IOException;
import java.net.URI; import java.net.URI;
/** /**
*/ */
public class DruidLeaderClientTest extends BaseJettyTest public class DruidLeaderClientTest extends BaseJettyTest
{ {
@Rule
public ExpectedException expectedException = ExpectedException.none();
private DiscoveryDruidNode discoveryDruidNode; private DiscoveryDruidNode discoveryDruidNode;
private HttpClient httpClient; private HttpClient httpClient;
@ -126,6 +132,31 @@ public class DruidLeaderClientTest extends BaseJettyTest
Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); Assert.assertEquals("hello", druidLeaderClient.go(request).getContent());
} }
@Test
public void testNoLeaderFound() throws Exception
{
DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class);
EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of());
DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class);
EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery);
EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider);
DruidLeaderClient druidLeaderClient = new DruidLeaderClient(
httpClient,
druidNodeDiscoveryProvider,
"nodetype",
"/simple/leader",
EasyMock.createNiceMock(ServerDiscoverySelector.class)
);
druidLeaderClient.start();
expectedException.expect(IOException.class);
expectedException.expectMessage("No known server");
druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct");
}
@Test @Test
public void testRedirection() throws Exception public void testRedirection() throws Exception
{ {