This closes #991

This commit is contained in:
Martyn Taylor 2017-02-07 11:05:52 +00:00
commit 78a8a23523
2 changed files with 47 additions and 25 deletions

View File

@ -404,8 +404,8 @@ public interface ActiveMQClientLogger extends BasicLogger {
void invalidProtocol(String validProtocols);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214023, value = "HTTP Handshake failed, the received accept value %s does not match the expected response %s")
void httpHandshakeFailed(String response, String expectedResponse);
@Message(id = 214023, value = "HTTP Handshake failed, received %s")
void httpHandshakeFailed(Object msg);
@LogMessage(level = Logger.Level.ERROR)
@Message(id = 214024, value = "HTTP upgrade not supported by remote acceptor")

View File

@ -70,7 +70,6 @@ import io.netty.handler.codec.http.DefaultHttpRequest;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpObject;
@ -352,9 +351,19 @@ public class NettyConnector extends AbstractConnector {
sslEnabled +
", useNio=" +
true +
getHttpUpgradeInfo() +
"]";
}
private String getHttpUpgradeInfo() {
if (!httpUpgradeEnabled) {
return "";
}
String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
String acceptor = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, null, configuration);
return ", activemqServerName=" + serverName + ", httpUpgradeEndpoint=" + acceptor;
}
@Override
public synchronized void start() {
if (channelClazz != null) {
@ -738,33 +747,32 @@ public class NettyConnector extends AbstractConnector {
@Override
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Received msg=" + msg);
}
try {
if (!(msg instanceof HttpResponse)) {
ActiveMQClientLogger.LOGGER.unexpectedResponseFromHttpServer(msg);
ctx.close();
return;
}
HttpResponse response = (HttpResponse) msg;
if (response.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaderNames.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
if (expectedResponse.equals(accept)) {
// remove the http handlers and flag the activemq channel handler as active
pipeline.remove(httpClientCodec);
pipeline.remove(this);
handshakeComplete = true;
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
} else {
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse);
ctx.close();
if (expectedResponse.equals(accept)) {
// remove the http handlers and flag the activemq channel handler as active
pipeline.remove(httpClientCodec);
pipeline.remove(this);
handshakeComplete = true;
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
channelHandler.active = true;
return;
}
}
} else if (response.status().code() == HttpResponseStatus.FORBIDDEN.code()) {
ActiveMQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor();
ctx.close();
}
} finally {
if (!handshakeComplete) {
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
ctx.close();
}
latch.countDown();
}
}
@ -977,6 +985,20 @@ public class NettyConnector extends AbstractConnector {
@Override
public boolean isEquivalent(Map<String, Object> configuration) {
Boolean httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
if (httpUpgradeEnabled) {
// we need to look at the activemqServerName to distinguish between ActiveMQ servers that could be proxied behind the same
// HTTP upgrade handler in the Web server
String otherActiveMQServerName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
String activeMQServerName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, this.configuration);
boolean equivalent = isSameHostAndPort(configuration) && otherActiveMQServerName != null && otherActiveMQServerName.equals(activeMQServerName);
return equivalent;
} else {
return isSameHostAndPort(configuration);
}
}
private boolean isSameHostAndPort(Map<String, Object> configuration) {
//here we only check host and port because these two parameters
//is sufficient to determine the target host
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);