diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index f7852925eb..38fb326f3d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -80,6 +80,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ClientCookieEncoder; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; @@ -745,36 +746,50 @@ public class NettyConnector extends AbstractConnector { this.httpClientCodec = httpClientCodec; } + /** + * HTTP upgrade response will be decode by Netty as 2 objects: + * - 1 HttpObject corresponding to the 101 SWITCHING PROTOCOL headers + * - 1 EMPTY_LAST_CONTENT + * + * The HTTP upgrade is successful whne the 101 SWITCHING PROTOCOL has been received (handshakeComplete = true) + * but the latch is count down only when the following EMPTY_LAST_CONTENT is also received. + * Otherwise this ChannelHandler would be removed too soon and the ActiveMQChannelHandler would handle the + * EMPTY_LAST_CONTENT (while it is expecitng only ByteBuf). + */ @Override public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (logger.isDebugEnabled()) { logger.debug("Received msg=" + msg); } - try { - if (msg instanceof HttpResponse) { - HttpResponse response = (HttpResponse) msg; - if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { - String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); - String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; + if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { + String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); + String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); - if (expectedResponse.equals(accept)) { - // remove the http handlers and flag the activemq channel handler as active - pipeline.remove(httpClientCodec); - pipeline.remove(this); - handshakeComplete = true; - ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; - return; - } + if (expectedResponse.equals(accept)) { + // HTTP upgrade is successful but let's wait to receive the EMPTY_LAST_CONTENT to count down the latch + handshakeComplete = true; + } else { + // HTTP upgrade failed + ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); + ctx.close(); + latch.countDown(); } + return; } - } finally { - if (!handshakeComplete) { - ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); - ctx.close(); - } - latch.countDown(); + } else if (msg == LastHttpContent.EMPTY_LAST_CONTENT && handshakeComplete) { + // remove the http handlers and flag the activemq channel handler as active + pipeline.remove(httpClientCodec); + pipeline.remove(this); + ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); + channelHandler.active = true; } + if (!handshakeComplete) { + ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); + ctx.close(); + } + latch.countDown(); } @Override