diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 2f5fa016f8..afaa281731 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -211,12 +211,13 @@ public class AMQPSessionCallback implements SessionCallback { true, //boolean xa, (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain()); } else { + final String validatedUser = manager.getServer().validateUser(user, passcode, protonSPI.getProtonConnectionDelegate(), manager.getSecurityDomain()); serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection, false, // boolean autoCommitSends false, // boolean autoCommitAcks, false, // boolean preAcknowledge, true, //boolean xa, - (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain()); + (String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), validatedUser); } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java index bab287f045..31752a88fb 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPRedirectHandler.java @@ -41,10 +41,17 @@ public class AMQPRedirectHandler extends RedirectHandler { } @Override - protected void cannotRedirect(AMQPRedirectContext context) throws Exception { + protected void cannotRedirect(AMQPRedirectContext context) { ErrorCondition error = new ErrorCondition(); error.setCondition(ConnectionError.CONNECTION_FORCED); - error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo())); + switch (context.getResult().status) { + case REFUSED_USE_ANOTHER: + error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo())); + break; + case REFUSED_UNAVAILABLE: + error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo())); + break; + } Connection protonConnection = context.getProtonConnection(); protonConnection.setCondition(error); @@ -52,7 +59,7 @@ public class AMQPRedirectHandler extends RedirectHandler { } @Override - protected void redirectTo(AMQPRedirectContext context) throws Exception { + protected void redirectTo(AMQPRedirectContext context) { String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams()); int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams()); diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index fbc6d8aaa3..66cc1c9436 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -59,6 +59,7 @@ public class MQTTConnection implements RemotingConnection { this.creationTime = System.currentTimeMillis(); this.dataReceived = new AtomicBoolean(); this.destroyed = false; + transportConnection.setProtocolConnection(this); } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java index ab03c2916b..8bf4d5e5c2 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnectionManager.java @@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.handler.codec.mqtt.MqttConnectReturnCode; import io.netty.handler.codec.mqtt.MqttProperties; -import io.netty.util.CharsetUtil; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ServerSession; @@ -61,13 +60,13 @@ public class MQTTConnectionManager { */ void connect(String cId, String username, - byte[] passwordInBytes, + String password, boolean will, byte[] willMessage, String willTopic, boolean willRetain, int willQosLevel, - boolean cleanSession) throws Exception { + boolean cleanSession, String validatedUser) throws Exception { String clientId = validateClientId(cId, cleanSession); if (clientId == null) { session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED); @@ -79,11 +78,10 @@ public class MQTTConnectionManager { MQTTSessionState sessionState = getSessionState(clientId); synchronized (sessionState) { session.setSessionState(sessionState); - String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8); session.getConnection().setClientID(clientId); - ServerSessionImpl serverSession = createServerSession(username, password); + ServerSessionImpl serverSession = createServerSession(username, password, validatedUser); serverSession.start(); - ServerSessionImpl internalServerSession = createServerSession(username, password); + ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser); internalServerSession.disableSecurity(); internalServerSession.start(); session.setServerSession(serverSession, internalServerSession); @@ -120,10 +118,9 @@ public class MQTTConnectionManager { * @return * @throws Exception */ - ServerSessionImpl createServerSession(String username, String password) throws Exception { + ServerSessionImpl createServerSession(String username, String password, String validatedUser) throws Exception { String id = UUIDGenerator.getInstance().generateStringUUID(); ActiveMQServer server = session.getServer(); - ServerSession serverSession = server.createSession(id, username, password, @@ -138,7 +135,7 @@ public class MQTTConnectionManager { MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext(), session.getProtocolManager().getPrefixes(), - session.getProtocolManager().getSecurityDomain()); + session.getProtocolManager().getSecurityDomain(), validatedUser); return (ServerSessionImpl) serverSession; } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java index f33c5decec..e7f343d4ab 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java @@ -38,6 +38,7 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload; import io.netty.handler.codec.mqtt.MqttSubscribeMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage; +import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.logs.AuditLogger; @@ -177,12 +178,15 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter { * @param connect */ void handleConnect(MqttConnectMessage connect) throws Exception { + final String username = connect.payload().userName(); + final String password = connect.payload().passwordInBytes() == null ? null : new String( connect.payload().passwordInBytes(), CharsetUtil.UTF_8); + final String validatedUser = server.validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain()); if (connection.getTransportConnection().getRedirectTo() == null || !protocolManager.getRedirectHandler().redirect(connection, session, connect)) { connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L; String clientId = connect.payload().clientIdentifier(); - session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession()); + session.getConnectionManager().connect(clientId, username, password, connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession(), validatedUser); } } diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java index 3b372032c3..15dcc4aacb 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRedirectHandler.java @@ -36,13 +36,20 @@ public class MQTTRedirectHandler extends RedirectHandler { } @Override - protected void cannotRedirect(MQTTRedirectContext context) throws Exception { - context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); + protected void cannotRedirect(MQTTRedirectContext context) { + switch (context.getResult().status) { + case REFUSED_USE_ANOTHER: + context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER); + break; + case REFUSED_UNAVAILABLE: + context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); + break; + } context.getMQTTSession().getProtocolHandler().disconnect(true); } @Override - protected void redirectTo(MQTTRedirectContext context) throws Exception { + protected void redirectTo(MQTTRedirectContext context) { String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams()); int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams()); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 9e8dee7eb5..c0c2a2ffa7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -197,6 +197,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final AtomicBoolean disableTtl = new AtomicBoolean(false); + private String validatedUser = null; + public OpenWireConnection(Connection connection, ActiveMQServer server, OpenWireProtocolManager openWireProtocolManager, @@ -210,6 +212,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.outWireFormat = wf.copy(); this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration(); + this.transportConnection.setProtocolConnection(this); } // SecurityAuth implementation @@ -768,7 +771,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } private void createInternalSession(ConnectionInfo info) throws Exception { - internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain()); + internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser); } //raise the refCount of context @@ -1010,6 +1013,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se return protocolManager.isSupportAdvisory(); } + public String getValidatedUser() { + return validatedUser; + } + + public void setValidatedUser(String validatedUser) { + this.validatedUser = validatedUser; + } + class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override @@ -1126,12 +1137,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se @Override public Response processAddConnection(ConnectionInfo info) throws Exception { try { - if (transportConnection.getRedirectTo() != null && protocolManager.getRedirectHandler() - .redirect(OpenWireConnection.this, info)) { - shutdown(true); - return null; + protocolManager.validateUser(OpenWireConnection.this, info); + if (transportConnection.getRedirectTo() != null) { + if (protocolManager.getRedirectHandler().redirect(OpenWireConnection.this, info)) { + shutdown(true); + return null; + } } - protocolManager.addConnection(OpenWireConnection.this, info); } catch (Exception e) { Response resp = new ExceptionResponse(e); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java index 0ce20939e0..8433c9c838 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java @@ -357,19 +357,21 @@ public class OpenWireProtocolManager extends AbstractProtocolManager param : connector.getParams().entrySet()) { paramsData.put(new CompositeDataSupport(getParameterType(), new String[]{"key", "value"}, - new Object[]{param.getKey(), param == null ? param : param.getValue().toString()})); + new Object[]{param.getKey(), param.getValue() == null ? null : param.getValue().toString()})); } connectorData = new CompositeDataSupport(getTransportConfigurationType(), @@ -77,11 +77,9 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker new Object[]{connector.getName(), connector.getFactoryClassName(), paramsData}); } - CompositeData targetData = new CompositeDataSupport(getTargetCompositeType(), - new String[]{"nodeID", "local", "connector"}, - new Object[]{target.getNodeID(), target.isLocal(), connectorData}); - - return targetData; + return new CompositeDataSupport(getTargetCompositeType(), + new String[]{"nodeID", "local", "connector"}, + new Object[]{result.target.getNodeID(), result.target.isLocal(), connectorData}); } return null; @@ -89,14 +87,13 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker @Override public String getTargetAsJSON(String key) { - Target target = balancer.getTarget(key); - - if (target != null) { - TransportConfiguration connector = target.getConnector(); + TargetResult result = balancer.getTarget(key); + if (TargetResult.Status.OK == result.status) { + TransportConfiguration connector = result.target.getConnector(); JsonObjectBuilder targetDataBuilder = JsonLoader.createObjectBuilder() - .add("nodeID", target.getNodeID()) - .add("local", target.isLocal()); + .add("nodeID", result.target.getNodeID()) + .add("local", result.target.isLocal()); if (connector == null) { targetDataBuilder.addNull("connector"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java index e63dbf5fc4..a2322a9f4c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQPacketHandler.java @@ -163,10 +163,6 @@ public class ActiveMQPacketHandler implements ChannelHandler { connection.setClientID(((CreateSessionMessage_V2) request).getClientID()); } - if (connection.getTransportConnection().getRedirectTo() != null) { - protocolManager.getRedirectHandler().redirect(connection, request); - } - Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize()); ActiveMQPrincipal activeMQPrincipal = null; @@ -175,12 +171,17 @@ public class ActiveMQPacketHandler implements ChannelHandler { activeMQPrincipal = connection.getDefaultActiveMQPrincipal(); } + final String validatedUser = server.validateUser(activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), connection, protocolManager.getSecurityDomain()); + if (connection.getTransportConnection().getRedirectTo() != null) { + protocolManager.getRedirectHandler().redirect(connection, request); + } + OperationContext sessionOperationContext = server.newOperationContext(); Map routingTypeMap = protocolManager.getPrefixes(); CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection); - ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain()); + ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain(), validatedUser); ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress()); session.addProducer(serverProducer); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, session, channel); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java index 937bd273bd..9be389c436 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQRedirectHandler.java @@ -40,7 +40,12 @@ public class ActiveMQRedirectHandler extends RedirectHandler prefixes, - String securityDomain) throws Exception; + String securityDomain, String validatedUser) throws Exception; /** This is to be used in places where security is bypassed, like internal sessions, broker connections, etc... */ ServerSession createInternalSession(String name, @@ -959,4 +959,6 @@ public interface ActiveMQServer extends ServiceComponent { void reloadConfigurationFile() throws Exception; BrokerBalancerManager getBalancerManager(); -} + + String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception; +} \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java index 512056f36a..2aa69db267 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancer.java @@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.balancing.pools.Pool; import org.apache.activemq.artemis.core.server.balancing.targets.Target; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.jboss.logging.Logger; @@ -46,7 +47,7 @@ public class BrokerBalancer implements ActiveMQComponent { private final TargetKeyResolver targetKeyResolver; - private final Target localTarget; + private final TargetResult localTarget; private final Pattern localTargetFilter; @@ -54,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent { private final Policy policy; - private final Cache cache; + private final Cache cache; private volatile boolean started = false; @@ -67,7 +68,7 @@ public class BrokerBalancer implements ActiveMQComponent { } public Target getLocalTarget() { - return localTarget; + return localTarget.target; } public String getLocalTargetFilter() { @@ -82,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent { return policy; } - public Cache getCache() { + public Cache getCache() { return cache; } @@ -99,7 +100,7 @@ public class BrokerBalancer implements ActiveMQComponent { this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter); - this.localTarget = localTarget; + this.localTarget = new TargetResult(localTarget); this.localTargetFilter = localTargetFilter != null ? Pattern.compile(localTargetFilter) : null; @@ -134,7 +135,7 @@ public class BrokerBalancer implements ActiveMQComponent { } } - public Target getTarget(Connection connection, String clientID, String username) { + public TargetResult getTarget(Connection connection, String clientID, String username) { if (clientID != null && clientID.startsWith(BrokerBalancer.CLIENT_ID_PREFIX)) { if (logger.isDebugEnabled()) { logger.debug("The clientID [" + clientID + "] starts with BrokerBalancer.CLIENT_ID_PREFIX"); @@ -146,7 +147,7 @@ public class BrokerBalancer implements ActiveMQComponent { return getTarget(targetKeyResolver.resolve(connection, clientID, username)); } - public Target getTarget(String key) { + public TargetResult getTarget(String key) { if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) { if (logger.isDebugEnabled()) { @@ -157,45 +158,47 @@ public class BrokerBalancer implements ActiveMQComponent { } if (pool == null) { - return null; + return TargetResult.REFUSED_USE_ANOTHER_RESULT; } - Target target = null; + TargetResult result = null; if (cache != null) { - target = cache.getIfPresent(key); + result = cache.getIfPresent(key); } - if (target != null) { - if (pool.isTargetReady(target)) { + if (result != null) { + if (pool.isTargetReady(result.target)) { if (logger.isDebugEnabled()) { - logger.debug("The cache returns [" + target + "] ready for " + targetKey + "[" + key + "]"); + logger.debug("The cache returns [" + result.target + "] ready for " + targetKey + "[" + key + "]"); } - return target; + return result; } if (logger.isDebugEnabled()) { - logger.debug("The cache returns [" + target + "] not ready for " + targetKey + "[" + key + "]"); + logger.debug("The cache returns [" + result.target + "] not ready for " + targetKey + "[" + key + "]"); } } List targets = pool.getTargets(); - target = policy.selectTarget(targets, key); + Target target = policy.selectTarget(targets, key); if (logger.isDebugEnabled()) { logger.debug("The policy selects [" + target + "] from " + targets + " for " + targetKey + "[" + key + "]"); } - if (target != null && cache != null) { - if (logger.isDebugEnabled()) { - logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]"); + if (target != null) { + result = new TargetResult(target); + if (cache != null) { + if (logger.isDebugEnabled()) { + logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]"); + } + cache.put(key, result); } - - cache.put(key, target); } - return target; + return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java index 76a2a5437d..5534a56a2d 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectContext.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.balancing; import org.apache.activemq.artemis.core.server.balancing.targets.Target; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public class RedirectContext { @@ -27,7 +28,7 @@ public class RedirectContext { private final String username; - private Target target; + private TargetResult result; public RemotingConnection getConnection() { return connection; @@ -42,11 +43,15 @@ public class RedirectContext { } public Target getTarget() { - return target; + return result.target; } - public void setTarget(Target target) { - this.target = target; + public TargetResult getResult() { + return result; + } + + public void setResult(TargetResult result) { + this.result = result; } public RedirectContext(RemotingConnection connection, String clientID, String username) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java index 89ace13057..967e686d60 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/RedirectHandler.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.balancing; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult; import org.apache.activemq.artemis.spi.core.remoting.Connection; public abstract class RedirectHandler { @@ -51,9 +52,9 @@ public abstract class RedirectHandler { return true; } - context.setTarget(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername())); + context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername())); - if (context.getTarget() == null) { + if (TargetResult.Status.OK != context.getResult().status) { ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection); cannotRedirect(context); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java index f82331ba5b..fb09d87771 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/Target.java @@ -56,4 +56,5 @@ public interface Target { T getAttribute(String resourceName, String attributeName, Class attributeClass, int timeout) throws Exception; T invokeOperation(String resourceName, String operationName, Object[] operationParams, Class operationClass, int timeout) throws Exception; + } \ No newline at end of file diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java index d01b932f63..3493ec36b9 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKey.java @@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.server.balancing.targets; public enum TargetKey { - CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME; + CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME, ROLE_NAME; public static final String validValues; @@ -46,6 +46,8 @@ public enum TargetKey { return SOURCE_IP; case "USER_NAME": return USER_NAME; + case "ROLE_NAME": + return ROLE_NAME; default: throw new IllegalStateException("Invalid RedirectKey:" + type + " valid Types: " + validValues); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java index dac82deedf..6a409d03ec 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolver.java @@ -17,7 +17,10 @@ package org.apache.activemq.artemis.core.server.balancing.targets; +import javax.security.auth.Subject; + import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; import org.jboss.logging.Logger; import java.util.regex.Matcher; @@ -80,6 +83,35 @@ public class TargetKeyResolver { case USER_NAME: keyValue = username; break; + case ROLE_NAME: + if (connection != null && connection.getProtocolConnection() != null) { + Subject subject = connection.getProtocolConnection().getAuditSubject(); + if (subject != null) { + for (RolePrincipal candidateRole : subject.getPrincipals(RolePrincipal.class)) { + String roleName = candidateRole.getName(); + if (roleName != null) { + if (keyFilter != null) { + Matcher keyMatcher = keyFilter.matcher(roleName); + if (keyMatcher.find()) { + keyValue = keyMatcher.group(); + if (logger.isDebugEnabled()) { + logger.debugf("role match for %s via %s", roleName, keyMatcher); + } + return keyValue; + } + } else { + // with no filter, first role is the candidate + keyValue = roleName; + if (logger.isDebugEnabled()) { + logger.debugf("first role match: %s", roleName); + } + return keyValue; + } + } + } + } + } + break; default: throw new IllegalStateException("Unexpected value: " + key); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java new file mode 100644 index 0000000000..1be2ff9e4f --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetResult.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.core.server.balancing.targets; + +public class TargetResult { + + public static final TargetResult REFUSED_UNAVAILABLE_RESULT = new TargetResult(Status.REFUSED_UNAVAILABLE); + public static final TargetResult REFUSED_USE_ANOTHER_RESULT = new TargetResult(Status.REFUSED_USE_ANOTHER); + + public Status status; + public Target target; + + public TargetResult(Target t) { + this.target = t; + this.status = Status.OK; + } + + private TargetResult(Status s) { + this.status = s; + } + + public enum Status { + OK, + REFUSED_UNAVAILABLE, // pool is not yet ready, possibly transient + REFUSED_USE_ANOTHER // rejected, go else where, non-transient + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index 9cfbd34b6b..c3f694f4dc 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1674,12 +1674,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean autoCreateQueues, final OperationContext context, final Map prefixes, - final String securityDomain) throws Exception { - String validatedUser = ""; - - if (securityStore != null) { - validatedUser = securityStore.authenticate(username, password, connection, securityDomain); - } + final String securityDomain, + String validatedUser) throws Exception { checkSessionLimit(validatedUser); @@ -1693,6 +1689,16 @@ public class ActiveMQServerImpl implements ActiveMQServer { return session; } + @Override + public String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception { + String validatedUser = ""; + + if (securityStore != null) { + validatedUser = securityStore.authenticate(username, password, connection, securityDomain); + } + return validatedUser; + } + @Override public ServerSession createInternalSession(String name, int minLargeMessageSize, diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java index a59307d7f5..f41aca7513 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/BrokerBalancerTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.server.balancing; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -29,6 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.pools.Pool; import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget; import org.apache.activemq.artemis.core.server.balancing.targets.Target; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -74,8 +74,8 @@ public class BrokerBalancerTest { @Test public void getTarget() { - assertEquals( localTarget, underTest.getTarget("FOO_EE")); - assertNotEquals( localTarget, underTest.getTarget("BAR_EE")); + assertEquals( localTarget, underTest.getTarget("FOO_EE").target); + assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE")); } } \ No newline at end of file diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java index 59336969ba..df741f09d0 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/balancing/targets/TargetKeyResolverTest.java @@ -17,7 +17,15 @@ package org.apache.activemq.artemis.core.server.balancing.targets; +import javax.security.auth.Subject; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.Connection; +import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal; +import org.apache.commons.collections.set.ListOrderedSet; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; @@ -107,4 +115,65 @@ public class TargetKeyResolverTest { Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(null, null, null)); } + + @Test + public void testRoleNameKeyWithFilter() throws Exception { + TargetKeyResolver targetKeyResolver = new TargetKeyResolver(TargetKey.ROLE_NAME, "B"); + + Connection connection = Mockito.mock(Connection.class); + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class); + Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection); + Subject subject = Mockito.mock(Subject.class); + Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject); + + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + Set rolePrincipals = new HashSet<>(); + Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals); + + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + rolePrincipals.add(new RolePrincipal("A")); + + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + rolePrincipals.add(new RolePrincipal("B")); + + Assert.assertEquals("B", targetKeyResolver.resolve(connection, null, null)); + } + + @Test + public void testRoleNameKeyWithoutFilter() throws Exception { + TargetKeyResolver targetKeyResolver = new TargetKeyResolver(TargetKey.ROLE_NAME, null); + + Connection connection = Mockito.mock(Connection.class); + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class); + Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection); + Subject subject = Mockito.mock(Subject.class); + Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject); + + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + Set rolePrincipals = new ListOrderedSet(); + Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals); + + Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null)); + + final RolePrincipal roleA = new RolePrincipal("A"); + rolePrincipals.add(roleA); + + Assert.assertEquals("A", targetKeyResolver.resolve(connection, null, null)); + + rolePrincipals.add(new RolePrincipal("B")); + + Assert.assertEquals("A", targetKeyResolver.resolve(connection, null, null)); + + rolePrincipals.remove(roleA); + // with no filter, the first entry matches + Assert.assertEquals("B", targetKeyResolver.resolve(connection, null, null)); + } } diff --git a/docs/user-manual/en/broker-balancers.md b/docs/user-manual/en/broker-balancers.md index 4605d85ca8..be426563c0 100644 --- a/docs/user-manual/en/broker-balancers.md +++ b/docs/user-manual/en/broker-balancers.md @@ -13,10 +13,11 @@ The remote target is another reachable broker. ## Target Key The broker balancer uses a target key to select a target broker. It is a string retrieved from an incoming client connection, the supported values are: -* `CLIENT_ID` is the JMS client ID; -* `SNI_HOST` is the hostname indicated by the client in the SNI extension of the TLS protocol; -* `SOURCE_IP` is the source IP address of the client; +* `CLIENT_ID` is the JMS client ID. +* `SNI_HOST` is the hostname indicated by the client in the SNI extension of the TLS protocol. +* `SOURCE_IP` is the source IP address of the client. * `USER_NAME` is the username indicated by the client. +* `ROLE_NAME` is a role associated with the authenticated user of the connection. ## Pools The pool is a group of target brokers with periodic checks on their state. @@ -108,7 +109,7 @@ for more details about setting the `cache-timeout` parameter. ## Defining broker balancers A broker balancer is defined by the `broker-balancer` element, it includes the following items: * the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor; -* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details; +* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details; * the `target-key-filter` element defines a regular expression to filter the resolved keys; * the `local-target-filter` element defines a regular expression to match the keys that have to return a local target; * the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration; diff --git a/pom.xml b/pom.xml index fa6438a4d4..941778afa9 100644 --- a/pom.xml +++ b/pom.xml @@ -207,7 +207,7 @@ -Dorg.apache.commons.logging.Log=org.apache.activemq.artemis.logs.JBossLoggingApacheLoggerBridge -Dorg.apache.activemq.artemis.utils.RetryRule.retry=${retryTests} -Dbrokerconfig.maxDiskUsage=100 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration="file:${activemq.basedir}/tests/config/${logging.config}" - -Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost + -Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost -Djava.net.preferIPv4Stack=true -Dbasedir=${basedir} ${project.basedir} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java index e3768486e7..03b410156a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/AmqpRedirectTest.java @@ -189,4 +189,62 @@ public class AmqpRedirectTest extends BalancingTestBase { stopServers(0, 1); } + + @Test + public void testBalancerRejectionUseAnother() throws Exception { + setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false); + + // only accepts users with RoleName==B so will reject + setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "B", null); + + startServers(0); + + URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT); + AmqpClient client = new AmqpClient(uri, "admin", "admin"); + + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getName()); + + connection.setStateInspector(new AmqpValidator() { + + @Override + public void inspectOpenedResource(Connection connection) { + if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) { + markAsInvalid("Broker did not set connection establishment failed hint"); + } + } + + @Override + public void inspectClosedResource(Connection connection) { + ErrorCondition remoteError = connection.getRemoteCondition(); + if (remoteError == null || remoteError.getCondition() == null) { + markAsInvalid("Broker did not add error condition for connection"); + return; + } + + if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) { + markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED); + return; + } + String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + ", rejected this connection"; + String actualDescription = remoteError.getDescription(); + if (!expectedDescription.equals(actualDescription)) { + markAsInvalid("Broker did not set description as expected, was: " + actualDescription); + return; + } + } + }); + + try { + connection.connect(); + fail("Expected connection to fail, without redirect"); + } catch (Exception e) { + // Expected + } + + connection.getStateInspector().assertValid(); + connection.close(); + + stopServers(0); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java index db7a849277..890a813302 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/BalancingTestBase.java @@ -137,7 +137,25 @@ public class BalancingTestBase extends ClusterTestBase { acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME); } + + protected void setupBalancerServerWithLocalTarget(final int node, final TargetKey targetKey, final String targetKeyFilter, final String localTargetFilter) { + + Configuration configuration = getServer(node).getConfiguration(); + BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME); + brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter).setTargetKeyFilter(targetKeyFilter); + + configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration)); + + TransportConfiguration acceptor = getDefaultServerAcceptor(node); + acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME); + + } + protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password) throws Exception { + return createFactory(protocol, sslEnabled, host, port, clientID, user, password, -1); + } + + protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password, int retries) throws Exception { switch (protocol) { case CORE_PROTOCOL: { StringBuilder urlBuilder = new StringBuilder(); @@ -146,7 +164,7 @@ public class BalancingTestBase extends ClusterTestBase { urlBuilder.append(host); urlBuilder.append(":"); urlBuilder.append(port); - urlBuilder.append("?ha=true&reconnectAttempts=30"); + urlBuilder.append("?ha=true&reconnectAttempts=10&initialConnectAttempts=" + retries); urlBuilder.append("&sniHost="); urlBuilder.append(host); @@ -197,8 +215,10 @@ public class BalancingTestBase extends ClusterTestBase { urlBuilder.append(")"); } + urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries); + if (clientID != null) { - urlBuilder.append("?jms.clientID="); + urlBuilder.append("&jms.clientID="); urlBuilder.append(clientID); } @@ -223,8 +243,10 @@ public class BalancingTestBase extends ClusterTestBase { urlBuilder.append(")"); } + urlBuilder.append("?startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries); + if (clientID != null) { - urlBuilder.append("?jms.clientID="); + urlBuilder.append("&jms.clientID="); urlBuilder.append(clientID); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java index acef93ce74..da8a17f0d5 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/MQTTRedirectTest.java @@ -24,8 +24,11 @@ import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl; import org.apache.activemq.artemis.api.core.management.QueueControl; import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.tests.integration.security.SecurityTest; import org.apache.activemq.artemis.utils.Wait; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; @@ -37,6 +40,8 @@ import org.junit.Test; import javax.management.openmbean.CompositeData; import javax.management.openmbean.TabularData; +import java.lang.management.ManagementFactory; +import java.net.URL; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -44,7 +49,16 @@ import java.util.concurrent.TimeUnit; public class MQTTRedirectTest extends BalancingTestBase { - private final boolean discovery = true; + static { + String path = System.getProperty("java.security.auth.login.config"); + if (path == null) { + URL resource = SecurityTest.class.getClassLoader().getResource("login.config"); + if (resource != null) { + path = resource.getFile(); + System.setProperty("java.security.auth.login.config", path); + } + } + } @Test public void testSimpleRedirect() throws Exception { @@ -52,11 +66,7 @@ public class MQTTRedirectTest extends BalancingTestBase { setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false); setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false); - if (discovery) { - setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1); - } else { - setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1); - } + setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1); startServers(0, 1); @@ -94,7 +104,7 @@ public class MQTTRedirectTest extends BalancingTestBase { CompositeData hostData = targetConnectorParams.get(new Object[]{TransportConstants.HOST_PROP_NAME}); CompositeData portData = targetConnectorParams.get(new Object[]{TransportConstants.PORT_PROP_NAME}); String host = hostData != null ? (String)hostData.get("value") : TransportConstants.DEFAULT_HOST; - int port = portData != null ? Integer.valueOf((String)portData.get("value")) : TransportConstants.DEFAULT_PORT; + int port = portData != null ? Integer.parseInt((String)portData.get("value")) : TransportConstants.DEFAULT_PORT; CountDownLatch latch = new CountDownLatch(1); List messages = new ArrayList<>(); @@ -119,7 +129,40 @@ public class MQTTRedirectTest extends BalancingTestBase { client1.close(); Assert.assertEquals(0, queueControl0.countMessages()); - Wait.assertEquals(0, () -> queueControl1.countMessages()); + Wait.assertEquals(0, (Wait.LongCondition) queueControl1::countMessages); + } + + @Test + public void testRoleNameKeyLocalTarget() throws Exception { + + ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager("PropertiesLogin"); + servers[0] = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(true).setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false)); + setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "b", "b"); + + startServers(0); + + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + connOpts.setUserName("a"); + connOpts.setPassword("a".toCharArray()); + + MqttClient client0 = new MqttClient("tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_PORT, "TEST", new MemoryPersistence()); + try { + client0.connect(connOpts); + fail("Expect to be rejected as not in role b"); + } catch (MqttException e) { + Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER, MqttConnectReturnCode.valueOf((byte) e.getReasonCode())); + } + client0.close(); + + MqttClient client1 = new MqttClient("tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_PORT, "TEST", new MemoryPersistence()); + connOpts.setUserName("b"); + connOpts.setPassword("b".toCharArray()); + + // expect to be accepted, b has role b + client1.connect(connOpts); + client1.disconnect(); + client1.close(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java index 901941b720..5ccd8c9150 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/balancing/TargetKeyTest.java @@ -18,12 +18,17 @@ package org.apache.activemq.artemis.tests.integration.balancing; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.core.security.Role; +import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy; import org.apache.activemq.artemis.core.server.balancing.policies.Policy; import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory; import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver; import org.apache.activemq.artemis.core.server.balancing.targets.Target; import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey; +import org.apache.activemq.artemis.core.settings.HierarchicalRepository; +import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager; +import org.apache.activemq.artemis.tests.integration.security.SecurityTest; import org.junit.Assert; import org.junit.Assume; import org.junit.Before; @@ -33,12 +38,16 @@ import org.junit.runners.Parameterized; import javax.jms.Connection; import javax.jms.ConnectionFactory; +import java.lang.management.ManagementFactory; import java.net.InetAddress; +import java.net.URL; import java.net.UnknownHostException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; +import java.util.Set; @RunWith(Parameterized.class) public class TargetKeyTest extends BalancingTestBase { @@ -57,6 +66,17 @@ public class TargetKeyTest extends BalancingTestBase { } + static { + String path = System.getProperty("java.security.auth.login.config"); + if (path == null) { + URL resource = SecurityTest.class.getClassLoader().getResource("login.config"); + if (resource != null) { + path = resource.getFile(); + System.setProperty("java.security.auth.login.config", path); + } + } + } + private final String protocol; private final List keys = new ArrayList<>(); @@ -174,6 +194,42 @@ public class TargetKeyTest extends BalancingTestBase { Assert.assertEquals("admin", keys.get(0)); } + @Test + public void testRoleNameKeyLocalTarget() throws Exception { + + ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager("PropertiesLogin"); + servers[0] = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(true).setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false)); + setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "b", "b"); + + // ensure advisory permission is present for openwire connection creation by 'b' + HierarchicalRepository> securityRepository = servers[0].getSecurityRepository(); + Role role = new Role("b", true, true, true, true, true, true, false, false, true, true); + Set roles = new HashSet<>(); + roles.add(role); + securityRepository.addMatch("ActiveMQ.Advisory.#", roles); + + startServers(0); + + final int noRetriesSuchThatWeGetAnErrorOnRejection = 0; + ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST, + TransportConstants.DEFAULT_PORT + 0, null, "a", "a", noRetriesSuchThatWeGetAnErrorOnRejection); + + // expect disconnect/reject as not role b + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + fail("Expect to be rejected as not in role b"); + } catch (Exception expectedButNotSpecificDueToDifferentProtocolsInPlay) { + } + + connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST, + TransportConstants.DEFAULT_PORT + 0, null, "b", "b"); + + // expect to be accepted, b has role b + try (Connection connection = connectionFactory.createConnection()) { + connection.start(); + } + } + private boolean checkLocalHostname(String host) { try { return InetAddress.getByName(host).isLoopbackAddress();