From 422b76b33cf80dce62a7f2ac2fe9d80fa7a25244 Mon Sep 17 00:00:00 2001 From: Jihoon Son Date: Tue, 27 Nov 2018 15:54:38 -0800 Subject: [PATCH] Fix IndexTaskClient to retry on ChannelException (#6649) * Fix IndexTaskClient to retry on ChannelException * fix travis and add javadoc * address comment --- .../indexing/common/IndexTaskClient.java | 170 ++++++++++++------ .../indexing/common/IndexTaskClientTest.java | 153 ++++++++++++++++ 2 files changed, 264 insertions(+), 59 deletions(-) create mode 100644 indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 92042a26aa2..148dc6c8704 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; @@ -50,10 +51,12 @@ import org.joda.time.Period; import javax.annotation.Nullable; import javax.ws.rs.core.MediaType; import java.io.IOException; +import java.net.MalformedURLException; import java.net.Socket; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; /** * Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize @@ -228,6 +231,38 @@ public abstract class IndexTaskClient implements AutoCloseable ); } + private Request createRequest( + String taskId, + TaskLocation location, + String path, + @Nullable String encodedQueryString, + HttpMethod method, + @Nullable String mediaType, + byte[] content + ) throws MalformedURLException + { + final String host = location.getHost(); + final String scheme = location.getTlsPort() >= 0 ? "https" : "http"; + final int port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort(); + + // Use URL constructor, not URI, since the path is already encoded. + // The below line can throw a MalformedURLException, and this method should return immediately without rety. + final URL serviceUrl = new URL( + scheme, + host, + port, + encodedQueryString == null ? path : StringUtils.format("%s?%s", path, encodedQueryString) + ); + + final Request request = new Request(method, serviceUrl); + // used to validate that we are talking to the correct worker + request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId); + if (content.length > 0) { + request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content); + } + return request; + } + /** * Sends an HTTP request to the task of the specified {@code taskId} and returns a response if it succeeded. */ @@ -244,67 +279,40 @@ public abstract class IndexTaskClient implements AutoCloseable final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy(); while (true) { - FullResponseHolder response = null; - Request request = null; - TaskLocation location = TaskLocation.unknown(); String path = StringUtils.format("%s/%s/%s", BASE_PATH, StringUtils.urlEncode(taskId), encodedPathSuffix); Optional status = taskInfoProvider.getTaskStatus(taskId); if (!status.isPresent() || !status.get().isRunnable()) { - throw new TaskNotRunnableException(StringUtils.format( - "Aborting request because task [%s] is not runnable", - taskId - )); + throw new TaskNotRunnableException( + StringUtils.format( + "Aborting request because task [%s] is not runnable", + taskId + ) + ); } - String host = location.getHost(); - String scheme = ""; - int port = -1; + final TaskLocation location = taskInfoProvider.getTaskLocation(taskId); + if (location.equals(TaskLocation.unknown())) { + throw new NoTaskLocationException(StringUtils.format("No TaskLocation available for task [%s]", taskId)); + } + final Request request = createRequest( + taskId, + location, + path, + encodedQueryString, + method, + mediaType, + content + ); + + FullResponseHolder response = null; try { - location = taskInfoProvider.getTaskLocation(taskId); - if (location.equals(TaskLocation.unknown())) { - throw new NoTaskLocationException(StringUtils.format("No TaskLocation available for task [%s]", taskId)); - } - - host = location.getHost(); - scheme = location.getTlsPort() >= 0 ? "https" : "http"; - port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort(); - // Netty throws some annoying exceptions if a connection can't be opened, which happens relatively frequently // for tasks that happen to still be starting up, so test the connection first to keep the logs clean. - checkConnection(host, port); + checkConnection(request.getUrl().getHost(), request.getUrl().getPort()); - try { - // Use URL constructor, not URI, since the path is already encoded. - final URL serviceUrl = new URL( - scheme, - host, - port, - encodedQueryString == null ? path : StringUtils.format("%s?%s", path, encodedQueryString) - ); - request = new Request(method, serviceUrl); - - // used to validate that we are talking to the correct worker - request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId); - - if (content.length > 0) { - request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content); - } - - log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString()); - response = httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get(); - } - catch (IOException | ChannelException ioce) { - throw ioce; - } - catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - throw new RuntimeException(ie); - } - catch (Exception e) { - throw new RuntimeException(e); - } + response = submitRequest(request); int responseCode = response.getStatus().getCode(); if (responseCode / 100 == 2) { @@ -339,15 +347,7 @@ public abstract class IndexTaskClient implements AutoCloseable } else { delay = retryPolicy.getAndIncrementRetryDelay(); } - String urlForLog = (request != null - ? request.getUrl().toString() - : StringUtils.nonStrictFormat( - "%s://%s:%d%s", - scheme, - host, - port, - path - )); + final String urlForLog = request.getUrl().toString(); if (!retry) { // if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was // for informational purposes only) so don't log a scary stack trace @@ -387,6 +387,58 @@ public abstract class IndexTaskClient implements AutoCloseable } } + private FullResponseHolder submitRequest(Request request) throws IOException, ChannelException + { + try { + log.debug("HTTP %s: %s", request.getMethod().getName(), request.getUrl().toString()); + return httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get(); + } + catch (Exception e) { + throw throwIfPossible(e); + } + } + + /** + * Throws if it's possible to throw the given Throwable. + * + * - The input throwable shouldn't be null. + * - If Throwable is an {@link ExecutionException}, this calls itself recursively with the cause of ExecutionException. + * - If Throwable is an {@link IOException} or a {@link ChannelException}, this simply throws it. + * - If Throwable is an {@link InterruptedException}, this interrupts the current thread and throws a RuntimeException + * wrapping the InterruptedException + * - Otherwise, this simply returns the given Throwable. + * + * Note that if the given Throable is an ExecutionException, this can return the cause of ExecutionException. + */ + private RuntimeException throwIfPossible(Throwable t) throws IOException, ChannelException + { + Preconditions.checkNotNull(t, "Throwable shoulnd't null"); + + if (t instanceof ExecutionException) { + if (t.getCause() != null) { + return throwIfPossible(t.getCause()); + } else { + return new RuntimeException(t); + } + } + + if (t instanceof IOException) { + throw (IOException) t; + } + + if (t instanceof ChannelException) { + throw (ChannelException) t; + } + + if (t instanceof InterruptedException) { + Thread.currentThread().interrupt(); + throw new RuntimeException(t); + } + + Throwables.propagateIfPossible(t); + return new RuntimeException(t); + } + @Override public void close() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java new file mode 100644 index 00000000000..a297db6db33 --- /dev/null +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.common; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.Futures; +import org.apache.druid.indexer.TaskLocation; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.http.client.HttpClient; +import org.apache.druid.java.util.http.client.response.FullResponseHolder; +import org.easymock.EasyMock; +import org.jboss.netty.channel.ChannelException; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpMethod; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.joda.time.Duration; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.net.MalformedURLException; +import java.util.function.Function; + +public class IndexTaskClientTest +{ + @Rule + public ExpectedException expectedException = ExpectedException.none(); + + private final ObjectMapper objectMapper = new DefaultObjectMapper(); + private final int numRetries = 2; + + @Test + public void failOnMalformedURLException() throws IOException + { + try (IndexTaskClient indexTaskClient = buildIndexTaskClient( + EasyMock.createNiceMock(HttpClient.class), + id -> TaskLocation.create(id, -2, -2) + )) { + expectedException.expect(MalformedURLException.class); + expectedException.expectMessage("Invalid port number :-2"); + + indexTaskClient.submitRequestWithEmptyContent( + "taskId", + HttpMethod.GET, + "test", + null, + true + ); + } + } + + @Test + public void retryOnChannelException() throws IOException + { + final HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn(Futures.immediateFailedFuture(new ChannelException("IndexTaskClientTest"))) + .times(2); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + new FullResponseHolder( + HttpResponseStatus.OK, + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK), + new StringBuilder() + ) + ) + ) + .once(); + EasyMock.replay(httpClient); + try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) { + final FullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent( + "taskId", + HttpMethod.GET, + "test", + null, + true + ); + Assert.assertEquals(HttpResponseStatus.OK, response.getStatus()); + } + } + + private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, Function taskLocationProvider) + { + final TaskInfoProvider taskInfoProvider = new TaskInfoProvider() + { + @Override + public TaskLocation getTaskLocation(String id) + { + return taskLocationProvider.apply(id); + } + + @Override + public Optional getTaskStatus(String id) + { + return Optional.of(TaskStatus.running(id)); + } + }; + return new TestIndexTaskClient( + httpClient, + objectMapper, + taskInfoProvider, + new Duration(1000), + "indexTaskClientTest", + 1, + numRetries + ); + } + + private static class TestIndexTaskClient extends IndexTaskClient + { + private TestIndexTaskClient( + HttpClient httpClient, + ObjectMapper objectMapper, + TaskInfoProvider taskInfoProvider, + Duration httpTimeout, + String callerId, + int numThreads, + long numRetries + ) + { + super(httpClient, objectMapper, taskInfoProvider, httpTimeout, callerId, numThreads, numRetries); + } + + @Override + protected void checkConnection(String host, int port) + { + // do nothing + } + } +}