mirror of https://github.com/apache/druid.git
HttpClient: Include error handler on all connection attempts. (#14915)
Currently we have an error handler for https connection attempts, but not for plaintext connection attempts. This leads to warnings like the following for plaintext connection errors: EXCEPTION, please implement org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() for proper handling. This happens because if we don't add our own error handler, the last handler in the chain during a connection attempt is HttpContentDecompressor, which doesn't handle errors. The new error handler for plaintext doesn't do much: it just closes the channel.
This commit is contained in:
parent
8885805bb3
commit
004cd012e1
|
@ -44,6 +44,7 @@ import org.jboss.netty.handler.codec.http.HttpVersion;
|
|||
import org.jboss.netty.handler.ssl.SslHandler;
|
||||
import org.jboss.netty.util.Timer;
|
||||
|
||||
import javax.annotation.Nullable;
|
||||
import javax.net.ssl.SSLContext;
|
||||
import javax.net.ssl.SSLEngine;
|
||||
import javax.net.ssl.SSLParameters;
|
||||
|
@ -53,13 +54,15 @@ import java.net.URL;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
public class ChannelResourceFactory implements ResourceFactory<String, ChannelFuture>
|
||||
{
|
||||
private static final Logger log = new Logger(ChannelResourceFactory.class);
|
||||
|
||||
private static final long DEFAULT_SSL_HANDSHAKE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(10);
|
||||
private static final String DRUID_PROXY_HANDLER = "druid_proxyHandler";
|
||||
private static final String PROXY_HANDLER_NAME = "druid-proxy";
|
||||
private static final String ERROR_HANDLER_NAME = "druid-connection-error";
|
||||
|
||||
private final ClientBootstrap bootstrap;
|
||||
private final SSLContext sslContext;
|
||||
|
@ -128,7 +131,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
|
|||
if (f1.isSuccess()) {
|
||||
final Channel channel = f1.getChannel();
|
||||
channel.getPipeline().addLast(
|
||||
DRUID_PROXY_HANDLER,
|
||||
PROXY_HANDLER_NAME,
|
||||
new SimpleChannelUpstreamHandler()
|
||||
{
|
||||
@Override
|
||||
|
@ -137,7 +140,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
|
|||
Object msg = e.getMessage();
|
||||
|
||||
final ChannelPipeline pipeline = ctx.getPipeline();
|
||||
pipeline.remove(DRUID_PROXY_HANDLER);
|
||||
pipeline.remove(PROXY_HANDLER_NAME);
|
||||
|
||||
if (msg instanceof HttpResponse) {
|
||||
HttpResponse httpResponse = (HttpResponse) msg;
|
||||
|
@ -217,27 +220,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
|
|||
sslHandler.setCloseOnSSLException(true);
|
||||
|
||||
final ChannelFuture handshakeFuture = Channels.future(connectFuture.getChannel());
|
||||
connectFuture.getChannel().getPipeline().addLast(
|
||||
"connectionErrorHandler", new SimpleChannelUpstreamHandler()
|
||||
{
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||
{
|
||||
final Channel channel = ctx.getChannel();
|
||||
if (channel == null) {
|
||||
// For the case where this pipeline is not attached yet.
|
||||
handshakeFuture.setFailure(new ChannelException(
|
||||
StringUtils.format("Channel is null. The context name is [%s]", ctx.getName())
|
||||
));
|
||||
return;
|
||||
}
|
||||
handshakeFuture.setFailure(e.getCause());
|
||||
if (channel.isOpen()) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
);
|
||||
connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new ConnectionErrorHandler(handshakeFuture));
|
||||
connectFuture.addListener(
|
||||
new ChannelFutureListener()
|
||||
{
|
||||
|
@ -280,6 +263,7 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
|
|||
|
||||
retVal = handshakeFuture;
|
||||
} else {
|
||||
connectFuture.getChannel().getPipeline().addLast(ERROR_HANDLER_NAME, new ConnectionErrorHandler(null));
|
||||
retVal = connectFuture;
|
||||
}
|
||||
|
||||
|
@ -308,4 +292,59 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
|
|||
log.trace("Closing");
|
||||
resource.awaitUninterruptibly().getChannel().close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handler that captures errors that occur while connecting. Typically superseded by other handlers after
|
||||
* a connection happens, in {@link org.apache.druid.java.util.http.client.NettyHttpClient}.
|
||||
*
|
||||
* It's important to have this for all channels, even if {@link #future} is null, because otherwise exceptions
|
||||
* that occur during connection land at {@link org.jboss.netty.handler.codec.http.HttpContentDecompressor} (the last
|
||||
* handler from {@link org.apache.druid.java.util.http.client.netty.HttpClientPipelineFactory}) and are dropped on
|
||||
* the floor along with a scary-looking warning like "EXCEPTION, please implement
|
||||
* org.jboss.netty.handler.codec.http.HttpContentDecompressor.exceptionCaught() for proper handling."
|
||||
*/
|
||||
private static class ConnectionErrorHandler extends SimpleChannelUpstreamHandler
|
||||
{
|
||||
@Nullable
|
||||
private final ChannelFuture future;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param future future to attach errors to
|
||||
*/
|
||||
public ConnectionErrorHandler(@Nullable ChannelFuture future)
|
||||
{
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(final ChannelHandlerContext ctx, final ExceptionEvent e)
|
||||
{
|
||||
final Channel channel = ctx.getChannel();
|
||||
if (channel == null) {
|
||||
// For the case where this pipeline is not attached yet.
|
||||
if (future != null && !future.isDone()) {
|
||||
final ChannelException e2 =
|
||||
new ChannelException(StringUtils.format("Channel is null. The context name is [%s]", ctx.getName()));
|
||||
e2.addSuppressed(e.getCause());
|
||||
future.setFailure(e2);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
if (future != null && !future.isDone()) {
|
||||
future.setFailure(e.getCause());
|
||||
}
|
||||
|
||||
// Close the channel if this is the last handler. Otherwise, we expect that NettyHttpClient would have added
|
||||
// additional handlers to take care of the errors.
|
||||
//noinspection ObjectEquality
|
||||
if (channel.isOpen() && this == ctx.getPipeline().getLast()) {
|
||||
channel.close();
|
||||
}
|
||||
|
||||
ctx.sendUpstream(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -296,6 +296,78 @@ public class JankyServersTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpConnectionRefused() throws Throwable
|
||||
{
|
||||
final Lifecycle lifecycle = new Lifecycle();
|
||||
try {
|
||||
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
|
||||
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
|
||||
|
||||
// Need to select a port that isn't being listened to. This approach finds an unused port in a racey way.
|
||||
// Hopefully it works most of the time.
|
||||
final ServerSocket sock = new ServerSocket(0);
|
||||
final int port = sock.getLocalPort();
|
||||
sock.close();
|
||||
|
||||
final ListenableFuture<StatusResponseHolder> response = client
|
||||
.go(
|
||||
new Request(HttpMethod.GET, new URL(StringUtils.format("http://localhost:%d/", port))),
|
||||
StatusResponseHandler.getInstance()
|
||||
);
|
||||
|
||||
Throwable e = null;
|
||||
try {
|
||||
response.get();
|
||||
}
|
||||
catch (ExecutionException e1) {
|
||||
e = e1.getCause();
|
||||
e1.printStackTrace();
|
||||
}
|
||||
|
||||
Assert.assertTrue("ChannelException thrown by 'get'", isChannelClosedException(e));
|
||||
}
|
||||
finally {
|
||||
lifecycle.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHttpsConnectionRefused() throws Throwable
|
||||
{
|
||||
final Lifecycle lifecycle = new Lifecycle();
|
||||
try {
|
||||
final HttpClientConfig config = HttpClientConfig.builder().withSslContext(SSLContext.getDefault()).build();
|
||||
final HttpClient client = HttpClientInit.createClient(config, lifecycle);
|
||||
|
||||
// Need to select a port that isn't being listened to. This approach finds an unused port in a racey way.
|
||||
// Hopefully it works most of the time.
|
||||
final ServerSocket sock = new ServerSocket(0);
|
||||
final int port = sock.getLocalPort();
|
||||
sock.close();
|
||||
|
||||
final ListenableFuture<StatusResponseHolder> response = client
|
||||
.go(
|
||||
new Request(HttpMethod.GET, new URL(StringUtils.format("https://localhost:%d/", port))),
|
||||
StatusResponseHandler.getInstance()
|
||||
);
|
||||
|
||||
Throwable e = null;
|
||||
try {
|
||||
response.get();
|
||||
}
|
||||
catch (ExecutionException e1) {
|
||||
e = e1.getCause();
|
||||
e1.printStackTrace();
|
||||
}
|
||||
|
||||
Assert.assertTrue("ChannelException thrown by 'get'", isChannelClosedException(e));
|
||||
}
|
||||
finally {
|
||||
lifecycle.stop();
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isChannelClosedException(Throwable e)
|
||||
{
|
||||
return e instanceof ChannelException ||
|
||||
|
|
Loading…
Reference in New Issue