From e0e05aad995d82d40fb54b960120d6c7d1050932 Mon Sep 17 00:00:00 2001 From: Gian Merlino Date: Fri, 3 Dec 2021 09:14:32 -0800 Subject: [PATCH] Enhancements to IndexTaskClient. (#12011) * Enhancements to IndexTaskClient. 1) Ability to use handlers other than StringFullResponseHandler. This functionality is not used in production code yet, but is useful because it will allow tasks to communicate with each other in non-string-based formats and in streaming fashion. In the future, we'll be able to use this to make task-to-task communication more efficient. 2) Truncate server errors at 1KB, so long errors do not pollute logs. 3) Change error log level for retryable errors from WARN to INFO. (The final error is still WARN.) 4) Harmonize log and exception messages to have a more consistent format. * Additional tests and improvements. --- .../apache/druid/java/util/common/Either.java | 146 ++++++++++++ .../response/BytesFullResponseHandler.java | 2 +- .../response/BytesFullResponseHolder.java | 5 +- .../client/response/FullResponseHolder.java | 6 +- .../InputStreamFullResponseHandler.java | 2 +- .../InputStreamFullResponseHolder.java | 8 +- .../ObjectOrErrorResponseHandler.java | 180 +++++++++++++++ .../response/StringFullResponseHandler.java | 2 +- .../response/StringFullResponseHolder.java | 4 +- .../org/apache/druid/common/EitherTest.java | 111 ++++++++++ .../ObjectOrErrorResponseHandlerTest.java | 121 ++++++++++ .../kafka/KafkaIndexTaskClientTest.java | 165 +++++++------- .../kinesis/KinesisIndexTaskClientTest.java | 165 +++++++------- .../indexing/common/IndexTaskClient.java | 86 ++++++-- .../indexing/common/IndexTaskClientTest.java | 207 +++++++++++++++++- .../actions/RemoteTaskActionClientTest.java | 4 +- .../HttpIndexingServiceClientTest.java | 10 +- .../lookup/LookupReferencesManagerTest.java | 42 ++-- .../coordinator/duty/CompactSegmentsTest.java | 1 - .../sql/calcite/schema/SystemSchemaTest.java | 7 +- 20 files changed, 1030 insertions(+), 244 deletions(-) create mode 100644 core/src/main/java/org/apache/druid/java/util/common/Either.java create mode 100644 core/src/main/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandler.java create mode 100644 core/src/test/java/org/apache/druid/common/EitherTest.java create mode 100644 core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java diff --git a/core/src/main/java/org/apache/druid/java/util/common/Either.java b/core/src/main/java/org/apache/druid/java/util/common/Either.java new file mode 100644 index 00000000000..85fc6252972 --- /dev/null +++ b/core/src/main/java/org/apache/druid/java/util/common/Either.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.common; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; + +import javax.annotation.Nullable; +import java.util.Objects; +import java.util.function.Function; + +/** + * Encapsulates either an "error" or a "value". + * + * Similar to the Either class in Scala or Haskell, except the possibilities are named "error" and "value" instead of + * "left" and "right". + */ +public class Either +{ + private final L error; + private final R value; + + private Either(L error, R value) + { + this.error = error; + this.value = value; + } + + public static Either error(final L error) + { + return new Either<>(Preconditions.checkNotNull(error, "error"), null); + } + + public static Either value(@Nullable final R value) + { + return new Either<>(null, value); + } + + public boolean isValue() + { + return error == null; + } + + public boolean isError() + { + return error != null; + } + + public L error() + { + if (isError()) { + return error; + } else { + throw new IllegalStateException("Not an error; check isError first"); + } + } + + /** + * If this Either represents a value, returns it. If this Either represents an error, throw an error. + * + * If the error is a {@link RuntimeException} or {@link Error}, it is thrown directly. If it is some other + * {@link Throwable}, it is wrapped in a RuntimeException and thrown. If it is not a throwable at all, a generic + * error is thrown containing the string representation of the error object. + * + * If you want to be able to retrieve the error as-is, use {@link #isError()} and {@link #error()} instead. + */ + @Nullable + public R valueOrThrow() + { + if (isValue()) { + return value; + } else if (error instanceof Throwable) { + Throwables.propagateIfPossible((Throwable) error); + throw new RuntimeException((Throwable) error); + } else { + throw new RuntimeException(error.toString()); + } + } + + /** + * Applies a function to this value, if present. + * + * If the mapping function throws an exception, it is thrown by this method instead of being packed up into + * the returned Either. + * + * If this Either represents an error, the mapping function is not applied. + * + * @throws NullPointerException if the mapping function returns null + */ + public Either map(final Function fn) + { + if (isValue()) { + return Either.value(fn.apply(value)); + } else { + // Safe because the value is never going to be returned. + //noinspection unchecked + return (Either) this; + } + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Either either = (Either) o; + return Objects.equals(error, either.error) && Objects.equals(value, either.value); + } + + @Override + public int hashCode() + { + return Objects.hash(error, value); + } + + @Override + public String toString() + { + if (isValue()) { + return "Value[" + value + "]"; + } else { + return "Error[" + error + "]"; + } + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java index 2e9c404e29d..665f496c72f 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHandler.java @@ -32,7 +32,7 @@ public class BytesFullResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { - BytesFullResponseHolder holder = new BytesFullResponseHolder(response.getStatus(), response); + BytesFullResponseHolder holder = new BytesFullResponseHolder(response); holder.addChunk(getContentBytes(response.getContent())); diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java index 8f6f233ba9a..b4280310c02 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/BytesFullResponseHolder.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.http.client.response; import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -30,9 +29,9 @@ public class BytesFullResponseHolder extends FullResponseHolder { private final List chunks; - public BytesFullResponseHolder(HttpResponseStatus status, HttpResponse response) + public BytesFullResponseHolder(HttpResponse response) { - super(status, response); + super(response); this.chunks = new ArrayList<>(); } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java index fbbab874ffc..27c2ed25123 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/FullResponseHolder.java @@ -29,18 +29,16 @@ import org.jboss.netty.handler.codec.http.HttpResponseStatus; */ public abstract class FullResponseHolder { - private final HttpResponseStatus status; private final HttpResponse response; - public FullResponseHolder(HttpResponseStatus status, HttpResponse response) + public FullResponseHolder(HttpResponse response) { - this.status = status; this.response = response; } public HttpResponseStatus getStatus() { - return status; + return response.getStatus(); } public HttpResponse getResponse() diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java index 01a69a80f6c..71f1a852181 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHandler.java @@ -32,7 +32,7 @@ public class InputStreamFullResponseHandler implements HttpResponseHandler handleResponse(HttpResponse response, TrafficCop trafficCop) { - InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response.getStatus(), response); + InputStreamFullResponseHolder holder = new InputStreamFullResponseHolder(response); holder.addChunk(getContentBytes(response.getContent())); return ClientResponse.finished(holder); } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java index fbabe63a754..266090868a2 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/InputStreamFullResponseHolder.java @@ -21,7 +21,6 @@ package org.apache.druid.java.util.http.client.response; import org.apache.druid.java.util.http.client.io.AppendableByteArrayInputStream; import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.io.InputStream; @@ -29,12 +28,9 @@ public class InputStreamFullResponseHolder extends FullResponseHolder + implements HttpResponseHandler, Either> +{ + private final HttpResponseHandler okHandler; + private final StringFullResponseHandler errorHandler; + + public ObjectOrErrorResponseHandler(HttpResponseHandler okHandler) + { + this.okHandler = okHandler; + this.errorHandler = new StringFullResponseHandler(StandardCharsets.UTF_8); + } + + @Override + public ClientResponse> handleResponse( + final HttpResponse response, + final TrafficCop trafficCop + ) + { + if (response.getStatus().getCode() / 100 == 2) { + final ClientResponse delegateResponse = okHandler.handleResponse(response, trafficCop); + + return new ClientResponse<>( + delegateResponse.isFinished(), + delegateResponse.isContinueReading(), + Either.value(delegateResponse.getObj()) + ); + } else { + final ClientResponse delegateResponse = + errorHandler.handleResponse(response, trafficCop); + + return new ClientResponse<>( + delegateResponse.isFinished(), + delegateResponse.isContinueReading(), + Either.error(delegateResponse.getObj()) + ); + } + } + + @Override + public ClientResponse> handleChunk( + final ClientResponse> clientResponse, + final HttpChunk chunk, + final long chunkNum + ) + { + final Either prevHolder = clientResponse.getObj(); + + if (prevHolder.isValue()) { + final ClientResponse delegateResponse = okHandler.handleChunk( + new ClientResponse<>( + clientResponse.isFinished(), + clientResponse.isContinueReading(), + prevHolder.valueOrThrow() + ), + chunk, + chunkNum + ); + + return new ClientResponse<>( + delegateResponse.isFinished(), + delegateResponse.isContinueReading(), + Either.value(delegateResponse.getObj()) + ); + } else { + final ClientResponse delegateResponse = errorHandler.handleChunk( + new ClientResponse<>( + clientResponse.isFinished(), + clientResponse.isContinueReading(), + prevHolder.error() + ), + chunk, + chunkNum + ); + + return new ClientResponse<>( + delegateResponse.isFinished(), + delegateResponse.isContinueReading(), + Either.error(delegateResponse.getObj()) + ); + } + } + + @Override + public ClientResponse> done( + final ClientResponse> clientResponse + ) + { + final Either prevHolder = clientResponse.getObj(); + + if (prevHolder.isValue()) { + final ClientResponse delegateResponse = okHandler.done( + new ClientResponse<>( + clientResponse.isFinished(), + clientResponse.isContinueReading(), + prevHolder.valueOrThrow() + ) + ); + + return new ClientResponse<>( + delegateResponse.isFinished(), + delegateResponse.isContinueReading(), + Either.value(delegateResponse.getObj()) + ); + } else { + final ClientResponse delegateResponse = errorHandler.done( + new ClientResponse<>( + clientResponse.isFinished(), + clientResponse.isContinueReading(), + prevHolder.error() + ) + ); + + return new ClientResponse<>( + delegateResponse.isFinished(), + delegateResponse.isContinueReading(), + Either.error(delegateResponse.getObj()) + ); + } + } + + @Override + public void exceptionCaught( + final ClientResponse> clientResponse, + final Throwable e + ) + { + final Either prevHolder = clientResponse.getObj(); + + if (prevHolder.isValue()) { + okHandler.exceptionCaught( + new ClientResponse<>( + clientResponse.isFinished(), + clientResponse.isContinueReading(), + prevHolder.valueOrThrow() + ), + e + ); + } else { + errorHandler.exceptionCaught( + new ClientResponse<>( + clientResponse.isFinished(), + clientResponse.isContinueReading(), + prevHolder.error() + ), + e + ); + } + } +} diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java index f4176e83236..57af8e11796 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHandler.java @@ -41,7 +41,7 @@ public class StringFullResponseHandler @Override public ClientResponse handleResponse(HttpResponse response, TrafficCop trafficCop) { - return ClientResponse.unfinished(new StringFullResponseHolder(response.getStatus(), response, charset)); + return ClientResponse.unfinished(new StringFullResponseHolder(response, charset)); } @Override diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java index 3fe2e081b5e..457c6e2f3c2 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/response/StringFullResponseHolder.java @@ -20,7 +20,6 @@ package org.apache.druid.java.util.http.client.response; import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.HttpResponseStatus; import java.nio.charset.Charset; @@ -29,12 +28,11 @@ public class StringFullResponseHolder extends FullResponseHolder private final StringBuilder builder; public StringFullResponseHolder( - HttpResponseStatus status, HttpResponse response, Charset charset ) { - super(status, response); + super(response); this.builder = new StringBuilder(response.getContent().toString(charset)); } diff --git a/core/src/test/java/org/apache/druid/common/EitherTest.java b/core/src/test/java/org/apache/druid/common/EitherTest.java new file mode 100644 index 00000000000..a91908e5f95 --- /dev/null +++ b/core/src/test/java/org/apache/druid/common/EitherTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.common; + +import nl.jqno.equalsverifier.EqualsVerifier; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.StringUtils; +import org.hamcrest.CoreMatchers; +import org.hamcrest.MatcherAssert; +import org.junit.Assert; +import org.junit.Test; + +public class EitherTest +{ + @Test + public void testValueString() + { + final Either either = Either.value("yay"); + + Assert.assertFalse(either.isError()); + Assert.assertTrue(either.isValue()); + Assert.assertEquals("yay", either.valueOrThrow()); + + final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, either::error); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.startsWith("Not an error")); + + // Test toString. + Assert.assertEquals("Value[yay]", either.toString()); + + // Test map. + Assert.assertEquals(Either.value("YAY"), either.map(StringUtils::toUpperCase)); + } + + @Test + public void testValueNull() + { + final Either either = Either.value(null); + + Assert.assertFalse(either.isError()); + Assert.assertTrue(either.isValue()); + Assert.assertNull(either.valueOrThrow()); + + final IllegalStateException e = Assert.assertThrows(IllegalStateException.class, either::error); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.startsWith("Not an error")); + + // Test toString. + Assert.assertEquals("Value[null]", either.toString()); + + // Test map. + Assert.assertEquals(Either.value("nullxyz"), either.map(s -> s + "xyz")); + } + + @Test + public void testErrorString() + { + final Either either = Either.error("oh no"); + + Assert.assertTrue(either.isError()); + Assert.assertFalse(either.isValue()); + Assert.assertEquals("oh no", either.error()); + + final RuntimeException e = Assert.assertThrows(RuntimeException.class, either::valueOrThrow); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no")); + + // Test toString. + Assert.assertEquals("Error[oh no]", either.toString()); + + // Test map. + Assert.assertEquals(either, either.map(o -> "this does nothing because the Either is an error")); + } + + @Test + public void testErrorThrowable() + { + final Either either = Either.error(new AssertionError("oh no")); + + Assert.assertTrue(either.isError()); + Assert.assertFalse(either.isValue()); + MatcherAssert.assertThat(either.error(), CoreMatchers.instanceOf(AssertionError.class)); + MatcherAssert.assertThat(either.error().getMessage(), CoreMatchers.equalTo("oh no")); + + final AssertionError e = Assert.assertThrows(AssertionError.class, either::valueOrThrow); + MatcherAssert.assertThat(e.getMessage(), CoreMatchers.equalTo("oh no")); + + // Test toString. + Assert.assertEquals("Error[java.lang.AssertionError: oh no]", either.toString()); + } + + @Test + public void testEqualsAndHashCode() + { + EqualsVerifier.forClass(Either.class).usingGetClass().verify(); + } +} diff --git a/core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java b/core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java new file mode 100644 index 00000000000..4e2247cf935 --- /dev/null +++ b/core/src/test/java/org/apache/druid/java/util/http/client/response/ObjectOrErrorResponseHandlerTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.java.util.http.client.response; + +import org.apache.commons.io.IOUtils; +import org.apache.druid.java.util.common.Either; +import org.apache.druid.java.util.common.StringUtils; +import org.jboss.netty.buffer.BigEndianHeapChannelBuffer; +import org.jboss.netty.handler.codec.http.DefaultHttpChunk; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.handler.codec.http.HttpVersion; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; + +public class ObjectOrErrorResponseHandlerTest +{ + @Test + public void testOk() throws Exception + { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.setChunked(false); + response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING))); + + final ObjectOrErrorResponseHandler responseHandler = + new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler()); + + ClientResponse> clientResp = + responseHandler.handleResponse(response, null); + + DefaultHttpChunk chunk = + new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING))); + clientResp = responseHandler.handleChunk(clientResp, chunk, 0); + clientResp = responseHandler.done(clientResp); + + Assert.assertTrue(clientResp.isFinished()); + Assert.assertEquals( + "abcdefg", + IOUtils.toString(clientResp.getObj().valueOrThrow().getContent(), StandardCharsets.UTF_8) + ); + } + + @Test + public void testExceptionAfterOk() throws Exception + { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + response.setChunked(false); + response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING))); + + final ObjectOrErrorResponseHandler responseHandler = + new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler()); + + ClientResponse> clientResp = + responseHandler.handleResponse(response, null); + + Exception ex = new RuntimeException("dummy!"); + responseHandler.exceptionCaught(clientResp, ex); + + // Exception after HTTP OK still is handled by the "OK handler" + // (The handler that starts the request gets to finish it.) + Assert.assertTrue(clientResp.isFinished()); + Assert.assertTrue(clientResp.getObj().isValue()); + + final InputStream responseStream = clientResp.getObj().valueOrThrow().getContent(); + final IOException e = Assert.assertThrows( + IOException.class, + () -> IOUtils.toString(responseStream, StandardCharsets.UTF_8) + ); + Assert.assertEquals("java.lang.RuntimeException: dummy!", e.getMessage()); + } + + @Test + public void testServerError() throws Exception + { + HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR); + response.setChunked(false); + response.setContent(new BigEndianHeapChannelBuffer("abcd".getBytes(StringUtils.UTF8_STRING))); + + final ObjectOrErrorResponseHandler responseHandler = + new ObjectOrErrorResponseHandler<>(new InputStreamFullResponseHandler()); + + ClientResponse> clientResp = + responseHandler.handleResponse(response, null); + + DefaultHttpChunk chunk = + new DefaultHttpChunk(new BigEndianHeapChannelBuffer("efg".getBytes(StringUtils.UTF8_STRING))); + clientResp = responseHandler.handleChunk(clientResp, chunk, 0); + clientResp = responseHandler.done(clientResp); + + // 5xx HTTP code is handled by the error handler. + Assert.assertTrue(clientResp.isFinished()); + Assert.assertTrue(clientResp.getObj().isError()); + Assert.assertEquals( + HttpResponseStatus.INTERNAL_SERVER_ERROR.getCode(), + clientResp.getObj().error().getResponse().getStatus().getCode() + ); + Assert.assertEquals("abcdefg", clientResp.getObj().error().getContent()); + } +} diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java index db26a9bd5ac..3646b4ad1ec 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskClientTest.java @@ -33,16 +33,17 @@ import org.apache.druid.indexing.common.TaskInfoProvider; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.hamcrest.CoreMatchers; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -182,18 +183,19 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport public void testInternalServerError() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []"); + expectedException.expectCause(CoreMatchers.instanceOf(IOException.class)); + expectedException.expectMessage("Received server error with status [500 Internal Server Error]"); EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); EasyMock.expect(responseHolder.getContent()).andReturn(""); EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) ).andReturn( - Futures.immediateFuture(responseHolder) + errorResponseHolder() ); replayAll(); @@ -204,19 +206,19 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport @Test public void testBadRequest() { - expectedException.expect(IAE.class); - expectedException.expectMessage("Received 400 Bad Request with body:"); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Received server error with status [400 Bad Request]"); EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2); EasyMock.expect(responseHolder.getContent()).andReturn(""); EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) ).andReturn( - Futures.immediateFuture(responseHolder) + errorResponseHolder() ); replayAll(); @@ -227,22 +229,22 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport @Test public void testTaskLocationMismatch() { - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) - .andReturn(HttpResponseStatus.OK); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2); EasyMock.expect(responseHolder.getResponse()).andReturn(response); - EasyMock.expect(responseHolder.getContent()).andReturn("").times(2) - .andReturn("{}"); + EasyMock.expect(responseHolder.getContent()).andReturn("").andReturn("{}"); EasyMock.expect(response.headers()).andReturn(headers); EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id"); EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) ).andReturn( - Futures.immediateFuture(responseHolder) - ).times(2); + errorResponseHolder() + ).andReturn( + okResponseHolder() + ); replayAll(); Map results = client.getCurrentOffsets(TEST_ID, true); @@ -255,14 +257,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport public void testGetCurrentOffsets() throws Exception { Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}"); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -288,9 +289,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport client = new TestableKafkaIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 3); Capture captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6) - .andReturn(HttpResponseStatus.OK).times(1); - EasyMock.expect(responseHolder.getContent()).andReturn("").times(4) + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(4); + EasyMock.expect(responseHolder.getContent()).andReturn("").times(2) .andReturn("{\"0\":1, \"1\":10}"); EasyMock.expect(responseHolder.getResponse()).andReturn(response).times(2); EasyMock.expect(response.headers()).andReturn(headers).times(2); @@ -298,11 +298,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) - ).times(3); + errorResponseHolder() + ).times(2).andReturn( + okResponseHolder() + ); replayAll(); @@ -328,7 +330,8 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport public void testGetCurrentOffsetsWithExhaustedRetries() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [404]"); + expectedException.expectCause(CoreMatchers.instanceOf(IOException.class)); + expectedException.expectMessage("Received server error with status [404 Not Found]"); client = new TestableKafkaIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 2); @@ -341,10 +344,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) - ).andReturn(Futures.immediateFuture(responseHolder)).anyTimes(); + ).andReturn(errorResponseHolder()).anyTimes(); replayAll(); client.getCurrentOffsets(TEST_ID, true); @@ -355,14 +358,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport public void testGetEndOffsets() throws Exception { Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}"); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -389,19 +391,16 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport DateTime now = DateTimes.nowUtc(); Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) - .andReturn(HttpResponseStatus.OK); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2); EasyMock.expect(responseHolder.getResponse()).andReturn(response); EasyMock.expect(response.headers()).andReturn(headers); EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn(null); EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) - )).andReturn( - Futures.immediateFuture(responseHolder) - ).times(2); + )).andReturn(errorResponseHolder()).once().andReturn(okResponseHolder()); replayAll(); DateTime results = client.getStartTime(TEST_ID); @@ -424,14 +423,13 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Status status = Status.READING; Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn(StringUtils.format("\"%s\"", status.toString())).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -453,14 +451,14 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport public void testPause() throws Exception { Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -487,30 +485,29 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport Capture captured2 = Capture.newInstance(); Capture captured3 = Capture.newInstance(); // one time in IndexTaskClient.submitRequest() and another in KafkaIndexTaskClient.pause() - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2) - .andReturn(HttpResponseStatus.OK).anyTimes(); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED); EasyMock.expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2) .andReturn("{\"0\":1, \"1\":10}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); EasyMock.expect(httpClient.go( EasyMock.capture(captured2), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); EasyMock.expect(httpClient.go( EasyMock.capture(captured3), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -552,10 +549,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -580,10 +577,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -609,10 +606,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -636,10 +633,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -662,10 +659,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -689,10 +686,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -725,10 +722,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -762,10 +759,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -799,10 +796,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -837,10 +834,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -874,10 +871,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -911,10 +908,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -948,10 +945,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -991,10 +988,10 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -1029,6 +1026,16 @@ public class KafkaIndexTaskClientTest extends EasyMockSupport } } + private ListenableFuture okResponseHolder() + { + return Futures.immediateFuture(Either.value(responseHolder)); + } + + private ListenableFuture errorResponseHolder() + { + return Futures.immediateFuture(Either.error(responseHolder)); + } + private class TestableKafkaIndexTaskClient extends KafkaIndexTaskClient { TestableKafkaIndexTaskClient( diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java index 7be086ecce2..1463b7a7e32 100644 --- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java +++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskClientTest.java @@ -33,16 +33,17 @@ import org.apache.druid.indexing.common.TaskInfoProvider; import org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.Status; import org.apache.druid.jackson.DefaultObjectMapper; import org.apache.druid.java.util.common.DateTimes; -import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; -import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; +import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.easymock.Capture; import org.easymock.CaptureType; import org.easymock.EasyMock; import org.easymock.EasyMockSupport; +import org.hamcrest.CoreMatchers; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpResponse; @@ -183,18 +184,19 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport public void testInternalServerError() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [500] and content []"); + expectedException.expectCause(CoreMatchers.instanceOf(IOException.class)); + expectedException.expectMessage("Received server error with status [500 Internal Server Error]"); EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).times(2); EasyMock.expect(responseHolder.getContent()).andReturn(""); EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) ).andReturn( - Futures.immediateFuture(responseHolder) + errorResponseHolder() ); replayAll(); @@ -205,19 +207,19 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport @Test public void testBadRequest() { - expectedException.expect(IAE.class); - expectedException.expectMessage("Received 400 Bad Request with body:"); + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("Received server error with status [400 Bad Request]"); EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).times(2); EasyMock.expect(responseHolder.getContent()).andReturn(""); EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) ).andReturn( - Futures.immediateFuture(responseHolder) + errorResponseHolder() ); replayAll(); @@ -228,22 +230,22 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport @Test public void testTaskLocationMismatch() { - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) - .andReturn(HttpResponseStatus.OK); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2); EasyMock.expect(responseHolder.getResponse()).andReturn(response); - EasyMock.expect(responseHolder.getContent()).andReturn("").times(2) - .andReturn("{}"); + EasyMock.expect(responseHolder.getContent()).andReturn("").andReturn("{}"); EasyMock.expect(response.headers()).andReturn(headers); EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn("a-different-task-id"); EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) ).andReturn( - Futures.immediateFuture(responseHolder) - ).times(2); + errorResponseHolder() + ).andReturn( + okResponseHolder() + ); replayAll(); Map results = client.getCurrentOffsets(TEST_ID, true); @@ -256,14 +258,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport public void testGetCurrentOffsets() throws Exception { Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}"); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -289,9 +290,8 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport client = new TestableKinesisIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 3); Capture captured = Capture.newInstance(CaptureType.ALL); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(6) - .andReturn(HttpResponseStatus.OK).times(1); - EasyMock.expect(responseHolder.getContent()).andReturn("").times(4) + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(4); + EasyMock.expect(responseHolder.getContent()).andReturn("").times(2) .andReturn("{\"0\":1, \"1\":10}"); EasyMock.expect(responseHolder.getResponse()).andReturn(response).times(2); EasyMock.expect(response.headers()).andReturn(headers).times(2); @@ -299,11 +299,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) - ).times(3); + errorResponseHolder() + ).times(2).andReturn( + okResponseHolder() + ); replayAll(); @@ -329,7 +331,8 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport public void testGetCurrentOffsetsWithExhaustedRetries() { expectedException.expect(RuntimeException.class); - expectedException.expectMessage("org.apache.druid.java.util.common.IOE: Received status [404]"); + expectedException.expectCause(CoreMatchers.instanceOf(IOException.class)); + expectedException.expectMessage("Received server error with status [404 Not Found]"); client = new TestableKinesisIndexTaskClient(httpClient, OBJECT_MAPPER, taskInfoProvider, 2); @@ -342,10 +345,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect( httpClient.go( EasyMock.anyObject(Request.class), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) ) - ).andReturn(Futures.immediateFuture(responseHolder)).anyTimes(); + ).andReturn(errorResponseHolder()).anyTimes(); replayAll(); client.getCurrentOffsets(TEST_ID, true); @@ -356,14 +359,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport public void testGetEndOffsets() throws Exception { Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}"); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -390,19 +392,16 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport DateTime now = DateTimes.nowUtc(); Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(3) - .andReturn(HttpResponseStatus.OK); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).times(2); EasyMock.expect(responseHolder.getResponse()).andReturn(response); EasyMock.expect(response.headers()).andReturn(headers); EasyMock.expect(headers.get("X-Druid-Task-Id")).andReturn(null); EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) - )).andReturn( - Futures.immediateFuture(responseHolder) - ).times(2); + )).andReturn(errorResponseHolder()).once().andReturn(okResponseHolder()); replayAll(); DateTime results = client.getStartTime(TEST_ID); @@ -425,14 +424,13 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport Status status = Status.READING; Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn(StringUtils.format("\"%s\"", status.toString())).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -454,14 +452,14 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport public void testPause() throws Exception { Capture captured = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).times(2); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK); EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":1, \"1\":10}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -487,30 +485,29 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport Capture captured = Capture.newInstance(); Capture captured2 = Capture.newInstance(); Capture captured3 = Capture.newInstance(); - EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED).times(2) - .andReturn(HttpResponseStatus.OK).anyTimes(); + EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.ACCEPTED); EasyMock.expect(responseHolder.getContent()).andReturn("\"PAUSED\"").times(2) .andReturn("{\"0\":1, \"1\":10}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); EasyMock.expect(httpClient.go( EasyMock.capture(captured2), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); EasyMock.expect(httpClient.go( EasyMock.capture(captured3), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -552,10 +549,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -580,10 +577,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -609,10 +606,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -636,10 +633,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -662,10 +659,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ); replayAll(); @@ -689,10 +686,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -725,10 +722,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -762,10 +759,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -799,10 +796,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("\"READING\"").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -837,10 +834,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn(String.valueOf(now.getMillis())).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -874,10 +871,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -911,10 +908,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getContent()).andReturn("{\"0\":\"1\"}").anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -948,10 +945,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -991,10 +988,10 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport EasyMock.expect(responseHolder.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(httpClient.go( EasyMock.capture(captured), - EasyMock.anyObject(StringFullResponseHandler.class), + EasyMock.anyObject(ObjectOrErrorResponseHandler.class), EasyMock.eq(TEST_HTTP_TIMEOUT) )).andReturn( - Futures.immediateFuture(responseHolder) + okResponseHolder() ).times(numRequests); replayAll(); @@ -1029,6 +1026,16 @@ public class KinesisIndexTaskClientTest extends EasyMockSupport } } + private ListenableFuture okResponseHolder() + { + return Futures.immediateFuture(Either.value(responseHolder)); + } + + private ListenableFuture errorResponseHolder() + { + return Futures.immediateFuture(Either.error(responseHolder)); + } + private class TestableKinesisIndexTaskClient extends KinesisIndexTaskClient { TestableKinesisIndexTaskClient( diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java index 27d77597d9f..db1692a3e09 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/IndexTaskClient.java @@ -28,12 +28,14 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Strings; import com.google.common.base.Throwables; import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.common.IAE; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; @@ -41,6 +43,8 @@ import org.apache.druid.java.util.common.concurrent.Execs; import org.apache.druid.java.util.emitter.EmittingLogger; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.Request; +import org.apache.druid.java.util.http.client.response.HttpResponseHandler; +import org.apache.druid.java.util.http.client.response.ObjectOrErrorResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHandler; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; import org.apache.druid.segment.realtime.firehose.ChatHandlerResource; @@ -217,7 +221,16 @@ public abstract class IndexTaskClient implements AutoCloseable boolean retry ) throws IOException, ChannelException, NoTaskLocationException { - return submitRequest(taskId, null, method, encodedPathSuffix, encodedQueryString, new byte[0], retry); + return submitRequest( + taskId, + null, + method, + encodedPathSuffix, + encodedQueryString, + new byte[0], + new StringFullResponseHandler(StandardCharsets.UTF_8), + retry + ); } /** @@ -239,6 +252,7 @@ public abstract class IndexTaskClient implements AutoCloseable encodedPathSuffix, encodedQueryString, content, + new StringFullResponseHandler(StandardCharsets.UTF_8), retry ); } @@ -262,6 +276,7 @@ public abstract class IndexTaskClient implements AutoCloseable encodedPathSuffix, encodedQueryString, content, + new StringFullResponseHandler(StandardCharsets.UTF_8), retry ); } @@ -293,13 +308,14 @@ public abstract class IndexTaskClient implements AutoCloseable /** * Sends an HTTP request to the task of the specified {@code taskId} and returns a response if it succeeded. */ - private StringFullResponseHolder submitRequest( + protected FinalType submitRequest( String taskId, @Nullable String mediaType, // nullable if content is empty HttpMethod method, String encodedPathSuffix, @Nullable String encodedQueryString, byte[] content, + HttpResponseHandler responseHandler, boolean retry ) throws IOException, ChannelException, NoTaskLocationException { @@ -333,21 +349,38 @@ public abstract class IndexTaskClient implements AutoCloseable content ); - StringFullResponseHolder response = null; + Either response = null; try { // Netty throws some annoying exceptions if a connection can't be opened, which happens relatively frequently // for tasks that happen to still be starting up, so test the connection first to keep the logs clean. checkConnection(request.getUrl().getHost(), request.getUrl().getPort()); - response = submitRequest(request); + response = submitRequest(request, responseHandler); - int responseCode = response.getStatus().getCode(); - if (responseCode / 100 == 2) { - return response; - } else if (responseCode == 400) { // don't bother retrying if it's a bad request - throw new IAE("Received 400 Bad Request with body: %s", response.getContent()); + if (response.isValue()) { + return response.valueOrThrow(); } else { - throw new IOE("Received status [%d] and content [%s]", responseCode, response.getContent()); + final StringBuilder exceptionMessage = new StringBuilder(); + final HttpResponseStatus httpResponseStatus = response.error().getStatus(); + final String httpResponseContent = response.error().getContent(); + exceptionMessage.append("Received server error with status [").append(httpResponseStatus).append("]"); + + if (!Strings.isNullOrEmpty(httpResponseContent)) { + final String choppedMessage = + StringUtils.chop( + StringUtils.nullToEmptyNonDruidDataString(httpResponseContent), + 1000 + ); + + exceptionMessage.append("; first 1KB of body: ").append(choppedMessage); + } + + if (httpResponseStatus.getCode() == 400) { + // don't bother retrying if it's a bad request + throw new IAE(exceptionMessage.toString()); + } else { + throw new IOE(exceptionMessage.toString()); + } } } catch (IOException | ChannelException e) { @@ -360,9 +393,10 @@ public abstract class IndexTaskClient implements AutoCloseable // eventually be updated. final Duration delay; - if (response != null && response.getStatus().equals(HttpResponseStatus.NOT_FOUND)) { + if (response != null && !response.isValue() + && response.error().getStatus().equals(HttpResponseStatus.NOT_FOUND)) { String headerId = StringUtils.urlDecode( - response.getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER) + response.error().getResponse().headers().get(ChatHandlerResource.TASK_ID_HEADER) ); if (headerId != null && !headerId.equals(taskId)) { log.warn( @@ -381,22 +415,24 @@ public abstract class IndexTaskClient implements AutoCloseable final String urlForLog = request.getUrl().toString(); if (!retry) { // if retry=false, we probably aren't too concerned if the operation doesn't succeed (i.e. the request was - // for informational purposes only) so don't log a scary stack trace - log.info("submitRequest failed for [%s], with message [%s]", urlForLog, e.getMessage()); + // for informational purposes only); log at INFO instead of WARN. + log.noStackTrace().info(e, "submitRequest failed for [%s]", urlForLog); throw e; } else if (delay == null) { - log.warn(e, "Retries exhausted for [%s], last exception:", urlForLog); + // When retrying, log the final failure at WARN level, since it is likely to be bad news. + log.warn(e, "submitRequest failed for [%s]", urlForLog); throw e; } else { try { final long sleepTime = delay.getMillis(); - log.warn( - "Bad response HTTP [%s] from [%s]; will try again in [%s] (body/exception: [%s])", - (response != null ? response.getStatus().getCode() : "no response"), + // When retrying, log non-final failures at INFO level. + log.noStackTrace().info( + e, + "submitRequest failed for [%s]; will try again in [%s]", urlForLog, - new Duration(sleepTime).toString(), - (response != null ? response.getContent() : e.getMessage()) + new Duration(sleepTime).toString() ); + Thread.sleep(sleepTime); } catch (InterruptedException e2) { @@ -421,11 +457,17 @@ public abstract class IndexTaskClient implements AutoCloseable } } - private StringFullResponseHolder submitRequest(Request request) throws IOException, ChannelException + private Either submitRequest( + Request request, + HttpResponseHandler responseHandler + ) throws IOException, ChannelException { + final ObjectOrErrorResponseHandler wrappedHandler = + new ObjectOrErrorResponseHandler<>(responseHandler); + try { log.debug("HTTP %s: %s", request.getMethod().getName(), request.getUrl().toString()); - return httpClient.go(request, new StringFullResponseHandler(StandardCharsets.UTF_8), httpTimeout).get(); + return httpClient.go(request, wrappedHandler, httpTimeout).get(); } catch (Exception e) { throw throwIfPossible(e); diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java index b41ad947926..128fdb3a69b 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/IndexTaskClientTest.java @@ -25,8 +25,10 @@ import com.google.common.util.concurrent.Futures; import org.apache.druid.indexer.TaskLocation; import org.apache.druid.indexer.TaskStatus; import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Either; import org.apache.druid.java.util.http.client.HttpClient; import org.apache.druid.java.util.http.client.response.StringFullResponseHolder; +import org.apache.druid.segment.realtime.firehose.ChatHandlerResource; import org.easymock.EasyMock; import org.jboss.netty.channel.ChannelException; import org.jboss.netty.handler.codec.http.DefaultHttpResponse; @@ -82,10 +84,11 @@ public class IndexTaskClientTest EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) .andReturn( Futures.immediateFuture( - new StringFullResponseHolder( - HttpResponseStatus.OK, - new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK), - StandardCharsets.UTF_8 + Either.value( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK), + StandardCharsets.UTF_8 + ) ) ) ) @@ -101,9 +104,201 @@ public class IndexTaskClientTest ); Assert.assertEquals(HttpResponseStatus.OK, response.getStatus()); } - } + } - private IndexTaskClient buildIndexTaskClient(HttpClient httpClient, Function taskLocationProvider) + @Test + public void retryOnServerError() throws IOException + { + final HttpClient httpClient = EasyMock.createMock(HttpClient.class); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.error( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR), + StandardCharsets.UTF_8 + ).addChunk("Error") + ) + ) + ) + .times(2); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.value( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK), + StandardCharsets.UTF_8 + ) + ) + ) + ) + .once(); + EasyMock.replay(httpClient); + try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) { + final StringFullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent( + "taskId", + HttpMethod.GET, + "test", + null, + true + ); + Assert.assertEquals(HttpResponseStatus.OK, response.getStatus()); + } + EasyMock.verify(httpClient); + } + + @Test + public void retryIfNotFoundWithIncorrectTaskId() throws IOException + { + final HttpClient httpClient = EasyMock.createMock(HttpClient.class); + final String taskId = "taskId"; + final String incorrectTaskId = "incorrectTaskId"; + final DefaultHttpResponse incorrectResponse = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); + incorrectResponse.headers().add(ChatHandlerResource.TASK_ID_HEADER, incorrectTaskId); + final DefaultHttpResponse correctResponse = + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); + correctResponse.headers().add(ChatHandlerResource.TASK_ID_HEADER, taskId); + + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.error( + new StringFullResponseHolder( + incorrectResponse, + StandardCharsets.UTF_8 + ) + ) + ) + ) + .times(2); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.value( + new StringFullResponseHolder( + correctResponse, + StandardCharsets.UTF_8 + ) + ) + ) + ) + .once(); + EasyMock.replay(httpClient); + try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) { + final StringFullResponseHolder response = indexTaskClient.submitRequestWithEmptyContent( + taskId, + HttpMethod.GET, + "test", + null, + true + ); + Assert.assertEquals(HttpResponseStatus.OK, response.getStatus()); + } + EasyMock.verify(httpClient); + } + + @Test + public void dontRetryOnBadRequest() + { + final HttpClient httpClient = EasyMock.createMock(HttpClient.class); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.error( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST), + StandardCharsets.UTF_8 + ).addChunk("Error") + ) + ) + ) + .times(1); + EasyMock.replay(httpClient); + try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) { + final IllegalArgumentException e = Assert.assertThrows( + IllegalArgumentException.class, + () -> indexTaskClient.submitRequestWithEmptyContent("taskId", HttpMethod.GET, "test", null, true) + ); + + Assert.assertEquals( + "Received server error with status [400 Bad Request]; first 1KB of body: Error", + e.getMessage() + ); + } + + EasyMock.verify(httpClient); + } + + @Test + public void dontRetryIfRetryFalse() + { + final HttpClient httpClient = EasyMock.createMock(HttpClient.class); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.error( + new StringFullResponseHolder( + new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR), + StandardCharsets.UTF_8 + ).addChunk("Error") + ) + ) + ) + .times(1); + EasyMock.replay(httpClient); + try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) { + final IOException e = Assert.assertThrows( + IOException.class, + () -> indexTaskClient.submitRequestWithEmptyContent("taskId", HttpMethod.GET, "test", null, false) + ); + + Assert.assertEquals( + "Received server error with status [500 Internal Server Error]; first 1KB of body: Error", + e.getMessage() + ); + } + + EasyMock.verify(httpClient); + } + + @Test + public void dontRetryIfNotFoundWithCorrectTaskId() + { + final String taskId = "taskId"; + final HttpClient httpClient = EasyMock.createMock(HttpClient.class); + final DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND); + response.headers().add(ChatHandlerResource.TASK_ID_HEADER, taskId); + EasyMock.expect(httpClient.go(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())) + .andReturn( + Futures.immediateFuture( + Either.error( + new StringFullResponseHolder(response, StandardCharsets.UTF_8).addChunk("Error") + ) + ) + ) + .times(1); + EasyMock.replay(httpClient); + try (IndexTaskClient indexTaskClient = buildIndexTaskClient(httpClient, id -> TaskLocation.create(id, 8000, -1))) { + final IOException e = Assert.assertThrows( + IOException.class, + () -> indexTaskClient.submitRequestWithEmptyContent(taskId, HttpMethod.GET, "test", null, false) + ); + + Assert.assertEquals( + "Received server error with status [404 Not Found]; first 1KB of body: Error", + e.getMessage() + ); + } + + EasyMock.verify(httpClient); + } + + private IndexTaskClient buildIndexTaskClient( + HttpClient httpClient, + Function taskLocationProvider + ) { final TaskInfoProvider taskInfoProvider = new TaskInfoProvider() { diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java index 5d6b5570464..c3528c72c02 100644 --- a/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java +++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/actions/RemoteTaskActionClientTest.java @@ -85,10 +85,10 @@ public class RemoteTaskActionClientTest responseBody.put("result", expectedLocks); String strResult = objectMapper.writeValueAsString(responseBody); final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, response, StandardCharsets.UTF_8 ).addChunk(strResult); @@ -120,10 +120,10 @@ public class RemoteTaskActionClientTest // return status code 200 and a list with size equals 1 final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.BAD_REQUEST).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.BAD_REQUEST, response, StandardCharsets.UTF_8 ).addChunk("testSubmitWithIllegalStatusCode"); diff --git a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java index 36caf90e956..b2fed8528e3 100644 --- a/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java +++ b/server/src/test/java/org/apache/druid/client/indexing/HttpIndexingServiceClientTest.java @@ -89,11 +89,11 @@ public class HttpIndexingServiceClientTest }; HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, response, StandardCharsets.UTF_8 ).addChunk(jsonMapper.writeValueAsString(samplerResponse)); @@ -142,11 +142,11 @@ public class HttpIndexingServiceClientTest } }; HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.INTERNAL_SERVER_ERROR).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.INTERNAL_SERVER_ERROR, response, StandardCharsets.UTF_8 ).addChunk(""); @@ -170,13 +170,13 @@ public class HttpIndexingServiceClientTest { String taskId = "testTaskId"; HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); Map dummyResponse = ImmutableMap.of("test", "value"); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, response, StandardCharsets.UTF_8 ).addChunk(jsonMapper.writeValueAsString(dummyResponse)); @@ -209,11 +209,11 @@ public class HttpIndexingServiceClientTest ChannelBuffer buf = ChannelBuffers.buffer(errorMsg.length()); buf.writeBytes(errorMsg.getBytes(StandardCharsets.UTF_8)); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.NOT_FOUND).anyTimes(); EasyMock.expect(response.getContent()).andReturn(buf); EasyMock.replay(response); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.NOT_FOUND, response, StandardCharsets.UTF_8 ).addChunk(""); @@ -241,11 +241,11 @@ public class HttpIndexingServiceClientTest { String taskId = "testTaskId"; HttpResponse response = EasyMock.createMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(HttpResponseStatus.OK).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, response, StandardCharsets.UTF_8 ).addChunk(""); diff --git a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java index 3b953da096d..8cc6956d117 100644 --- a/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java +++ b/server/src/test/java/org/apache/druid/query/lookup/LookupReferencesManagerTest.java @@ -86,9 +86,10 @@ public class LookupReferencesManagerTest ); } - private static HttpResponse newEmptyResponse() + private static HttpResponse newEmptyResponse(final HttpResponseStatus status) { final HttpResponse response = EasyMock.createNiceMock(HttpResponse.class); + EasyMock.expect(response.getStatus()).andReturn(status).anyTimes(); EasyMock.expect(response.getContent()).andReturn(new BigEndianHeapChannelBuffer(0)); EasyMock.replay(response); return response; @@ -114,8 +115,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -178,8 +178,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -219,8 +218,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -253,8 +251,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -284,8 +281,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -317,8 +313,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -354,8 +349,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -385,8 +379,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -417,8 +410,7 @@ public class LookupReferencesManagerTest druidLeaderClient.makeRequest(HttpMethod.GET, "/druid/coordinator/v1/lookups/config/lookupTier?detailed=true") ).andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -479,8 +471,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -524,8 +515,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -613,8 +603,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); @@ -725,8 +714,7 @@ public class LookupReferencesManagerTest )) .andReturn(request); StringFullResponseHolder responseHolder = new StringFullResponseHolder( - HttpResponseStatus.OK, - newEmptyResponse(), + newEmptyResponse(HttpResponseStatus.OK), StandardCharsets.UTF_8 ).addChunk(strResult); EasyMock.expect(druidLeaderClient.go(request)).andReturn(responseHolder); diff --git a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java index 7defa0dac18..68b9eb2ca73 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/duty/CompactSegmentsTest.java @@ -1726,7 +1726,6 @@ public class CompactSegmentsTest { final HttpResponse httpResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); final StringFullResponseHolder holder = new StringFullResponseHolder( - HttpResponseStatus.OK, httpResponse, StandardCharsets.UTF_8 ); diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index 8f5d110fb5c..9ce4852f7bf 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -1108,7 +1108,7 @@ public class SystemSchemaTest extends CalciteTestBase HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp); + InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp); EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once(); EasyMock.expect(request.getUrl()).andReturn(new URL("http://test-host:1234/druid/indexer/v1/tasks")).anyTimes(); @@ -1282,7 +1282,7 @@ public class SystemSchemaTest extends CalciteTestBase .anyTimes(); HttpResponse httpResp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); - InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp.getStatus(), httpResp); + InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResp); EasyMock.expect(client.go(EasyMock.eq(request), EasyMock.anyObject(InputStreamFullResponseHandler.class))).andReturn(responseHolder).once(); @@ -1397,8 +1397,7 @@ public class SystemSchemaTest extends CalciteTestBase String json ) { - InputStreamFullResponseHolder responseHolder = - new InputStreamFullResponseHolder(httpResponse.getStatus(), httpResponse); + InputStreamFullResponseHolder responseHolder = new InputStreamFullResponseHolder(httpResponse); byte[] bytesToWrite = json.getBytes(StandardCharsets.UTF_8); responseHolder.addChunk(bytesToWrite);