This commit is contained in:
Justin Bertram 2017-02-14 11:45:26 -06:00
commit 8938c26cc0
1 changed files with 36 additions and 21 deletions

View File

@ -80,6 +80,7 @@ import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder; import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
import io.netty.util.AttributeKey; import io.netty.util.AttributeKey;
@ -745,12 +746,21 @@ public class NettyConnector extends AbstractConnector {
this.httpClientCodec = httpClientCodec; this.httpClientCodec = httpClientCodec;
} }
/**
* HTTP upgrade response will be decode by Netty as 2 objects:
* - 1 HttpObject corresponding to the 101 SWITCHING PROTOCOL headers
* - 1 EMPTY_LAST_CONTENT
*
* The HTTP upgrade is successful whne the 101 SWITCHING PROTOCOL has been received (handshakeComplete = true)
* but the latch is count down only when the following EMPTY_LAST_CONTENT is also received.
* Otherwise this ChannelHandler would be removed too soon and the ActiveMQChannelHandler would handle the
* EMPTY_LAST_CONTENT (while it is expecitng only ByteBuf).
*/
@Override @Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Received msg=" + msg); logger.debug("Received msg=" + msg);
} }
try {
if (msg instanceof HttpResponse) { if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg; HttpResponse response = (HttpResponse) msg;
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
@ -758,24 +768,29 @@ public class NettyConnector extends AbstractConnector {
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
if (expectedResponse.equals(accept)) { if (expectedResponse.equals(accept)) {
// HTTP upgrade is successful but let's wait to receive the EMPTY_LAST_CONTENT to count down the latch
handshakeComplete = true;
} else {
// HTTP upgrade failed
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
ctx.close();
latch.countDown();
}
return;
}
} else if (msg == LastHttpContent.EMPTY_LAST_CONTENT && handshakeComplete) {
// remove the http handlers and flag the activemq channel handler as active // remove the http handlers and flag the activemq channel handler as active
pipeline.remove(httpClientCodec); pipeline.remove(httpClientCodec);
pipeline.remove(this); pipeline.remove(this);
handshakeComplete = true;
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true; channelHandler.active = true;
return;
} }
}
}
} finally {
if (!handshakeComplete) { if (!handshakeComplete) {
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
ctx.close(); ctx.close();
} }
latch.countDown(); latch.countDown();
} }
}
@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {