[ARTEMIS-963] Prevent ClassCastException in ActiveMQChannelHandler

When HTTP Upgrade is enabled, update Netty's pipeline only after the
HTTP Upgrade handshake is successful *and* the trailing
EMPTY_LAST_CONTENT is received.
Otherwise, this EMPTY_LAST_CONTENT is handled by
ActiveMQChannelHandler which is only expected to handle ByteBuf

JIRA: https://issues.apache.org/jira/browse/ARTEMIS-963
This commit is contained in:
Jeff Mesnil 2017-02-14 15:25:39 +01:00 committed by Justin Bertram
parent db2c711e24
commit 992dc2bc1b
1 changed files with 36 additions and 21 deletions

View File

@ -80,6 +80,7 @@ import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.cookie.ClientCookieEncoder;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.AttributeKey;
@ -745,36 +746,50 @@ public class NettyConnector extends AbstractConnector {
this.httpClientCodec = httpClientCodec;
}
/**
* HTTP upgrade response will be decode by Netty as 2 objects:
* - 1 HttpObject corresponding to the 101 SWITCHING PROTOCOL headers
* - 1 EMPTY_LAST_CONTENT
*
* The HTTP upgrade is successful whne the 101 SWITCHING PROTOCOL has been received (handshakeComplete = true)
* but the latch is count down only when the following EMPTY_LAST_CONTENT is also received.
* Otherwise this ChannelHandler would be removed too soon and the ActiveMQChannelHandler would handle the
* EMPTY_LAST_CONTENT (while it is expecitng only ByteBuf).
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Received msg=" + msg);
}
try {
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
if (expectedResponse.equals(accept)) {
// remove the http handlers and flag the activemq channel handler as active
pipeline.remove(httpClientCodec);
pipeline.remove(this);
handshakeComplete = true;
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
return;
}
if (expectedResponse.equals(accept)) {
// HTTP upgrade is successful but let's wait to receive the EMPTY_LAST_CONTENT to count down the latch
handshakeComplete = true;
} else {
// HTTP upgrade failed
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
ctx.close();
latch.countDown();
}
return;
}
} finally {
if (!handshakeComplete) {
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
ctx.close();
}
latch.countDown();
} else if (msg == LastHttpContent.EMPTY_LAST_CONTENT && handshakeComplete) {
// remove the http handlers and flag the activemq channel handler as active
pipeline.remove(httpClientCodec);
pipeline.remove(this);
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
}
if (!handshakeComplete) {
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
ctx.close();
}
latch.countDown();
}
@Override