Fix HTTP Upgrade Connection
* Fix isEquivalent() method to take into account the activemqServerName
property when httpUpgradeEnabled is true. Two ActiveMQ server hosted on
the same app server may have the same host and port (corresponding to
the Web server HTTP port). The activemqServerName property is used to
distinguish them.
* Iron out HTTP upgrade handler so that the latch is always count down
and the channel context is closed unless the handshake was completed
successfully
JIRA: https://issues.apache.org/jira/browse/ARTEMIS-931
(cherry picked from dd052026e6
)
This commit is contained in:
parent
83b00d6a8e
commit
88680fe1e8
|
@ -404,8 +404,8 @@ public interface ActiveMQClientLogger extends BasicLogger {
|
|||
void invalidProtocol(String validProtocols);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214023, value = "HTTP Handshake failed, the received accept value %s does not match the expected response %s")
|
||||
void httpHandshakeFailed(String response, String expectedResponse);
|
||||
@Message(id = 214023, value = "HTTP Handshake failed, received %s")
|
||||
void httpHandshakeFailed(Object msg);
|
||||
|
||||
@LogMessage(level = Logger.Level.ERROR)
|
||||
@Message(id = 214024, value = "HTTP upgrade not supported by remote acceptor")
|
||||
|
|
|
@ -70,7 +70,6 @@ import io.netty.handler.codec.http.DefaultHttpRequest;
|
|||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.FullHttpResponse;
|
||||
import io.netty.handler.codec.http.HttpClientCodec;
|
||||
import io.netty.handler.codec.http.HttpHeaderNames;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.HttpMethod;
|
||||
import io.netty.handler.codec.http.HttpObject;
|
||||
|
@ -352,9 +351,19 @@ public class NettyConnector extends AbstractConnector {
|
|||
sslEnabled +
|
||||
", useNio=" +
|
||||
true +
|
||||
getHttpUpgradeInfo() +
|
||||
"]";
|
||||
}
|
||||
|
||||
private String getHttpUpgradeInfo() {
|
||||
if (!httpUpgradeEnabled) {
|
||||
return "";
|
||||
}
|
||||
String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
|
||||
String acceptor = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, null, configuration);
|
||||
return ", activemqServerName=" + serverName + ", httpUpgradeEndpoint=" + acceptor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
if (channelClazz != null) {
|
||||
|
@ -738,33 +747,32 @@ public class NettyConnector extends AbstractConnector {
|
|||
|
||||
@Override
|
||||
public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("Received msg=" + msg);
|
||||
}
|
||||
try {
|
||||
if (!(msg instanceof HttpResponse)) {
|
||||
ActiveMQClientLogger.LOGGER.unexpectedResponseFromHttpServer(msg);
|
||||
ctx.close();
|
||||
return;
|
||||
}
|
||||
HttpResponse response = (HttpResponse) msg;
|
||||
if (response.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaderNames.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
|
||||
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
|
||||
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
|
||||
if (msg instanceof HttpResponse) {
|
||||
HttpResponse response = (HttpResponse) msg;
|
||||
if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) {
|
||||
String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT);
|
||||
String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get());
|
||||
|
||||
if (expectedResponse.equals(accept)) {
|
||||
// remove the http handlers and flag the activemq channel handler as active
|
||||
pipeline.remove(httpClientCodec);
|
||||
pipeline.remove(this);
|
||||
handshakeComplete = true;
|
||||
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
|
||||
channelHandler.active = true;
|
||||
} else {
|
||||
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse);
|
||||
ctx.close();
|
||||
if (expectedResponse.equals(accept)) {
|
||||
// remove the http handlers and flag the activemq channel handler as active
|
||||
pipeline.remove(httpClientCodec);
|
||||
pipeline.remove(this);
|
||||
handshakeComplete = true;
|
||||
ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class);
|
||||
channelHandler.active = true;
|
||||
return;
|
||||
}
|
||||
}
|
||||
} else if (response.status().code() == HttpResponseStatus.FORBIDDEN.code()) {
|
||||
ActiveMQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor();
|
||||
ctx.close();
|
||||
}
|
||||
} finally {
|
||||
if (!handshakeComplete) {
|
||||
ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg);
|
||||
ctx.close();
|
||||
}
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
@ -977,6 +985,20 @@ public class NettyConnector extends AbstractConnector {
|
|||
|
||||
@Override
|
||||
public boolean isEquivalent(Map<String, Object> configuration) {
|
||||
Boolean httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration);
|
||||
if (httpUpgradeEnabled) {
|
||||
// we need to look at the activemqServerName to distinguish between ActiveMQ servers that could be proxied behind the same
|
||||
// HTTP upgrade handler in the Web server
|
||||
String otherActiveMQServerName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
|
||||
String activeMQServerName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, this.configuration);
|
||||
boolean equivalent = isSameHostAndPort(configuration) && otherActiveMQServerName != null && otherActiveMQServerName.equals(activeMQServerName);
|
||||
return equivalent;
|
||||
} else {
|
||||
return isSameHostAndPort(configuration);
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isSameHostAndPort(Map<String, Object> configuration) {
|
||||
//here we only check host and port because these two parameters
|
||||
//is sufficient to determine the target host
|
||||
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);
|
||||
|
|
Loading…
Reference in New Issue