diff --git a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java index fa20e41f467..b2e3c5f23c3 100644 --- a/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java +++ b/indexing-service/src/main/java/io/druid/indexing/common/actions/RemoteTaskActionClient.java @@ -77,17 +77,10 @@ public class RemoteTaskActionClient implements TaskActionClient log.info("Submitting action for task[%s] to overlord: [%s].", task.getId(), taskAction); - try { - fullResponseHolder = druidLeaderClient.go( - druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action") - .setContent(MediaType.APPLICATION_JSON, dataToSend) - ); - } - catch (Exception e) { - Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); - Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class); - throw Throwables.propagate(e); - } + fullResponseHolder = druidLeaderClient.go( + druidLeaderClient.makeRequest(HttpMethod.POST, "/druid/indexer/v1/action") + .setContent(MediaType.APPLICATION_JSON, dataToSend) + ); if (fullResponseHolder.getStatus().getCode() / 100 == 2) { final Map responseDict = jsonMapper.readValue( @@ -120,6 +113,9 @@ public class RemoteTaskActionClient implements TaskActionClient } } } + catch (InterruptedException e) { + throw new RuntimeException(e); + } } } diff --git a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java index 5835405949d..a3352fa6d9a 100644 --- a/server/src/main/java/io/druid/discovery/DruidLeaderClient.java +++ b/server/src/main/java/io/druid/discovery/DruidLeaderClient.java @@ -21,6 +21,7 @@ package io.druid.discovery; import com.google.common.base.Charsets; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.metamx.http.client.HttpClient; import com.metamx.http.client.Request; import com.metamx.http.client.response.FullResponseHandler; @@ -28,14 +29,19 @@ import com.metamx.http.client.response.FullResponseHolder; import io.druid.client.selector.Server; import io.druid.concurrent.LifecycleLock; import io.druid.curator.discovery.ServerDiscoverySelector; +import io.druid.java.util.common.IOE; import io.druid.java.util.common.ISE; +import io.druid.java.util.common.RE; import io.druid.java.util.common.StringUtils; import io.druid.java.util.common.lifecycle.LifecycleStart; import io.druid.java.util.common.lifecycle.LifecycleStop; import io.druid.java.util.common.logger.Logger; +import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import javax.annotation.Nullable; +import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; import java.util.Iterator; @@ -47,15 +53,15 @@ import java.util.concurrent.atomic.AtomicReference; * This class facilitates interaction with Coordinator/Overlord leader nodes. Instance of this class is injected * via Guice with annotations @Coordinator or @IndexingService . * Usage: - * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) - * request.setXXX(..) - * FullResponseHolder responseHolder = druidLeaderClient.go(request) + * Request request = druidLeaderClient.makeRequest(HttpMethod, requestPath) + * request.setXXX(..) + * FullResponseHolder responseHolder = druidLeaderClient.go(request) */ public class DruidLeaderClient { private final Logger log = new Logger(DruidLeaderClient.class); - private static final int MAX_RETRIES = 3; + private static final int MAX_RETRIES = 5; private final HttpClient httpClient; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; @@ -112,13 +118,19 @@ public class DruidLeaderClient log.info("Stopped."); } - public Request makeRequest(HttpMethod httpMethod, String urlPath) throws MalformedURLException + /** + * Make a Request object aimed at the leader. Throws IOException if the leader cannot be located. + */ + public Request makeRequest(HttpMethod httpMethod, String urlPath) throws IOException { Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); return new Request(httpMethod, new URL(StringUtils.format("%s%s", getCurrentKnownLeader(true), urlPath))); } - public FullResponseHolder go(Request request) throws InterruptedException + /** + * Executes a Request object aimed at the leader. Throws IOException if the leader cannot be located. + */ + public FullResponseHolder go(Request request) throws IOException, InterruptedException { Preconditions.checkState(lifecycleLock.awaitStarted(1, TimeUnit.MILLISECONDS)); for (int counter = 0; counter < MAX_RETRIES; counter++) { @@ -126,9 +138,17 @@ public class DruidLeaderClient final FullResponseHolder fullResponseHolder; try { - fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get(); + try { + fullResponseHolder = httpClient.go(request, new FullResponseHandler(Charsets.UTF_8)).get(); + } + catch (ExecutionException e) { + // Unwrap IOExceptions and ChannelExceptions, re-throw others + Throwables.propagateIfInstanceOf(e.getCause(), IOException.class); + Throwables.propagateIfInstanceOf(e.getCause(), ChannelException.class); + throw new RE(e, "HTTP request to[%s] failed", request.getUrl()); + } } - catch (ExecutionException ex) { + catch (IOException | ChannelException ex) { // can happen if the node is stopped. log.info("Request[%s] failed with msg [%s].", request.getUrl(), ex.getMessage()); log.debug(ex, "Request[%s] failed.", request.getUrl()); @@ -142,16 +162,18 @@ public class DruidLeaderClient } else { request = withUrl( request, - new URL(StringUtils.format("%s%s?%s", - getCurrentKnownLeader(false), - request.getUrl().getPath(), - request.getUrl().getQuery() + new URL(StringUtils.format( + "%s%s?%s", + getCurrentKnownLeader(false), + request.getUrl().getPath(), + request.getUrl().getQuery() )) ); } continue; } catch (MalformedURLException e) { + // Not an IOException; this is our own fault. throw new ISE( e, "failed to build url with path[%] and query string [%s].", @@ -164,7 +186,7 @@ public class DruidLeaderClient if (HttpResponseStatus.TEMPORARY_REDIRECT.equals(fullResponseHolder.getResponse().getStatus())) { String redirectUrlStr = fullResponseHolder.getResponse().headers().get("Location"); if (redirectUrlStr == null) { - throw new ISE("No redirect location is found in response from url[%s].", request.getUrl()); + throw new IOE("No redirect location is found in response from url[%s].", request.getUrl()); } log.info("Request[%s] received redirect response to location [%s].", request.getUrl(), redirectUrlStr); @@ -174,7 +196,7 @@ public class DruidLeaderClient redirectUrl = new URL(redirectUrlStr); } catch (MalformedURLException ex) { - throw new ISE( + throw new IOE( ex, "Malformed redirect location is found in response from url[%s], new location[%s].", request.getUrl(), @@ -196,7 +218,7 @@ public class DruidLeaderClient } } - throw new ISE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl()); + throw new IOE("Retries exhausted, couldn't fulfill request to [%s].", request.getUrl()); } public String findCurrentLeader() @@ -221,14 +243,21 @@ public class DruidLeaderClient } } - private String getCurrentKnownLeader(final boolean cached) + private String getCurrentKnownLeader(final boolean cached) throws IOException { - return currentKnownLeader.accumulateAndGet( + final String leader = currentKnownLeader.accumulateAndGet( null, (current, given) -> current == null || !cached ? pickOneHost() : current ); + + if (leader == null) { + throw new IOE("No known server"); + } else { + return leader; + } } + @Nullable private String pickOneHost() { Server server = serverDiscoverySelector.pick(); @@ -251,7 +280,7 @@ public class DruidLeaderClient ); } - throw new ISE("Couldn't find any servers."); + return null; } private Request withUrl(Request old, URL url) diff --git a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java index 7140e4f4001..cc88f6660ff 100644 --- a/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java +++ b/server/src/test/java/io/druid/discovery/DruidLeaderClientTest.java @@ -52,7 +52,9 @@ import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; import org.jboss.netty.handler.codec.http.HttpMethod; import org.junit.Assert; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -60,12 +62,16 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; +import java.io.IOException; import java.net.URI; /** */ public class DruidLeaderClientTest extends BaseJettyTest { + @Rule + public ExpectedException expectedException = ExpectedException.none(); + private DiscoveryDruidNode discoveryDruidNode; private HttpClient httpClient; @@ -126,6 +132,31 @@ public class DruidLeaderClientTest extends BaseJettyTest Assert.assertEquals("hello", druidLeaderClient.go(request).getContent()); } + @Test + public void testNoLeaderFound() throws Exception + { + DruidNodeDiscovery druidNodeDiscovery = EasyMock.createMock(DruidNodeDiscovery.class); + EasyMock.expect(druidNodeDiscovery.getAllNodes()).andReturn(ImmutableList.of()); + + DruidNodeDiscoveryProvider druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); + EasyMock.expect(druidNodeDiscoveryProvider.getForNodeType("nodetype")).andReturn(druidNodeDiscovery); + + EasyMock.replay(druidNodeDiscovery, druidNodeDiscoveryProvider); + + DruidLeaderClient druidLeaderClient = new DruidLeaderClient( + httpClient, + druidNodeDiscoveryProvider, + "nodetype", + "/simple/leader", + EasyMock.createNiceMock(ServerDiscoverySelector.class) + ); + druidLeaderClient.start(); + + expectedException.expect(IOException.class); + expectedException.expectMessage("No known server"); + druidLeaderClient.makeRequest(HttpMethod.POST, "/simple/direct"); + } + @Test public void testRedirection() throws Exception {