From 992dc2bc1b613dcb788ae2b4d59dc098e3325610 Mon Sep 17 00:00:00 2001 From: Jeff Mesnil Date: Tue, 14 Feb 2017 15:25:39 +0100 Subject: [PATCH] =?UTF-8?q?[ARTEMIS-963]=C2=A0Prevent=20ClassCastException?= =?UTF-8?q?=20in=20ActiveMQChannelHandler?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When HTTP Upgrade is enabled, update Netty's pipeline only after the HTTP Upgrade handshake is successful *and* the trailing EMPTY_LAST_CONTENT is received. Otherwise, this EMPTY_LAST_CONTENT is handled by ActiveMQChannelHandler which is only expected to handle ByteBuf JIRA: https://issues.apache.org/jira/browse/ARTEMIS-963 --- .../remoting/impl/netty/NettyConnector.java | 57 ++++++++++++------- 1 file changed, 36 insertions(+), 21 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index f7852925eb..38fb326f3d 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -80,6 +80,7 @@ import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseDecoder; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.codec.http.LastHttpContent; import io.netty.handler.codec.http.cookie.ClientCookieEncoder; import io.netty.handler.ssl.SslHandler; import io.netty.util.AttributeKey; @@ -745,36 +746,50 @@ public class NettyConnector extends AbstractConnector { this.httpClientCodec = httpClientCodec; } + /** + * HTTP upgrade response will be decode by Netty as 2 objects: + * - 1 HttpObject corresponding to the 101 SWITCHING PROTOCOL headers + * - 1 EMPTY_LAST_CONTENT + * + * The HTTP upgrade is successful whne the 101 SWITCHING PROTOCOL has been received (handshakeComplete = true) + * but the latch is count down only when the following EMPTY_LAST_CONTENT is also received. + * Otherwise this ChannelHandler would be removed too soon and the ActiveMQChannelHandler would handle the + * EMPTY_LAST_CONTENT (while it is expecitng only ByteBuf). + */ @Override public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { if (logger.isDebugEnabled()) { logger.debug("Received msg=" + msg); } - try { - if (msg instanceof HttpResponse) { - HttpResponse response = (HttpResponse) msg; - if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { - String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); - String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; + if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { + String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); + String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); - if (expectedResponse.equals(accept)) { - // remove the http handlers and flag the activemq channel handler as active - pipeline.remove(httpClientCodec); - pipeline.remove(this); - handshakeComplete = true; - ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; - return; - } + if (expectedResponse.equals(accept)) { + // HTTP upgrade is successful but let's wait to receive the EMPTY_LAST_CONTENT to count down the latch + handshakeComplete = true; + } else { + // HTTP upgrade failed + ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); + ctx.close(); + latch.countDown(); } + return; } - } finally { - if (!handshakeComplete) { - ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); - ctx.close(); - } - latch.countDown(); + } else if (msg == LastHttpContent.EMPTY_LAST_CONTENT && handshakeComplete) { + // remove the http handlers and flag the activemq channel handler as active + pipeline.remove(httpClientCodec); + pipeline.remove(this); + ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); + channelHandler.active = true; } + if (!handshakeComplete) { + ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); + ctx.close(); + } + latch.countDown(); } @Override