diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java b/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java index f873da994a3..3ab3719180f 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/NettyHttpClient.java @@ -59,8 +59,10 @@ import java.net.URL; import java.util.Collection; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; /** + * Implementation of {@link HttpClient} built using Netty. */ public class NettyHttpClient extends AbstractHttpClient { @@ -169,6 +171,11 @@ public class NettyHttpClient extends AbstractHttpClient final long readTimeout = getReadTimeout(requestReadTimeout); final SettableFuture retVal = SettableFuture.create(); + // Pipeline can hand us chunks even after exceptionCaught is called. This has the potential to confuse + // HttpResponseHandler implementations, which expect exceptionCaught to be the final method called. So, we + // use this boolean to ensure that handlers do not see any chunks after exceptionCaught fires. + final AtomicBoolean didEncounterException = new AtomicBoolean(); + if (readTimeout > 0) { channel.getPipeline().addLast( READ_TIMEOUT_HANDLER_NAME, @@ -202,6 +209,11 @@ public class NettyHttpClient extends AbstractHttpClient Object msg = e.getMessage(); if (msg instanceof HttpResponse) { + if (didEncounterException.get()) { + // Don't process HttpResponse after encountering an exception. + return; + } + HttpResponse httpResponse = (HttpResponse) msg; if (log.isDebugEnabled()) { log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus()); @@ -234,6 +246,11 @@ public class NettyHttpClient extends AbstractHttpClient finishRequest(); } } else if (msg instanceof HttpChunk) { + if (didEncounterException.get()) { + // Don't process HttpChunk after encountering an exception. + return; + } + HttpChunk httpChunk = (HttpChunk) msg; if (log.isDebugEnabled()) { log.debug( @@ -308,61 +325,61 @@ public class NettyHttpClient extends AbstractHttpClient @Override public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event) { - if (log.isDebugEnabled()) { - final Throwable cause = event.getCause(); - if (cause == null) { - log.debug("[%s] Caught exception", requestDesc); - } else { - log.debug(cause, "[%s] Caught exception", requestDesc); - } - } - - // Ignore return value of setException, since exceptionCaught can be called multiple times and we - // only want to report the first one. - if (event.getCause() instanceof ReadTimeoutException) { - // ReadTimeoutException thrown by ReadTimeoutHandler is a singleton with a misleading stack trace. - // No point including it: instead, we replace it with a fresh exception. - retVal.setException(new ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc))); - } else { - retVal.setException(event.getCause()); - } - - // response is non-null if we received initial chunk and then exception occurs - if (response != null) { - handler.exceptionCaught(response, event.getCause()); - } - try { - if (channel.isOpen()) { - channel.close(); - } - } - catch (Exception e) { - log.warn(e, "Error while closing channel"); - } - finally { - if (channelResourceContainer.isPresent()) { - // exceptionCaught can be called multiple times: we only want to return the channel if it hasn't - // already been returned. - channelResourceContainer.returnResource(); - } - } + handleExceptionAndCloseChannel(event.getCause(), false); } @Override public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent event) + { + handleExceptionAndCloseChannel(new ChannelException("Channel disconnected"), true); + } + + /** + * Handle an exception by logging it, possibly calling {@link SettableFuture#setException} on {@code retVal}, + * possibly calling {@link HttpResponseHandler#exceptionCaught}, and possibly closing the channel. + * + * No actions will be taken (other than logging) if an exception has already been handled for this request. + * + * @param t exception + * @param closeIfNotOpen Call {@link Channel#close()} even if {@link Channel#isOpen()} returns false. + * Provided to retain existing behavior of two different chunks of code that were + * merged into this single method. + */ + private void handleExceptionAndCloseChannel(final Throwable t, final boolean closeIfNotOpen) { if (log.isDebugEnabled()) { - log.debug("[%s] Channel disconnected", requestDesc); + log.debug(t, "[%s] Caught exception", requestDesc); } + + // Only process the first exception encountered. + if (!didEncounterException.compareAndSet(false, true)) { + return; + } + + if (!retVal.isDone()) { + if (t instanceof ReadTimeoutException) { + // ReadTimeoutException thrown by ReadTimeoutHandler is a singleton with a misleading stack trace. + // No point including it: instead, we replace it with a fresh exception. + retVal.setException(new ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc))); + } else { + retVal.setException(t); + } + } + // response is non-null if we received initial chunk and then exception occurs if (response != null) { - handler.exceptionCaught(response, new ChannelException("Channel disconnected")); + handler.exceptionCaught(response, t); } - channel.close(); - channelResourceContainer.returnResource(); - if (!retVal.isDone()) { - log.warn("[%s] Channel disconnected before response complete", requestDesc); - retVal.setException(new ChannelException("Channel disconnected")); + try { + if (closeIfNotOpen || channel.isOpen()) { + channel.close(); + } + } + catch (Exception e) { + log.warn(e, "[%s] Error while closing channel", requestDesc); + } + finally { + channelResourceContainer.returnResource(); } } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java index 7f18fdd18a5..37fc23ea5ce 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourceContainer.java @@ -24,6 +24,5 @@ package org.apache.druid.java.util.http.client.pool; public interface ResourceContainer { ResourceType get(); - boolean isPresent(); void returnResource(); } diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java index 6ed025c768d..2476e4c9ebf 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java @@ -119,12 +119,6 @@ public class ResourcePool implements Closeable return value; } - @Override - public boolean isPresent() - { - return !returned.get(); - } - @Override public void returnResource() {