diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java index eeff924c1b..4a0a21e706 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/ActiveMQClientLogger.java @@ -404,8 +404,8 @@ public interface ActiveMQClientLogger extends BasicLogger { void invalidProtocol(String validProtocols); @LogMessage(level = Logger.Level.ERROR) - @Message(id = 214023, value = "HTTP Handshake failed, the received accept value %s does not match the expected response %s") - void httpHandshakeFailed(String response, String expectedResponse); + @Message(id = 214023, value = "HTTP Handshake failed, received %s") + void httpHandshakeFailed(Object msg); @LogMessage(level = Logger.Level.ERROR) @Message(id = 214024, value = "HTTP upgrade not supported by remote acceptor") diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java index c317ec919a..f7852925eb 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/NettyConnector.java @@ -70,7 +70,6 @@ import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpObject; @@ -352,9 +351,19 @@ public class NettyConnector extends AbstractConnector { sslEnabled + ", useNio=" + true + + getHttpUpgradeInfo() + "]"; } + private String getHttpUpgradeInfo() { + if (!httpUpgradeEnabled) { + return ""; + } + String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration); + String acceptor = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, null, configuration); + return ", activemqServerName=" + serverName + ", httpUpgradeEndpoint=" + acceptor; + } + @Override public synchronized void start() { if (channelClazz != null) { @@ -738,33 +747,32 @@ public class NettyConnector extends AbstractConnector { @Override public void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { + if (logger.isDebugEnabled()) { + logger.debug("Received msg=" + msg); + } try { - if (!(msg instanceof HttpResponse)) { - ActiveMQClientLogger.LOGGER.unexpectedResponseFromHttpServer(msg); - ctx.close(); - return; - } - HttpResponse response = (HttpResponse) msg; - if (response.status().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaderNames.UPGRADE).equals(ACTIVEMQ_REMOTING)) { - String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); - String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); + if (msg instanceof HttpResponse) { + HttpResponse response = (HttpResponse) msg; + if (response.getStatus().code() == HttpResponseStatus.SWITCHING_PROTOCOLS.code() && response.headers().get(HttpHeaders.Names.UPGRADE).equals(ACTIVEMQ_REMOTING)) { + String accept = response.headers().get(SEC_ACTIVEMQ_REMOTING_ACCEPT); + String expectedResponse = createExpectedResponse(MAGIC_NUMBER, ctx.channel().attr(REMOTING_KEY).get()); - if (expectedResponse.equals(accept)) { - // remove the http handlers and flag the activemq channel handler as active - pipeline.remove(httpClientCodec); - pipeline.remove(this); - handshakeComplete = true; - ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); - channelHandler.active = true; - } else { - ActiveMQClientLogger.LOGGER.httpHandshakeFailed(accept, expectedResponse); - ctx.close(); + if (expectedResponse.equals(accept)) { + // remove the http handlers and flag the activemq channel handler as active + pipeline.remove(httpClientCodec); + pipeline.remove(this); + handshakeComplete = true; + ActiveMQChannelHandler channelHandler = pipeline.get(ActiveMQChannelHandler.class); + channelHandler.active = true; + return; + } } - } else if (response.status().code() == HttpResponseStatus.FORBIDDEN.code()) { - ActiveMQClientLogger.LOGGER.httpUpgradeNotSupportedByRemoteAcceptor(); - ctx.close(); } } finally { + if (!handshakeComplete) { + ActiveMQClientLogger.LOGGER.httpHandshakeFailed(msg); + ctx.close(); + } latch.countDown(); } } @@ -977,6 +985,20 @@ public class NettyConnector extends AbstractConnector { @Override public boolean isEquivalent(Map configuration) { + Boolean httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, configuration); + if (httpUpgradeEnabled) { + // we need to look at the activemqServerName to distinguish between ActiveMQ servers that could be proxied behind the same + // HTTP upgrade handler in the Web server + String otherActiveMQServerName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration); + String activeMQServerName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, this.configuration); + boolean equivalent = isSameHostAndPort(configuration) && otherActiveMQServerName != null && otherActiveMQServerName.equals(activeMQServerName); + return equivalent; + } else { + return isSameHostAndPort(configuration); + } + } + + private boolean isSameHostAndPort(Map configuration) { //here we only check host and port because these two parameters //is sufficient to determine the target host String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration);