This closes #975

This commit is contained in:
Clebert Suconic 2017-01-20 09:54:30 -05:00
commit 5edf940ed0
2 changed files with 14 additions and 3 deletions

View File

@ -433,4 +433,8 @@ public interface ActiveMQClientLogger extends BasicLogger {
format = Message.Format.MESSAGE_FORMAT)
void reconnectCreatingNewSession(long id);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214029, value = "Unexpected response from HTTP server: %s")
void unexpectedResponseFromHttpServer(Object response);
}

View File

@ -70,6 +70,7 @@ import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
@ -737,9 +738,14 @@ public class NettyConnector extends AbstractConnector {
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (msg instanceof HttpResponse) {
try {
if (!(msg instanceof HttpResponse)) {
ActiveMQClientLogger.LOGGER.unexpectedResponseFromHttpServer(msg);
ctx.close();
return;
}
HttpResponse response = (HttpResponse) msg;
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
if (response.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaderNames.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
@ -754,10 +760,11 @@ public class NettyConnector extends AbstractConnector {
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse);
ctx.close();
}
} else if (response.getStatus().code() == HttpResponseStatus.FORBIDDEN.code()) {
} else if (response.status().code() == HttpResponseStatus.FORBIDDEN.code()) {
ActiveMQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor();
ctx.close();
}
} finally {
latch.countDown();
}
}