mirror of https://github.com/apache/druid.git
NettyHttpClient: Cleaner state transitions for handlers. (#12889)
The Netty pipeline set up by the client can deliver multiple exceptions, and can deliver chunks even after delivering exceptions. This makes it difficult to implement HttpResponseHandlers. Looking at existing handler implementations, I do not see attempts to handle this case, so it's also a potential source of bugs. This patch updates the client to track whether an exception was encountered, and if so, to not call any additional methods on the handler after exceptionCaught. It also harmonizes exception handling between exceptionCaught and channelDisconnected.
This commit is contained in:
parent
b4985ccd5e
commit
38af5f7b57
|
@ -59,8 +59,10 @@ import java.net.URL;
|
|||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/**
|
||||
* Implementation of {@link HttpClient} built using Netty.
|
||||
*/
|
||||
public class NettyHttpClient extends AbstractHttpClient
|
||||
{
|
||||
|
@ -169,6 +171,11 @@ public class NettyHttpClient extends AbstractHttpClient
|
|||
final long readTimeout = getReadTimeout(requestReadTimeout);
|
||||
final SettableFuture<Final> retVal = SettableFuture.create();
|
||||
|
||||
// Pipeline can hand us chunks even after exceptionCaught is called. This has the potential to confuse
|
||||
// HttpResponseHandler implementations, which expect exceptionCaught to be the final method called. So, we
|
||||
// use this boolean to ensure that handlers do not see any chunks after exceptionCaught fires.
|
||||
final AtomicBoolean didEncounterException = new AtomicBoolean();
|
||||
|
||||
if (readTimeout > 0) {
|
||||
channel.getPipeline().addLast(
|
||||
READ_TIMEOUT_HANDLER_NAME,
|
||||
|
@ -202,6 +209,11 @@ public class NettyHttpClient extends AbstractHttpClient
|
|||
Object msg = e.getMessage();
|
||||
|
||||
if (msg instanceof HttpResponse) {
|
||||
if (didEncounterException.get()) {
|
||||
// Don't process HttpResponse after encountering an exception.
|
||||
return;
|
||||
}
|
||||
|
||||
HttpResponse httpResponse = (HttpResponse) msg;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[%s] Got response: %s", requestDesc, httpResponse.getStatus());
|
||||
|
@ -234,6 +246,11 @@ public class NettyHttpClient extends AbstractHttpClient
|
|||
finishRequest();
|
||||
}
|
||||
} else if (msg instanceof HttpChunk) {
|
||||
if (didEncounterException.get()) {
|
||||
// Don't process HttpChunk after encountering an exception.
|
||||
return;
|
||||
}
|
||||
|
||||
HttpChunk httpChunk = (HttpChunk) msg;
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(
|
||||
|
@ -308,61 +325,61 @@ public class NettyHttpClient extends AbstractHttpClient
|
|||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext context, ExceptionEvent event)
|
||||
{
|
||||
if (log.isDebugEnabled()) {
|
||||
final Throwable cause = event.getCause();
|
||||
if (cause == null) {
|
||||
log.debug("[%s] Caught exception", requestDesc);
|
||||
} else {
|
||||
log.debug(cause, "[%s] Caught exception", requestDesc);
|
||||
}
|
||||
}
|
||||
|
||||
// Ignore return value of setException, since exceptionCaught can be called multiple times and we
|
||||
// only want to report the first one.
|
||||
if (event.getCause() instanceof ReadTimeoutException) {
|
||||
// ReadTimeoutException thrown by ReadTimeoutHandler is a singleton with a misleading stack trace.
|
||||
// No point including it: instead, we replace it with a fresh exception.
|
||||
retVal.setException(new ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc)));
|
||||
} else {
|
||||
retVal.setException(event.getCause());
|
||||
}
|
||||
|
||||
// response is non-null if we received initial chunk and then exception occurs
|
||||
if (response != null) {
|
||||
handler.exceptionCaught(response, event.getCause());
|
||||
}
|
||||
try {
|
||||
if (channel.isOpen()) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "Error while closing channel");
|
||||
}
|
||||
finally {
|
||||
if (channelResourceContainer.isPresent()) {
|
||||
// exceptionCaught can be called multiple times: we only want to return the channel if it hasn't
|
||||
// already been returned.
|
||||
channelResourceContainer.returnResource();
|
||||
}
|
||||
}
|
||||
handleExceptionAndCloseChannel(event.getCause(), false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelDisconnected(ChannelHandlerContext context, ChannelStateEvent event)
|
||||
{
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug("[%s] Channel disconnected", requestDesc);
|
||||
handleExceptionAndCloseChannel(new ChannelException("Channel disconnected"), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle an exception by logging it, possibly calling {@link SettableFuture#setException} on {@code retVal},
|
||||
* possibly calling {@link HttpResponseHandler#exceptionCaught}, and possibly closing the channel.
|
||||
*
|
||||
* No actions will be taken (other than logging) if an exception has already been handled for this request.
|
||||
*
|
||||
* @param t exception
|
||||
* @param closeIfNotOpen Call {@link Channel#close()} even if {@link Channel#isOpen()} returns false.
|
||||
* Provided to retain existing behavior of two different chunks of code that were
|
||||
* merged into this single method.
|
||||
*/
|
||||
private void handleExceptionAndCloseChannel(final Throwable t, final boolean closeIfNotOpen)
|
||||
{
|
||||
if (log.isDebugEnabled()) {
|
||||
log.debug(t, "[%s] Caught exception", requestDesc);
|
||||
}
|
||||
|
||||
// Only process the first exception encountered.
|
||||
if (!didEncounterException.compareAndSet(false, true)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!retVal.isDone()) {
|
||||
if (t instanceof ReadTimeoutException) {
|
||||
// ReadTimeoutException thrown by ReadTimeoutHandler is a singleton with a misleading stack trace.
|
||||
// No point including it: instead, we replace it with a fresh exception.
|
||||
retVal.setException(new ReadTimeoutException(StringUtils.format("[%s] Read timed out", requestDesc)));
|
||||
} else {
|
||||
retVal.setException(t);
|
||||
}
|
||||
}
|
||||
|
||||
// response is non-null if we received initial chunk and then exception occurs
|
||||
if (response != null) {
|
||||
handler.exceptionCaught(response, new ChannelException("Channel disconnected"));
|
||||
handler.exceptionCaught(response, t);
|
||||
}
|
||||
try {
|
||||
if (closeIfNotOpen || channel.isOpen()) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
log.warn(e, "[%s] Error while closing channel", requestDesc);
|
||||
}
|
||||
finally {
|
||||
channelResourceContainer.returnResource();
|
||||
if (!retVal.isDone()) {
|
||||
log.warn("[%s] Channel disconnected before response complete", requestDesc);
|
||||
retVal.setException(new ChannelException("Channel disconnected"));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,5 @@ package org.apache.druid.java.util.http.client.pool;
|
|||
public interface ResourceContainer<ResourceType>
|
||||
{
|
||||
ResourceType get();
|
||||
boolean isPresent();
|
||||
void returnResource();
|
||||
}
|
||||
|
|
|
@ -119,12 +119,6 @@ public class ResourcePool<K, V> implements Closeable
|
|||
return value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPresent()
|
||||
{
|
||||
return !returned.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void returnResource()
|
||||
{
|
||||
|
|
Loading…
Reference in New Issue