[ARTEMIS-803] Fix colocated backups with http-upgrade acceptor

*  Do not offset ports for Netty connector/acceptor with http-upgrade
enabled.
* Pass the name of the ActiveMQ server to the HTTP request to initiate
the Upgrade so that the HTTP endpoint on the app server can find the
correct ActiveMQ broker that must handle the upgrade.

JIRA: https://issues.apache.org/jira/browse/ARTEMIS-803
This commit is contained in:
Jeff Mesnil 2016-10-17 11:42:57 +02:00 committed by Clebert Suconic
parent a80bd4b8b3
commit c1ecddcb71
3 changed files with 27 additions and 6 deletions

View File

@ -653,6 +653,10 @@ public class NettyConnector extends AbstractConnector {
request.headers().set(HttpHeaders.Names.HOST, host);
request.headers().set(HttpHeaders.Names.UPGRADE, ACTIVEMQ_REMOTING);
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.UPGRADE);
final String serverName = ConfigurationHelper.getStringProperty(TransportConstants.ACTIVEMQ_SERVER_NAME, null, configuration);
if (serverName != null) {
request.headers().set(TransportConstants.ACTIVEMQ_SERVER_NAME, serverName);
}
final String endpoint = ConfigurationHelper.getStringProperty(TransportConstants.HTTP_UPGRADE_ENDPOINT_PROP_NAME, null, configuration);
if (endpoint != null) {

View File

@ -53,6 +53,8 @@ public class TransportConstants {
public static final String USE_INVM_PROP_NAME = "useInvm";
public static final String ACTIVEMQ_SERVER_NAME = "activemqServerName";
/**
* @deprecated use PROTOCOLS_PROP_NAME
*/
@ -252,6 +254,7 @@ public class TransportConstants {
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);
Set<String> allowableConnectorKeys = new HashSet<>();
allowableConnectorKeys.add(TransportConstants.ACTIVEMQ_SERVER_NAME);
allowableConnectorKeys.add(TransportConstants.SSL_ENABLED_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HTTP_ENABLED_PROP_NAME);
allowableConnectorKeys.add(TransportConstants.HTTP_CLIENT_IDLE_PROP_NAME);

View File

@ -26,11 +26,13 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActivationParams;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterControl;
import org.apache.activemq.artemis.core.server.cluster.ClusterController;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
public class ColocatedHAManager implements HAManager {
@ -268,19 +270,31 @@ public class ColocatedHAManager implements HAManager {
}
}
/**
* Offset the port for Netty connector/acceptor (unless HTTP upgrade is enabled) and the server ID for invm connector/acceptor.
*
* The port is not offset for Netty connector/acceptor when HTTP upgrade is enabled. In this case, the app server that
* embed ActiveMQ is "owning" the port and is charge to delegate the HTTP ugprade to the correct broker (that can be
* the main one or any colocated backup hosted on the main broker). Delegation to the correct broker is done by looking at the
* {@link TransportConstants#ACTIVEMQ_SERVER_NAME} property [ARTEMIS-803]
*/
private static void updatebackupParams(String name, int portOffset, Map<String, Object> params) {
if (params != null) {
Object port = params.get("port");
Object port = params.get(TransportConstants.PORT_PROP_NAME);
if (port != null) {
Integer integer = Integer.valueOf(port.toString());
integer += portOffset;
params.put("port", integer.toString());
boolean httpUpgradeEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.HTTP_UPGRADE_ENABLED_PROP_NAME, TransportConstants.DEFAULT_HTTP_UPGRADE_ENABLED, params);
if (!httpUpgradeEnabled) {
Integer integer = Integer.valueOf(port.toString());
integer += portOffset;
params.put(TransportConstants.PORT_PROP_NAME, integer.toString());
}
}
Object serverId = params.get("serverId");
Object serverId = params.get(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME);
if (serverId != null) {
Integer newid = Integer.parseInt(serverId.toString()) + portOffset;
params.put("serverId", newid.toString());
params.put(org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME, newid.toString());
}
params.put(TransportConstants.ACTIVEMQ_SERVER_NAME, name);
}
}
}