mirror of https://github.com/apache/druid.git
Fix missing exception handling as part of `io.druid.java.util.http.client.netty.HttpClientPipelineFactory` (#6090)
* Fix missing exception handling as part of `io.druid.java.util.http.client.netty.HttpClientPipelineFactory` * 1. Extends SimpleChannelUpstreamHandler; 2. Remove sendUpstream; 3. Using ExpectedException. * Add more checks for channel * Fix missing exception handler in NettyHttpClient and ChannelResourceFactory * Rename the anonymous class of `SimpleChannelUpstreamHandler` as connectionErrorHandler & use `addLast` instead of `addFirst` * Remove `removeHandlers()` * Using expectedException.expect instead of Assert.assertNotNull in testHttpsEchoServer * Using handshakeFuture.setFailure instead of logger * Using handshakeFuture.setFailure instead of logger
This commit is contained in:
parent
064c22c937
commit
bd95b426c9
|
@ -278,18 +278,17 @@ public class NettyHttpClient extends AbstractHttpClient
|
|||
if (response != null) {
|
||||
handler.exceptionCaught(response, event.getCause());
|
||||
}
|
||||
removeHandlers();
|
||||
try {
|
||||
channel.close();
|
||||
if (channel.isOpen()) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
// ignore
|
||||
log.warn(e, "Error while closing channel");
|
||||
}
|
||||
finally {
|
||||
channelResourceContainer.returnResource();
|
||||
}
|
||||
|
||||
context.sendUpstream(event);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -308,7 +307,6 @@ public class NettyHttpClient extends AbstractHttpClient
|
|||
log.warn("[%s] Channel disconnected before response complete", requestDesc);
|
||||
retVal.setException(new ChannelException("Channel disconnected"));
|
||||
}
|
||||
context.sendUpstream(event);
|
||||
}
|
||||
|
||||
private void removeHandlers()
|
||||
|
|
|
@ -27,8 +27,11 @@ import org.jboss.netty.channel.Channel;
|
|||
import org.jboss.netty.channel.ChannelException;
|
||||
import org.jboss.netty.channel.ChannelFuture;
|
||||
import org.jboss.netty.channel.ChannelFutureListener;
|
||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||
import org.jboss.netty.channel.ChannelPipeline;
|
||||
import org.jboss.netty.channel.Channels;
|
||||
import org.jboss.netty.channel.ExceptionEvent;
|
||||
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
|
||||
import org.jboss.netty.handler.ssl.SslHandler;
|
||||
import org.jboss.netty.util.Timer;
|
||||
|
||||
|
@ -111,6 +114,25 @@ public class ChannelResourceFactory implements ResourceFactory<String, ChannelFu
|
|||
pipeline.addFirst("ssl", sslHandler);
|
||||
|
||||
final ChannelFuture handshakeFuture = Channels.future(connectFuture.getChannel());
|
||||
pipeline.addLast("connectionErrorHandler", new SimpleChannelUpstreamHandler()
|
||||
{
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
|
||||
{
|
||||
final Channel channel = ctx.getChannel();
|
||||
if (channel == null) {
|
||||
// For the case where this pipeline is not attached yet.
|
||||
handshakeFuture.setFailure(new ChannelException(
|
||||
StringUtils.format("Channel is null. The context name is [%s]", ctx.getName())
|
||||
));
|
||||
return;
|
||||
}
|
||||
handshakeFuture.setFailure(e.getCause());
|
||||
if (channel.isOpen()) {
|
||||
channel.close();
|
||||
}
|
||||
}
|
||||
});
|
||||
connectFuture.addListener(
|
||||
new ChannelFutureListener()
|
||||
{
|
||||
|
|
|
@ -31,7 +31,9 @@ import org.joda.time.Duration;
|
|||
import org.junit.AfterClass;
|
||||
import org.junit.Assert;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
import java.io.IOException;
|
||||
|
@ -55,6 +57,9 @@ public class JankyServersTest
|
|||
static ServerSocket echoServerSocket;
|
||||
static ServerSocket closingServerSocket;
|
||||
|
||||
@Rule
|
||||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception
|
||||
{
|
||||
|
@ -309,16 +314,10 @@ public class JankyServersTest
|
|||
new StatusResponseHandler(StandardCharsets.UTF_8)
|
||||
);
|
||||
|
||||
Throwable e = null;
|
||||
try {
|
||||
response.get();
|
||||
}
|
||||
catch (ExecutionException e1) {
|
||||
e = e1.getCause();
|
||||
}
|
||||
expectedException.expect(ExecutionException.class);
|
||||
expectedException.expectMessage("java.lang.IllegalArgumentException: invalid version format: GET");
|
||||
|
||||
Assert.assertTrue("IllegalArgumentException thrown by 'get'", e instanceof IllegalArgumentException);
|
||||
Assert.assertTrue("Expected error message", e.getMessage().matches(".*invalid version format:.*"));
|
||||
response.get();
|
||||
}
|
||||
finally {
|
||||
lifecycle.stop();
|
||||
|
@ -339,15 +338,10 @@ public class JankyServersTest
|
|||
new StatusResponseHandler(StandardCharsets.UTF_8)
|
||||
);
|
||||
|
||||
Throwable e = null;
|
||||
try {
|
||||
response.get();
|
||||
}
|
||||
catch (ExecutionException e1) {
|
||||
e = e1.getCause();
|
||||
}
|
||||
expectedException.expect(ExecutionException.class);
|
||||
expectedException.expectMessage("org.jboss.netty.channel.ChannelException: Faulty channel in resource pool");
|
||||
|
||||
Assert.assertNotNull("ChannelException thrown by 'get'", e);
|
||||
response.get();
|
||||
}
|
||||
finally {
|
||||
lifecycle.stop();
|
||||
|
|
Loading…
Reference in New Issue