Fix IndexTaskClient to retry on ChannelException (#6649)

* Fix IndexTaskClient to retry on ChannelException

* fix travis and add javadoc

* address comment
This commit is contained in:
Jihoon Son 2018-11-27 15:54:38 -08:00 committed by Clint Wylie
parent 9a89200607
commit 422b76b33c
2 changed files with 264 additions and 59 deletions

View File

@ -26,6 +26,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
@ -50,10 +51,12 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
/**
* Abstract class to communicate with index tasks via HTTP. This class provides interfaces to serialize/deserialize
@ -228,6 +231,38 @@ public abstract class IndexTaskClient implements AutoCloseable
);
}
private Request createRequest(
String taskId,
TaskLocation location,
String path,
@Nullable String encodedQueryString,
HttpMethod method,
@Nullable String mediaType,
byte[] content
) throws MalformedURLException
{
final String host = location.getHost();
final String scheme = location.getTlsPort() >= 0 ? "https" : "http";
final int port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort();
// Use URL constructor, not URI, since the path is already encoded.
// The below line can throw a MalformedURLException, and this method should return immediately without rety.
final URL serviceUrl = new URL(
scheme,
host,
port,
encodedQueryString == null ? path : StringUtils.format("%s?%s", path, encodedQueryString)
);
final Request request = new Request(method, serviceUrl);
// used to validate that we are talking to the correct worker
request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
if (content.length > 0) {
request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content);
}
return request;
}
/**
* Sends an HTTP request to the task of the specified {@code taskId} and returns a response if it succeeded.
*/
@ -244,67 +279,40 @@ public abstract class IndexTaskClient implements AutoCloseable
final RetryPolicy retryPolicy = retryPolicyFactory.makeRetryPolicy();
while (true) {
FullResponseHolder response = null;
Request request = null;
TaskLocation location = TaskLocation.unknown();
String path = StringUtils.format("%s/%s/%s", BASE_PATH, StringUtils.urlEncode(taskId), encodedPathSuffix);
Optional<TaskStatus> status = taskInfoProvider.getTaskStatus(taskId);
if (!status.isPresent() || !status.get().isRunnable()) {
throw new TaskNotRunnableException(StringUtils.format(
"Aborting request because task [%s] is not runnable",
taskId
));
throw new TaskNotRunnableException(
StringUtils.format(
"Aborting request because task [%s] is not runnable",
taskId
)
);
}
String host = location.getHost();
String scheme = "";
int port = -1;
final TaskLocation location = taskInfoProvider.getTaskLocation(taskId);
if (location.equals(TaskLocation.unknown())) {
throw new NoTaskLocationException(StringUtils.format("No TaskLocation available for task [%s]", taskId));
}
final Request request = createRequest(
taskId,
location,
path,
encodedQueryString,
method,
mediaType,
content
);
FullResponseHolder response = null;
try {
location = taskInfoProvider.getTaskLocation(taskId);
if (location.equals(TaskLocation.unknown())) {
throw new NoTaskLocationException(StringUtils.format("No TaskLocation available for task [%s]", taskId));
}
host = location.getHost();
scheme = location.getTlsPort() >= 0 ? "https" : "http";
port = location.getTlsPort() >= 0 ? location.getTlsPort() : location.getPort();
// Netty throws some annoying exceptions if a connection can't be opened, which happens relatively frequently
// for tasks that happen to still be starting up, so test the connection first to keep the logs clean.
checkConnection(host, port);
checkConnection(request.getUrl().getHost(), request.getUrl().getPort());
try {
// Use URL constructor, not URI, since the path is already encoded.
final URL serviceUrl = new URL(
scheme,
host,
port,
encodedQueryString == null ? path : StringUtils.format("%s?%s", path, encodedQueryString)
);
request = new Request(method, serviceUrl);
// used to validate that we are talking to the correct worker
request.addHeader(ChatHandlerResource.TASK_ID_HEADER, taskId);
if (content.length > 0) {
request.setContent(Preconditions.checkNotNull(mediaType, "mediaType"), content);
}
log.debug("HTTP %s: %s", method.getName(), serviceUrl.toString());
response = httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
}
catch (IOException | ChannelException ioce) {
throw ioce;
}
catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(ie);
}
catch (Exception e) {
throw new RuntimeException(e);
}
response = submitRequest(request);
int responseCode = response.getStatus().getCode();
if (responseCode / 100 == 2) {
@ -339,15 +347,7 @@ public abstract class IndexTaskClient implements AutoCloseable
} else {
delay = retryPolicy.getAndIncrementRetryDelay();
}
String urlForLog = (request != null
? request.getUrl().toString()
: StringUtils.nonStrictFormat(
"%s://%s:%d%s",
scheme,
host,
port,
path
));
final String urlForLog = request.getUrl().toString();
if (!retry) {
// if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was
// for informational purposes only) so don't log a scary stack trace
@ -387,6 +387,58 @@ public abstract class IndexTaskClient implements AutoCloseable
}
}
private FullResponseHolder submitRequest(Request request) throws IOException, ChannelException
{
try {
log.debug("HTTP %s: %s", request.getMethod().getName(), request.getUrl().toString());
return httpClient.go(request, new FullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
}
catch (Exception e) {
throw throwIfPossible(e);
}
}
/**
* Throws if it's possible to throw the given Throwable.
*
* - The input throwable shouldn't be null.
* - If Throwable is an {@link ExecutionException}, this calls itself recursively with the cause of ExecutionException.
* - If Throwable is an {@link IOException} or a {@link ChannelException}, this simply throws it.
* - If Throwable is an {@link InterruptedException}, this interrupts the current thread and throws a RuntimeException
* wrapping the InterruptedException
* - Otherwise, this simply returns the given Throwable.
*
* Note that if the given Throable is an ExecutionException, this can return the cause of ExecutionException.
*/
private RuntimeException throwIfPossible(Throwable t) throws IOException, ChannelException
{
Preconditions.checkNotNull(t, "Throwable shoulnd't null");
if (t instanceof ExecutionException) {
if (t.getCause() != null) {
return throwIfPossible(t.getCause());
} else {
return new RuntimeException(t);
}
}
if (t instanceof IOException) {
throw (IOException) t;
}
if (t instanceof ChannelException) {
throw (ChannelException) t;
}
if (t instanceof InterruptedException) {
Thread.currentThread().interrupt();
throw new RuntimeException(t);
}
Throwables.propagateIfPossible(t);
return new RuntimeException(t);
}
@Override
public void close()
{

View File

@ -0,0 +1,153 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.indexing.common;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.Futures;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.FullResponseHolder;
import org.easymock.EasyMock;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.function.Function;
public class IndexTaskClientTest
{
@Rule
public ExpectedException expectedException = ExpectedException.none();
private final ObjectMapper objectMapper = new DefaultObjectMapper();
private final int numRetries = 2;
@Test
public void failOnMalformedURLException() throws IOException
{
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(
EasyMock.createNiceMock(HttpClient.class),
id -> TaskLocation.create(id, -2, -2)
)) {
expectedException.expect(MalformedURLException.class);
expectedException.expectMessage("Invalid port number :-2");
indexTaskClient.submitRequestWithEmptyContent(
"taskId",
HttpMethod.GET,
"test",
null,
true
);
}
}
@Test
public void retryOnChannelException() throws IOException
{
final HttpClient httpClient = EasyMock.createNiceMock(HttpClient.class);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(Futures.immediateFailedFuture(new ChannelException("IndexTaskClientTest")))
.times(2);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
new FullResponseHolder(
HttpResponseStatus.OK,
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
new StringBuilder()
)
)
)
.once();
EasyMock.replay(httpClient);
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
final FullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent(
"taskId",
HttpMethod.GET,
"test",
null,
true
);
Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
}
}
private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, Function<String, TaskLocation> taskLocationProvider)
{
final TaskInfoProvider taskInfoProvider = new TaskInfoProvider()
{
@Override
public TaskLocation getTaskLocation(String id)
{
return taskLocationProvider.apply(id);
}
@Override
public Optional<TaskStatus> getTaskStatus(String id)
{
return Optional.of(TaskStatus.running(id));
}
};
return new TestIndexTaskClient(
httpClient,
objectMapper,
taskInfoProvider,
new Duration(1000),
"indexTaskClientTest",
1,
numRetries
);
}
private static class TestIndexTaskClient extends IndexTaskClient
{
private TestIndexTaskClient(
HttpClient httpClient,
ObjectMapper objectMapper,
TaskInfoProvider taskInfoProvider,
Duration httpTimeout,
String callerId,
int numThreads,
long numRetries
)
{
super(httpClient, objectMapper, taskInfoProvider, httpTimeout, callerId, numThreads, numRetries);
}
@Override
protected void checkConnection(String host, int port)
{
// do nothing
}
}
}