Enhancements to IndexTaskClient. (#12011)

* Enhancements to IndexTaskClient.

1) Ability to use handlers other than StringFullResponseHandler. This
   functionality is not used in production code yet, but is useful
   because it will allow tasks to communicate with each other in
   non-string-based formats and in streaming fashion. In the future,
   we'll be able to use this to make task-to-task communication
   more efficient.

2) Truncate server errors at 1KB, so long errors do not pollute logs.

3) Change error log level for retryable errors from WARN to INFO. (The
   final error is still WARN.)

4) Harmonize log and exception messages to have a more consistent format.

* Additional tests and improvements.
This commit is contained in:
Gian Merlino 2021-12-03 09:14:32 -08:00 committed by GitHub
parent f7f5505631
commit e0e05aad99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 1030 additions and 244 deletions

View File

@ -0,0 +1,146 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.common;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import javax.annotation.Nullable;
import java.util.Objects;
import java.util.function.Function;
/**
* Encapsulates either an "error" or a "value".
*
* Similar to the Either class in Scala or Haskell, except the possibilities are named "error" and "value" instead of
* "left" and "right".
*/
public class Either<L, R>
{
private final L error;
private final R value;
private Either(L error, R value)
{
this.error = error;
this.value = value;
}
public static <L, R> Either<L, R> error(final L error)
{
return new Either<>(Preconditions.checkNotNull(error, "error"), null);
}
public static <L, R> Either<L, R> value(@Nullable final R value)
{
return new Either<>(null, value);
}
public boolean isValue()
{
return error == null;
}
public boolean isError()
{
return error != null;
}
public L error()
{
if (isError()) {
return error;
} else {
throw new IllegalStateException("Not an error; check isError first");
}
}
/**
* If this Either represents a value, returns it. If this Either represents an error, throw an error.
*
* If the error is a {@link RuntimeException} or {@link Error}, it is thrown directly. If it is some other
* {@link Throwable}, it is wrapped in a RuntimeException and thrown. If it is not a throwable at all, a generic
* error is thrown containing the string representation of the error object.
*
* If you want to be able to retrieve the error as-is, use {@link #isError()} and {@link #error()} instead.
*/
@Nullable
public R valueOrThrow()
{
if (isValue()) {
return value;
} else if (error instanceof Throwable) {
Throwables.propagateIfPossible((Throwable) error);
throw new RuntimeException((Throwable) error);
} else {
throw new RuntimeException(error.toString());
}
}
/**
* Applies a function to this value, if present.
*
* If the mapping function throws an exception, it is thrown by this method instead of being packed up into
* the returned Either.
*
* If this Either represents an error, the mapping function is not applied.
*
* @throws NullPointerException if the mapping function returns null
*/
public <T> Either<L, T> map(final Function<R, T> fn)
{
if (isValue()) {
return Either.value(fn.apply(value));
} else {
// Safe because the value is never going to be returned.
//noinspection unchecked
return (Either<L, T>) this;
}
}
@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
Either<?, ?> either = (Either<?, ?>) o;
return Objects.equals(error, either.error) && Objects.equals(value, either.value);
}
@Override
public int hashCode()
{
return Objects.hash(error, value);
}
@Override
public String toString()
{
if (isValue()) {
return "Value[" + value + "]";
} else {
return "Error[" + error + "]";
}
}
}

View File

@ -32,7 +32,7 @@ public class BytesFullResponseHandler implements HttpResponseHandler<BytesFullRe
@Override
public ClientResponse<BytesFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
BytesFullResponseHolder holder = new BytesFullResponseHolder(response.getStatus(), response);
BytesFullResponseHolder holder = new BytesFullResponseHolder(response);
holder.addChunk(getContentBytes(response.getContent()));

View File

@ -20,7 +20,6 @@
package org.apache.druid.java.util.http.client.response;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@ -30,9 +29,9 @@ public class BytesFullResponseHolder extends FullResponseHolder<byte[]>
{
private final List<byte[]> chunks;
public BytesFullResponseHolder(HttpResponseStatus status, HttpResponse response)
public BytesFullResponseHolder(HttpResponse response)
{
super(status, response);
super(response);
this.chunks = new ArrayList<>();
}

View File

@ -29,18 +29,16 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus;
*/
public abstract class FullResponseHolder<T>
{
private final HttpResponseStatus status;
private final HttpResponse response;
public FullResponseHolder(HttpResponseStatus status, HttpResponse response)
public FullResponseHolder(HttpResponse response)
{
this.status = status;
this.response = response;
}
public HttpResponseStatus getStatus()
{
return status;
return response.getStatus();
}
public HttpResponse getResponse()

View File

@ -32,7 +32,7 @@ public class InputStreamFullResponseHandler implements HttpResponseHandler<Input
@Override
public ClientResponse<InputStreamFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response);
InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response);
holder.addChunk(getContentBytes(response.getContent()));
return ClientResponse.finished(holder);
}

View File

@ -21,7 +21,6 @@ package org.apache.druid.java.util.http.client.response;
import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.io.InputStream;
@ -29,12 +28,9 @@ public class InputStreamFullResponseHolder extends FullResponseHolder<InputStrea
{
private final AppendableByteArrayInputStream is;
public InputStreamFullResponseHolder(
HttpResponseStatus status,
HttpResponse response
)
public InputStreamFullResponseHolder(HttpResponse response)
{
super(status, response);
super(response);
is = new AppendableByteArrayInputStream();
}

View File

@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.response;
import org.apache.druid.java.util.common.Either;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import java.nio.charset.StandardCharsets;
/**
* Response handler that delegates successful responses (2xx response codes) to some other handler, but returns
* errors (non-2xx response codes) as Strings. The return value is an {@link Either}.
*/
public class ObjectOrErrorResponseHandler<IntermediateType, FinalType>
implements HttpResponseHandler<Either<StringFullResponseHolder, IntermediateType>, Either<StringFullResponseHolder, FinalType>>
{
private final HttpResponseHandler<IntermediateType, FinalType> okHandler;
private final StringFullResponseHandler errorHandler;
public ObjectOrErrorResponseHandler(HttpResponseHandler<IntermediateType, FinalType> okHandler)
{
this.okHandler = okHandler;
this.errorHandler = new StringFullResponseHandler(StandardCharsets.UTF_8);
}
@Override
public ClientResponse<Either<StringFullResponseHolder, IntermediateType>> handleResponse(
final HttpResponse response,
final TrafficCop trafficCop
)
{
if (response.getStatus().getCode() / 100 == 2) {
final ClientResponse<IntermediateType> delegateResponse = okHandler.handleResponse(response, trafficCop);
return new ClientResponse<>(
delegateResponse.isFinished(),
delegateResponse.isContinueReading(),
Either.value(delegateResponse.getObj())
);
} else {
final ClientResponse<StringFullResponseHolder> delegateResponse =
errorHandler.handleResponse(response, trafficCop);
return new ClientResponse<>(
delegateResponse.isFinished(),
delegateResponse.isContinueReading(),
Either.error(delegateResponse.getObj())
);
}
}
@Override
public ClientResponse<Either<StringFullResponseHolder, IntermediateType>> handleChunk(
final ClientResponse<Either<StringFullResponseHolder, IntermediateType>> clientResponse,
final HttpChunk chunk,
final long chunkNum
)
{
final Either<StringFullResponseHolder, IntermediateType> prevHolder = clientResponse.getObj();
if (prevHolder.isValue()) {
final ClientResponse<IntermediateType> delegateResponse = okHandler.handleChunk(
new ClientResponse<>(
clientResponse.isFinished(),
clientResponse.isContinueReading(),
prevHolder.valueOrThrow()
),
chunk,
chunkNum
);
return new ClientResponse<>(
delegateResponse.isFinished(),
delegateResponse.isContinueReading(),
Either.value(delegateResponse.getObj())
);
} else {
final ClientResponse<StringFullResponseHolder> delegateResponse = errorHandler.handleChunk(
new ClientResponse<>(
clientResponse.isFinished(),
clientResponse.isContinueReading(),
prevHolder.error()
),
chunk,
chunkNum
);
return new ClientResponse<>(
delegateResponse.isFinished(),
delegateResponse.isContinueReading(),
Either.error(delegateResponse.getObj())
);
}
}
@Override
public ClientResponse<Either<StringFullResponseHolder, FinalType>> done(
final ClientResponse<Either<StringFullResponseHolder, IntermediateType>> clientResponse
)
{
final Either<StringFullResponseHolder, IntermediateType> prevHolder = clientResponse.getObj();
if (prevHolder.isValue()) {
final ClientResponse<FinalType> delegateResponse = okHandler.done(
new ClientResponse<>(
clientResponse.isFinished(),
clientResponse.isContinueReading(),
prevHolder.valueOrThrow()
)
);
return new ClientResponse<>(
delegateResponse.isFinished(),
delegateResponse.isContinueReading(),
Either.value(delegateResponse.getObj())
);
} else {
final ClientResponse<StringFullResponseHolder> delegateResponse = errorHandler.done(
new ClientResponse<>(
clientResponse.isFinished(),
clientResponse.isContinueReading(),
prevHolder.error()
)
);
return new ClientResponse<>(
delegateResponse.isFinished(),
delegateResponse.isContinueReading(),
Either.error(delegateResponse.getObj())
);
}
}
@Override
public void exceptionCaught(
final ClientResponse<Either<StringFullResponseHolder, IntermediateType>> clientResponse,
final Throwable e
)
{
final Either<StringFullResponseHolder, IntermediateType> prevHolder = clientResponse.getObj();
if (prevHolder.isValue()) {
okHandler.exceptionCaught(
new ClientResponse<>(
clientResponse.isFinished(),
clientResponse.isContinueReading(),
prevHolder.valueOrThrow()
),
e
);
} else {
errorHandler.exceptionCaught(
new ClientResponse<>(
clientResponse.isFinished(),
clientResponse.isContinueReading(),
prevHolder.error()
),
e
);
}
}
}

View File

@ -41,7 +41,7 @@ public class StringFullResponseHandler
@Override
public ClientResponse<StringFullResponseHolder> handleResponse(HttpResponse response, TrafficCop trafficCop)
{
return ClientResponse.unfinished(new StringFullResponseHolder(response.getStatus(), response, charset));
return ClientResponse.unfinished(new StringFullResponseHolder(response, charset));
}
@Override

View File

@ -20,7 +20,6 @@
package org.apache.druid.java.util.http.client.response;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import java.nio.charset.Charset;
@ -29,12 +28,11 @@ public class StringFullResponseHolder extends FullResponseHolder<String>
private final StringBuilder builder;
public StringFullResponseHolder(
HttpResponseStatus status,
HttpResponse response,
Charset charset
)
{
super(status, response);
super(response);
this.builder = new StringBuilder(response.getContent().toString(charset));
}

View File

@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Test;
public class EitherTest
{
@Test
public void testValueString()
{
final Either<String, String> either = Either.value("yay");
Assert.assertFalse(either.isError());
Assert.assertTrue(either.isValue());
Assert.assertEquals("yay", either.valueOrThrow());
final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, either::error);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.startsWith("Not an error"));
// Test toString.
Assert.assertEquals("Value[yay]", either.toString());
// Test map.
Assert.assertEquals(Either.value("YAY"), either.map(StringUtils::toUpperCase));
}
@Test
public void testValueNull()
{
final Either<String, String> either = Either.value(null);
Assert.assertFalse(either.isError());
Assert.assertTrue(either.isValue());
Assert.assertNull(either.valueOrThrow());
final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, either::error);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.startsWith("Not an error"));
// Test toString.
Assert.assertEquals("Value[null]", either.toString());
// Test map.
Assert.assertEquals(Either.value("nullxyz"), either.map(s -> s + "xyz"));
}
@Test
public void testErrorString()
{
final Either<String, Object> either = Either.error("oh no");
Assert.assertTrue(either.isError());
Assert.assertFalse(either.isValue());
Assert.assertEquals("oh no", either.error());
final RuntimeException e = Assert.assertThrows(RuntimeException.class, either::valueOrThrow);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
// Test toString.
Assert.assertEquals("Error[oh no]", either.toString());
// Test map.
Assert.assertEquals(either, either.map(o -> "this does nothing because the Either is an error"));
}
@Test
public void testErrorThrowable()
{
final Either<Throwable, Object> either = Either.error(new AssertionError("oh no"));
Assert.assertTrue(either.isError());
Assert.assertFalse(either.isValue());
MatcherAssert.assertThat(either.error(), CoreMatchers.instanceOf(AssertionError.class));
MatcherAssert.assertThat(either.error().getMessage(), CoreMatchers.equalTo("oh no"));
final AssertionError e = Assert.assertThrows(AssertionError.class, either::valueOrThrow);
MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no"));
// Test toString.
Assert.assertEquals("Error[java.lang.AssertionError: oh no]", either.toString());
}
@Test
public void testEqualsAndHashCode()
{
EqualsVerifier.forClass(Either.class).usingGetClass().verify();
}
}

View File

@ -0,0 +1,121 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.java.util.http.client.response;
import org.apache.commons.io.IOUtils;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
public class ObjectOrErrorResponseHandlerTest
{
@Test
public void testOk() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
final ObjectOrErrorResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder> responseHandler =
new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler());
ClientResponse<Either<StringFullResponseHolder, InputStreamFullResponseHolder>> clientResp =
responseHandler.handleResponse(response, null);
DefaultHttpChunk chunk =
new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
clientResp = responseHandler.handleChunk(clientResp, chunk, 0);
clientResp = responseHandler.done(clientResp);
Assert.assertTrue(clientResp.isFinished());
Assert.assertEquals(
"abcdefg",
IOUtils.toString(clientResp.getObj().valueOrThrow().getContent(), StandardCharsets.UTF_8)
);
}
@Test
public void testExceptionAfterOk() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
final ObjectOrErrorResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder> responseHandler =
new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler());
ClientResponse<Either<StringFullResponseHolder, InputStreamFullResponseHolder>> clientResp =
responseHandler.handleResponse(response, null);
Exception ex = new RuntimeException("dummy!");
responseHandler.exceptionCaught(clientResp, ex);
// Exception after HTTP OK still is handled by the "OK handler"
// (The handler that starts the request gets to finish it.)
Assert.assertTrue(clientResp.isFinished());
Assert.assertTrue(clientResp.getObj().isValue());
final InputStream responseStream = clientResp.getObj().valueOrThrow().getContent();
final IOException e = Assert.assertThrows(
IOException.class,
() -> IOUtils.toString(responseStream, StandardCharsets.UTF_8)
);
Assert.assertEquals("java.lang.RuntimeException: dummy!", e.getMessage());
}
@Test
public void testServerError() throws Exception
{
HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
response.setChunked(false);
response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING)));
final ObjectOrErrorResponseHandler<InputStreamFullResponseHolder, InputStreamFullResponseHolder> responseHandler =
new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler());
ClientResponse<Either<StringFullResponseHolder, InputStreamFullResponseHolder>> clientResp =
responseHandler.handleResponse(response, null);
DefaultHttpChunk chunk =
new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING)));
clientResp = responseHandler.handleChunk(clientResp, chunk, 0);
clientResp = responseHandler.done(clientResp);
// 5xx HTTP code is handled by the error handler.
Assert.assertTrue(clientResp.isFinished());
Assert.assertTrue(clientResp.getObj().isError());
Assert.assertEquals(
HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode(),
clientResp.getObj().error().getResponse().getStatus().getCode()
);
Assert.assertEquals("abcdefg", clientResp.getObj().error().getContent());
}
}

View File

@ -33,16 +33,17 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.CoreMatchers;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -182,18 +183,19 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testInternalServerError()
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []");
expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
expectedException.expectMessage("Received server error with status [500 Internal Server Error]");
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2);
EasyMock.expect(responseHolder.getContent()).andReturn("");
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
errorResponseHolder()
);
replayAll();
@ -204,19 +206,19 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
@Test
public void testBadRequest()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Received 400 Bad Request with body:");
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Received server error with status [400 Bad Request]");
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
EasyMock.expect(responseHolder.getContent()).andReturn("");
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
errorResponseHolder()
);
replayAll();
@ -227,22 +229,22 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
@Test
public void testTaskLocationMismatch()
{
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
.andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
EasyMock.expect(responseHolder.getResponse()).andReturn(response);
EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
.andReturn("{}");
EasyMock.expect(responseHolder.getContent()).andReturn("").andReturn("{}");
EasyMock.expect(response.headers()).andReturn(headers);
EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id");
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
).times(2);
errorResponseHolder()
).andReturn(
okResponseHolder()
);
replayAll();
Map<Integer, Long> results = client.getCurrentOffsets(TEST_ID, true);
@ -255,14 +257,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testGetCurrentOffsets() throws Exception
{
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -288,9 +289,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
client = new TestableKafkaIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 3);
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6)
.andReturn(HttpResponseStatus.OK).times(1);
EasyMock.expect(responseHolder.getContent()).andReturn("").times(4)
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(4);
EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
.andReturn("{\"0\":1, \"1\":10}");
EasyMock.expect(responseHolder.getResponse()).andReturn(response).times(2);
EasyMock.expect(response.headers()).andReturn(headers).times(2);
@ -298,11 +298,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
).times(3);
errorResponseHolder()
).times(2).andReturn(
okResponseHolder()
);
replayAll();
@ -328,7 +330,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testGetCurrentOffsetsWithExhaustedRetries()
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [404]");
expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
expectedException.expectMessage("Received server error with status [404 Not Found]");
client = new TestableKafkaIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 2);
@ -341,10 +344,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(Futures.immediateFuture(responseHolder)).anyTimes();
).andReturn(errorResponseHolder()).anyTimes();
replayAll();
client.getCurrentOffsets(TEST_ID, true);
@ -355,14 +358,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testGetEndOffsets() throws Exception
{
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -389,19 +391,16 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
DateTime now = DateTimes.nowUtc();
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
.andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
EasyMock.expect(responseHolder.getResponse()).andReturn(response);
EasyMock.expect(response.headers()).andReturn(headers);
EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn(null);
EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
).times(2);
)).andReturn(errorResponseHolder()).once().andReturn(okResponseHolder());
replayAll();
DateTime results = client.getStartTime(TEST_ID);
@ -424,14 +423,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Status status = Status.READING;
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn(StringUtils.format("\"%s\"", status.toString())).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -453,14 +451,14 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
public void testPause() throws Exception
{
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -487,30 +485,29 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
Capture<Request> captured2 = Capture.newInstance();
Capture<Request> captured3 = Capture.newInstance();
// one time in IndexTaskClient.submitRequest() and another in KafkaIndexTaskClient.pause()
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2)
.andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED);
EasyMock.expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2)
.andReturn("{\"0\":1, \"1\":10}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
EasyMock.expect(httpClient.go(
EasyMock.capture(captured2),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
EasyMock.expect(httpClient.go(
EasyMock.capture(captured3),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -552,10 +549,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -580,10 +577,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -609,10 +606,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -636,10 +633,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -662,10 +659,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -689,10 +686,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -725,10 +722,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -762,10 +759,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -799,10 +796,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -837,10 +834,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -874,10 +871,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -911,10 +908,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -948,10 +945,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -991,10 +988,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -1029,6 +1026,16 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport
}
}
private ListenableFuture<Either> okResponseHolder()
{
return Futures.immediateFuture(Either.value(responseHolder));
}
private ListenableFuture<Either> errorResponseHolder()
{
return Futures.immediateFuture(Either.error(responseHolder));
}
private class TestableKafkaIndexTaskClient extends KafkaIndexTaskClient
{
TestableKafkaIndexTaskClient(

View File

@ -33,16 +33,17 @@ import org.apache.druid.indexing.common.TaskInfoProvider;
import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.easymock.Capture;
import org.easymock.CaptureType;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
import org.hamcrest.CoreMatchers;
import org.jboss.netty.handler.codec.http.HttpHeaders;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpResponse;
@ -183,18 +184,19 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
public void testInternalServerError()
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []");
expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
expectedException.expectMessage("Received server error with status [500 Internal Server Error]");
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2);
EasyMock.expect(responseHolder.getContent()).andReturn("");
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
errorResponseHolder()
);
replayAll();
@ -205,19 +207,19 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
@Test
public void testBadRequest()
{
expectedException.expect(IAE.class);
expectedException.expectMessage("Received 400 Bad Request with body:");
expectedException.expect(IllegalArgumentException.class);
expectedException.expectMessage("Received server error with status [400 Bad Request]");
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2);
EasyMock.expect(responseHolder.getContent()).andReturn("");
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
errorResponseHolder()
);
replayAll();
@ -228,22 +230,22 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
@Test
public void testTaskLocationMismatch()
{
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
.andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
EasyMock.expect(responseHolder.getResponse()).andReturn(response);
EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
.andReturn("{}");
EasyMock.expect(responseHolder.getContent()).andReturn("").andReturn("{}");
EasyMock.expect(response.headers()).andReturn(headers);
EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id");
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(
Futures.immediateFuture(responseHolder)
).times(2);
errorResponseHolder()
).andReturn(
okResponseHolder()
);
replayAll();
Map<String, String> results = client.getCurrentOffsets(TEST_ID, true);
@ -256,14 +258,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
public void testGetCurrentOffsets() throws Exception
{
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -289,9 +290,8 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
client = new TestableKinesisIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 3);
Capture<Request> captured = Capture.newInstance(CaptureType.ALL);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6)
.andReturn(HttpResponseStatus.OK).times(1);
EasyMock.expect(responseHolder.getContent()).andReturn("").times(4)
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(4);
EasyMock.expect(responseHolder.getContent()).andReturn("").times(2)
.andReturn("{\"0\":1, \"1\":10}");
EasyMock.expect(responseHolder.getResponse()).andReturn(response).times(2);
EasyMock.expect(response.headers()).andReturn(headers).times(2);
@ -299,11 +299,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
).times(3);
errorResponseHolder()
).times(2).andReturn(
okResponseHolder()
);
replayAll();
@ -329,7 +331,8 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
public void testGetCurrentOffsetsWithExhaustedRetries()
{
expectedException.expect(RuntimeException.class);
expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [404]");
expectedException.expectCause(CoreMatchers.instanceOf(IOException.class));
expectedException.expectMessage("Received server error with status [404 Not Found]");
client = new TestableKinesisIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 2);
@ -342,10 +345,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(
httpClient.go(
EasyMock.anyObject(Request.class),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)
).andReturn(Futures.immediateFuture(responseHolder)).anyTimes();
).andReturn(errorResponseHolder()).anyTimes();
replayAll();
client.getCurrentOffsets(TEST_ID, true);
@ -356,14 +359,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
public void testGetEndOffsets() throws Exception
{
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}");
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -390,19 +392,16 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
DateTime now = DateTimes.nowUtc();
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3)
.andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2);
EasyMock.expect(responseHolder.getResponse()).andReturn(response);
EasyMock.expect(response.headers()).andReturn(headers);
EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn(null);
EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
).times(2);
)).andReturn(errorResponseHolder()).once().andReturn(okResponseHolder());
replayAll();
DateTime results = client.getStartTime(TEST_ID);
@ -425,14 +424,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
Status status = Status.READING;
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn(StringUtils.format("\"%s\"", status.toString())).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -454,14 +452,14 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
public void testPause() throws Exception
{
Capture<Request> captured = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2);
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK);
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -487,30 +485,29 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
Capture<Request> captured = Capture.newInstance();
Capture<Request> captured2 = Capture.newInstance();
Capture<Request> captured3 = Capture.newInstance();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2)
.andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED);
EasyMock.expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2)
.andReturn("{\"0\":1, \"1\":10}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
EasyMock.expect(httpClient.go(
EasyMock.capture(captured2),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
EasyMock.expect(httpClient.go(
EasyMock.capture(captured3),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -552,10 +549,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -580,10 +577,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -609,10 +606,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -636,10 +633,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -662,10 +659,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
);
replayAll();
@ -689,10 +686,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -725,10 +722,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -762,10 +759,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -799,10 +796,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -837,10 +834,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -874,10 +871,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -911,10 +908,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -948,10 +945,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -991,10 +988,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(httpClient.go(
EasyMock.capture(captured),
EasyMock.anyObject(StringFullResponseHandler.class),
EasyMock.anyObject(ObjectOrErrorResponseHandler.class),
EasyMock.eq(TEST_HTTP_TIMEOUT)
)).andReturn(
Futures.immediateFuture(responseHolder)
okResponseHolder()
).times(numRequests);
replayAll();
@ -1029,6 +1026,16 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport
}
}
private ListenableFuture<Either> okResponseHolder()
{
return Futures.immediateFuture(Either.value(responseHolder));
}
private ListenableFuture<Either> errorResponseHolder()
{
return Futures.immediateFuture(Either.error(responseHolder));
}
private class TestableKinesisIndexTaskClient extends KinesisIndexTaskClient
{
TestableKinesisIndexTaskClient(

View File

@ -28,12 +28,14 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.StringUtils;
@ -41,6 +43,8 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.Request;
import org.apache.druid.java.util.http.client.response.HttpResponseHandler;
import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHandler;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
@ -217,7 +221,16 @@ public abstract class IndexTaskClient implements AutoCloseable
boolean retry
) throws IOException, ChannelException, NoTaskLocationException
{
return submitRequest(taskId, null, method, encodedPathSuffix, encodedQueryString, new byte[0], retry);
return submitRequest(
taskId,
null,
method,
encodedPathSuffix,
encodedQueryString,
new byte[0],
new StringFullResponseHandler(StandardCharsets.UTF_8),
retry
);
}
/**
@ -239,6 +252,7 @@ public abstract class IndexTaskClient implements AutoCloseable
encodedPathSuffix,
encodedQueryString,
content,
new StringFullResponseHandler(StandardCharsets.UTF_8),
retry
);
}
@ -262,6 +276,7 @@ public abstract class IndexTaskClient implements AutoCloseable
encodedPathSuffix,
encodedQueryString,
content,
new StringFullResponseHandler(StandardCharsets.UTF_8),
retry
);
}
@ -293,13 +308,14 @@ public abstract class IndexTaskClient implements AutoCloseable
/**
* Sends an HTTP request to the task of the specified {@code taskId} and returns a response if it succeeded.
*/
private StringFullResponseHolder submitRequest(
protected <IntermediateType, FinalType> FinalType submitRequest(
String taskId,
@Nullable String mediaType, // nullable if content is empty
HttpMethod method,
String encodedPathSuffix,
@Nullable String encodedQueryString,
byte[] content,
HttpResponseHandler<IntermediateType, FinalType> responseHandler,
boolean retry
) throws IOException, ChannelException, NoTaskLocationException
{
@ -333,21 +349,38 @@ public abstract class IndexTaskClient implements AutoCloseable
content
);
StringFullResponseHolder response = null;
Either<StringFullResponseHolder, FinalType> response = null;
try {
// Netty throws some annoying exceptions if a connection can't be opened, which happens relatively frequently
// for tasks that happen to still be starting up, so test the connection first to keep the logs clean.
checkConnection(request.getUrl().getHost(), request.getUrl().getPort());
response = submitRequest(request);
response = submitRequest(request, responseHandler);
int responseCode = response.getStatus().getCode();
if (responseCode / 100 == 2) {
return response;
} else if (responseCode == 400) { // don't bother retrying if it's a bad request
throw new IAE("Received 400 Bad Request with body: %s", response.getContent());
if (response.isValue()) {
return response.valueOrThrow();
} else {
throw new IOE("Received status [%d] and content [%s]", responseCode, response.getContent());
final StringBuilder exceptionMessage = new StringBuilder();
final HttpResponseStatus httpResponseStatus = response.error().getStatus();
final String httpResponseContent = response.error().getContent();
exceptionMessage.append("Received server error with status [").append(httpResponseStatus).append("]");
if (!Strings.isNullOrEmpty(httpResponseContent)) {
final String choppedMessage =
StringUtils.chop(
StringUtils.nullToEmptyNonDruidDataString(httpResponseContent),
1000
);
exceptionMessage.append("; first 1KB of body: ").append(choppedMessage);
}
if (httpResponseStatus.getCode() == 400) {
// don't bother retrying if it's a bad request
throw new IAE(exceptionMessage.toString());
} else {
throw new IOE(exceptionMessage.toString());
}
}
}
catch (IOException | ChannelException e) {
@ -360,9 +393,10 @@ public abstract class IndexTaskClient implements AutoCloseable
// eventually be updated.
final Duration delay;
if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
if (response != null && !response.isValue()
&& response.error().getStatus().equals(HttpResponseStatus.NOT_FOUND)) {
String headerId = StringUtils.urlDecode(
response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER)
response.error().getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER)
);
if (headerId != null && !headerId.equals(taskId)) {
log.warn(
@ -381,22 +415,24 @@ public abstract class IndexTaskClient implements AutoCloseable
final String urlForLog = request.getUrl().toString();
if (!retry) {
// if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was
// for informational purposes only) so don't log a scary stack trace
log.info("submitRequest failed for [%s], with message [%s]", urlForLog, e.getMessage());
// for informational purposes only); log at INFO instead of WARN.
log.noStackTrace().info(e, "submitRequest failed for [%s]", urlForLog);
throw e;
} else if (delay == null) {
log.warn(e, "Retries exhausted for [%s], last exception:", urlForLog);
// When retrying, log the final failure at WARN level, since it is likely to be bad news.
log.warn(e, "submitRequest failed for [%s]", urlForLog);
throw e;
} else {
try {
final long sleepTime = delay.getMillis();
log.warn(
"Bad response HTTP [%s] from [%s]; will try again in [%s] (body/exception: [%s])",
(response != null ? response.getStatus().getCode() : "no response"),
// When retrying, log non-final failures at INFO level.
log.noStackTrace().info(
e,
"submitRequest failed for [%s]; will try again in [%s]",
urlForLog,
new Duration(sleepTime).toString(),
(response != null ? response.getContent() : e.getMessage())
new Duration(sleepTime).toString()
);
Thread.sleep(sleepTime);
}
catch (InterruptedException e2) {
@ -421,11 +457,17 @@ public abstract class IndexTaskClient implements AutoCloseable
}
}
private StringFullResponseHolder submitRequest(Request request) throws IOException, ChannelException
private <IntermediateType, FinalType> Either<StringFullResponseHolder, FinalType> submitRequest(
Request request,
HttpResponseHandler<IntermediateType, FinalType> responseHandler
) throws IOException, ChannelException
{
final ObjectOrErrorResponseHandler<IntermediateType, FinalType> wrappedHandler =
new ObjectOrErrorResponseHandler<>(responseHandler);
try {
log.debug("HTTP %s: %s", request.getMethod().getName(), request.getUrl().toString());
return httpClient.go(request, new StringFullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get();
return httpClient.go(request, wrappedHandler, httpTimeout).get();
}
catch (Exception e) {
throw throwIfPossible(e);

View File

@ -25,8 +25,10 @@ import com.google.common.util.concurrent.Futures;
import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.Either;
import org.apache.druid.java.util.http.client.HttpClient;
import org.apache.druid.java.util.http.client.response.StringFullResponseHolder;
import org.apache.druid.segment.realtime.firehose.ChatHandlerResource;
import org.easymock.EasyMock;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
@ -82,10 +84,11 @@ public class IndexTaskClientTest
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
new StringFullResponseHolder(
HttpResponseStatus.OK,
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
StandardCharsets.UTF_8
Either.value(
new StringFullResponseHolder(
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
StandardCharsets.UTF_8
)
)
)
)
@ -101,9 +104,201 @@ public class IndexTaskClientTest
);
Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
}
}
}
private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, Function<String, TaskLocation> taskLocationProvider)
@Test
public void retryOnServerError() throws IOException
{
final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.error(
new StringFullResponseHolder(
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR),
StandardCharsets.UTF_8
).addChunk("Error")
)
)
)
.times(2);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.value(
new StringFullResponseHolder(
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK),
StandardCharsets.UTF_8
)
)
)
)
.once();
EasyMock.replay(httpClient);
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
final StringFullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent(
"taskId",
HttpMethod.GET,
"test",
null,
true
);
Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
}
EasyMock.verify(httpClient);
}
@Test
public void retryIfNotFoundWithIncorrectTaskId() throws IOException
{
final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
final String taskId = "taskId";
final String incorrectTaskId = "incorrectTaskId";
final DefaultHttpResponse incorrectResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
incorrectResponse.headers().add(ChatHandlerResource.TASK_ID_HEADER, incorrectTaskId);
final DefaultHttpResponse correctResponse =
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
correctResponse.headers().add(ChatHandlerResource.TASK_ID_HEADER, taskId);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.error(
new StringFullResponseHolder(
incorrectResponse,
StandardCharsets.UTF_8
)
)
)
)
.times(2);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.value(
new StringFullResponseHolder(
correctResponse,
StandardCharsets.UTF_8
)
)
)
)
.once();
EasyMock.replay(httpClient);
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
final StringFullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent(
taskId,
HttpMethod.GET,
"test",
null,
true
);
Assert.assertEquals(HttpResponseStatus.OK, response.getStatus());
}
EasyMock.verify(httpClient);
}
@Test
public void dontRetryOnBadRequest()
{
final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.error(
new StringFullResponseHolder(
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST),
StandardCharsets.UTF_8
).addChunk("Error")
)
)
)
.times(1);
EasyMock.replay(httpClient);
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
final IllegalArgumentException e = Assert.assertThrows(
IllegalArgumentException.class,
() -> indexTaskClient.submitRequestWithEmptyContent("taskId", HttpMethod.GET, "test", null, true)
);
Assert.assertEquals(
"Received server error with status [400 Bad Request]; first 1KB of body: Error",
e.getMessage()
);
}
EasyMock.verify(httpClient);
}
@Test
public void dontRetryIfRetryFalse()
{
final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.error(
new StringFullResponseHolder(
new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR),
StandardCharsets.UTF_8
).addChunk("Error")
)
)
)
.times(1);
EasyMock.replay(httpClient);
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
final IOException e = Assert.assertThrows(
IOException.class,
() -> indexTaskClient.submitRequestWithEmptyContent("taskId", HttpMethod.GET, "test", null, false)
);
Assert.assertEquals(
"Received server error with status [500 Internal Server Error]; first 1KB of body: Error",
e.getMessage()
);
}
EasyMock.verify(httpClient);
}
@Test
public void dontRetryIfNotFoundWithCorrectTaskId()
{
final String taskId = "taskId";
final HttpClient httpClient = EasyMock.createMock(HttpClient.class);
final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND);
response.headers().add(ChatHandlerResource.TASK_ID_HEADER, taskId);
EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject()))
.andReturn(
Futures.immediateFuture(
Either.error(
new StringFullResponseHolder(response, StandardCharsets.UTF_8).addChunk("Error")
)
)
)
.times(1);
EasyMock.replay(httpClient);
try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) {
final IOException e = Assert.assertThrows(
IOException.class,
() -> indexTaskClient.submitRequestWithEmptyContent(taskId, HttpMethod.GET, "test", null, false)
);
Assert.assertEquals(
"Received server error with status [404 Not Found]; first 1KB of body: Error",
e.getMessage()
);
}
EasyMock.verify(httpClient);
}
private IndexTaskClient buildIndexTaskClient(
HttpClient httpClient,
Function<String, TaskLocation> taskLocationProvider
)
{
final TaskInfoProvider taskInfoProvider = new TaskInfoProvider()
{

View File

@ -85,10 +85,10 @@ public class RemoteTaskActionClientTest
responseBody.put("result", expectedLocks);
String strResult = objectMapper.writeValueAsString(responseBody);
final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
response,
StandardCharsets.UTF_8
).addChunk(strResult);
@ -120,10 +120,10 @@ public class RemoteTaskActionClientTest
// return status code 200 and a list with size equals 1
final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.BAD_REQUEST,
response,
StandardCharsets.UTF_8
).addChunk("testSubmitWithIllegalStatusCode");

View File

@ -89,11 +89,11 @@ public class HttpIndexingServiceClientTest
};
HttpResponse response = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
response,
StandardCharsets.UTF_8
).addChunk(jsonMapper.writeValueAsString(samplerResponse));
@ -142,11 +142,11 @@ public class HttpIndexingServiceClientTest
}
};
HttpResponse response = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.INTERNAL_SERVER_ERROR,
response,
StandardCharsets.UTF_8
).addChunk("");
@ -170,13 +170,13 @@ public class HttpIndexingServiceClientTest
{
String taskId = "testTaskId";
HttpResponse response = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
Map<String, Object> dummyResponse = ImmutableMap.of("test", "value");
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
response,
StandardCharsets.UTF_8
).addChunk(jsonMapper.writeValueAsString(dummyResponse));
@ -209,11 +209,11 @@ public class HttpIndexingServiceClientTest
ChannelBuffer buf = ChannelBuffers.buffer(errorMsg.length());
buf.writeBytes(errorMsg.getBytes(StandardCharsets.UTF_8));
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).anyTimes();
EasyMock.expect(response.getContent()).andReturn(buf);
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.NOT_FOUND,
response,
StandardCharsets.UTF_8
).addChunk("");
@ -241,11 +241,11 @@ public class HttpIndexingServiceClientTest
{
String taskId = "testTaskId";
HttpResponse response = EasyMock.createMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
response,
StandardCharsets.UTF_8
).addChunk("");

View File

@ -86,9 +86,10 @@ public class LookupReferencesManagerTest
);
}
private static HttpResponse newEmptyResponse()
private static HttpResponse newEmptyResponse(final HttpResponseStatus status)
{
final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class);
EasyMock.expect(response.getStatus()).andReturn(status).anyTimes();
EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0));
EasyMock.replay(response);
return response;
@ -114,8 +115,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -178,8 +178,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -219,8 +218,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -253,8 +251,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -284,8 +281,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -317,8 +313,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -354,8 +349,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -385,8 +379,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -417,8 +410,7 @@ public class LookupReferencesManagerTest
druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true")
).andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -479,8 +471,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -524,8 +515,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -613,8 +603,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);
@ -725,8 +714,7 @@ public class LookupReferencesManagerTest
))
.andReturn(request);
StringFullResponseHolder responseHolder = new StringFullResponseHolder(
HttpResponseStatus.OK,
newEmptyResponse(),
newEmptyResponse(HttpResponseStatus.OK),
StandardCharsets.UTF_8
).addChunk(strResult);
EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder);

View File

@ -1726,7 +1726,6 @@ public class CompactSegmentsTest
{
final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
final StringFullResponseHolder holder = new StringFullResponseHolder(
HttpResponseStatus.OK,
httpResponse,
StandardCharsets.UTF_8
);

View File

@ -1108,7 +1108,7 @@ public class SystemSchemaTest extends CalciteTestBase
HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp);
EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes();
@ -1282,7 +1282,7 @@ public class SystemSchemaTest extends CalciteTestBase
.anyTimes();
HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp);
InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp);
EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once();
@ -1397,8 +1397,7 @@ public class SystemSchemaTest extends CalciteTestBase
String json
)
{
InputStreamFullResponseHolder responseHolder =
new InputStreamFullResponseHolder(httpResponse.getStatus(), httpResponse);
InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResponse);
byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8);
responseHolder.addChunk(bytesToWrite);