ARTEMIS-3569 - balancer role_name local target, matches role of authenticated user

This commit is contained in:
gtully 2021-11-02 12:12:10 +00:00 committed by Bruscino Domenico Francesco
parent 8c63d9af72
commit b979189187
34 changed files with 493 additions and 110 deletions

View File

@ -211,12 +211,13 @@ public class AMQPSessionCallback implements SessionCallback {
true, //boolean xa,
(String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
} else {
final String validatedUser = manager.getServer().validateUser(user, passcode, protonSPI.getProtonConnectionDelegate(), manager.getSecurityDomain());
serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
false, // boolean autoCommitSends
false, // boolean autoCommitAcks,
false, // boolean preAcknowledge,
true, //boolean xa,
(String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
(String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), validatedUser);
}
}

View File

@ -41,10 +41,17 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
}
@Override
protected void cannotRedirect(AMQPRedirectContext context) throws Exception {
protected void cannotRedirect(AMQPRedirectContext context) {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.CONNECTION_FORCED);
error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
switch (context.getResult().status) {
case REFUSED_USE_ANOTHER:
error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo()));
break;
case REFUSED_UNAVAILABLE:
error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
break;
}
Connection protonConnection = context.getProtonConnection();
protonConnection.setCondition(error);
@ -52,7 +59,7 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
}
@Override
protected void redirectTo(AMQPRedirectContext context) throws Exception {
protected void redirectTo(AMQPRedirectContext context) {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());

View File

@ -59,6 +59,7 @@ public class MQTTConnection implements RemotingConnection {
this.creationTime = System.currentTimeMillis();
this.dataReceived = new AtomicBoolean();
this.destroyed = false;
transportConnection.setProtocolConnection(this);
}

View File

@ -23,7 +23,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.util.CharsetUtil;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ServerSession;
@ -61,13 +60,13 @@ public class MQTTConnectionManager {
*/
void connect(String cId,
String username,
byte[] passwordInBytes,
String password,
boolean will,
byte[] willMessage,
String willTopic,
boolean willRetain,
int willQosLevel,
boolean cleanSession) throws Exception {
boolean cleanSession, String validatedUser) throws Exception {
String clientId = validateClientId(cId, cleanSession);
if (clientId == null) {
session.getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_IDENTIFIER_REJECTED);
@ -79,11 +78,10 @@ public class MQTTConnectionManager {
MQTTSessionState sessionState = getSessionState(clientId);
synchronized (sessionState) {
session.setSessionState(sessionState);
String password = passwordInBytes == null ? null : new String(passwordInBytes, CharsetUtil.UTF_8);
session.getConnection().setClientID(clientId);
ServerSessionImpl serverSession = createServerSession(username, password);
ServerSessionImpl serverSession = createServerSession(username, password, validatedUser);
serverSession.start();
ServerSessionImpl internalServerSession = createServerSession(username, password);
ServerSessionImpl internalServerSession = createServerSession(username, password, validatedUser);
internalServerSession.disableSecurity();
internalServerSession.start();
session.setServerSession(serverSession, internalServerSession);
@ -120,10 +118,9 @@ public class MQTTConnectionManager {
* @return
* @throws Exception
*/
ServerSessionImpl createServerSession(String username, String password) throws Exception {
ServerSessionImpl createServerSession(String username, String password, String validatedUser) throws Exception {
String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer();
ServerSession serverSession = server.createSession(id,
username,
password,
@ -138,7 +135,7 @@ public class MQTTConnectionManager {
MQTTUtil.SESSION_AUTO_CREATE_QUEUE,
server.newOperationContext(),
session.getProtocolManager().getPrefixes(),
session.getProtocolManager().getSecurityDomain());
session.getProtocolManager().getSecurityDomain(), validatedUser);
return (ServerSessionImpl) serverSession;
}

View File

@ -38,6 +38,7 @@ import io.netty.handler.codec.mqtt.MqttSubAckPayload;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.logs.AuditLogger;
@ -177,12 +178,15 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
* @param connect
*/
void handleConnect(MqttConnectMessage connect) throws Exception {
final String username = connect.payload().userName();
final String password = connect.payload().passwordInBytes() == null ? null : new String( connect.payload().passwordInBytes(), CharsetUtil.UTF_8);
final String validatedUser = server.validateUser(username, password, session.getConnection(), session.getProtocolManager().getSecurityDomain());
if (connection.getTransportConnection().getRedirectTo() == null ||
!protocolManager.getRedirectHandler().redirect(connection, session, connect)) {
connectionEntry.ttl = connect.variableHeader().keepAliveTimeSeconds() * 1500L;
String clientId = connect.payload().clientIdentifier();
session.getConnectionManager().connect(clientId, connect.payload().userName(), connect.payload().passwordInBytes(), connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession());
session.getConnectionManager().connect(clientId, username, password, connect.variableHeader().isWillFlag(), connect.payload().willMessageInBytes(), connect.payload().willTopic(), connect.variableHeader().isWillRetain(), connect.variableHeader().willQos(), connect.variableHeader().isCleanSession(), validatedUser);
}
}

View File

@ -36,13 +36,20 @@ public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
}
@Override
protected void cannotRedirect(MQTTRedirectContext context) throws Exception {
context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
protected void cannotRedirect(MQTTRedirectContext context) {
switch (context.getResult().status) {
case REFUSED_USE_ANOTHER:
context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER);
break;
case REFUSED_UNAVAILABLE:
context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
break;
}
context.getMQTTSession().getProtocolHandler().disconnect(true);
}
@Override
protected void redirectTo(MQTTRedirectContext context) throws Exception {
protected void redirectTo(MQTTRedirectContext context) {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());

View File

@ -197,6 +197,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final AtomicBoolean disableTtl = new AtomicBoolean(false);
private String validatedUser = null;
public OpenWireConnection(Connection connection,
ActiveMQServer server,
OpenWireProtocolManager openWireProtocolManager,
@ -210,6 +212,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
this.outWireFormat = wf.copy();
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
this.maxInactivityDuration = openWireProtocolManager.getMaxInactivityDuration();
this.transportConnection.setProtocolConnection(this);
}
// SecurityAuth implementation
@ -768,7 +771,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
}
private void createInternalSession(ConnectionInfo info) throws Exception {
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext, protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), validatedUser);
}
//raise the refCount of context
@ -1010,6 +1013,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return protocolManager.isSupportAdvisory();
}
public String getValidatedUser() {
return validatedUser;
}
public void setValidatedUser(String validatedUser) {
this.validatedUser = validatedUser;
}
class SlowConsumerDetection implements SlowConsumerDetectionListener {
@Override
@ -1126,12 +1137,13 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
@Override
public Response processAddConnection(ConnectionInfo info) throws Exception {
try {
if (transportConnection.getRedirectTo() != null && protocolManager.getRedirectHandler()
.redirect(OpenWireConnection.this, info)) {
shutdown(true);
return null;
protocolManager.validateUser(OpenWireConnection.this, info);
if (transportConnection.getRedirectTo() != null) {
if (protocolManager.getRedirectHandler().redirect(OpenWireConnection.this, info)) {
shutdown(true);
return null;
}
}
protocolManager.addConnection(OpenWireConnection.this, info);
} catch (Exception e) {
Response resp = new ExceptionResponse(e);

View File

@ -357,19 +357,21 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
return websocketRegistryNames;
}
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
public void validateUser(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();
try {
validateUser(username, password, connection);
connection.setValidatedUser(validateUser(username, password, connection));
} catch (ActiveMQSecurityException e) {
// We need to send an exception used by the openwire
SecurityException ex = new SecurityException("User name [" + username + "] or password is invalid.");
ex.initCause(e);
throw ex;
}
}
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String clientId = info.getClientId();
if (clientId == null) {
throw new InvalidClientIDException("No clientID specified for connection request");
@ -529,8 +531,8 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
return false;
}
public void validateUser(String login, String passcode, OpenWireConnection connection) throws Exception {
server.getSecurityStore().authenticate(login, passcode, connection, getSecurityDomain());
public String validateUser(String login, String passcode, OpenWireConnection connection) throws Exception {
return server.validateUser(login, passcode, connection, getSecurityDomain());
}
public void sendBrokerInfo(OpenWireConnection connection) throws Exception {

View File

@ -133,7 +133,7 @@ public class AMQSession implements SessionCallback {
// now
try {
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain());
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext(), protocolManager.getPrefixes(), protocolManager.getSecurityDomain(), connection.getValidatedUser());
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.error("error init session", e);
}

View File

@ -72,7 +72,7 @@ public class OpenWireConnectionTest {
ServerSession serverSession = Mockito.mock(ServerSession.class);
Mockito.when(serverSession.getName()).thenReturn("session");
Mockito.doReturn(serverSession).when(server).createSession(Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt(), Mockito.any(), Mockito.anyBoolean(), Mockito.anyBoolean(), Mockito.anyBoolean(),
Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString());
Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyBoolean(), Mockito.any(), Mockito.any(), Mockito.anyString(), Mockito.any());
OpenWireProtocolManager openWireProtocolManager = new OpenWireProtocolManager(null, server,null, null);
openWireProtocolManager.setSecurityDomain("securityDomain");

View File

@ -230,7 +230,8 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes(), getSecurityDomain());
final String validatedUser = server.validateUser(connection.getLogin(), connection.getPasscode(), connection, getSecurityDomain());
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, !transacted, false, false, false, null, stompSession, true, server.newOperationContext(), getPrefixes(), getSecurityDomain(), validatedUser);
stompSession.setServerSession(session);
sessions.put(id, stompSession);
}

View File

@ -117,11 +117,12 @@ public abstract class AbstractControl extends StandardMBean {
boolean createMessageId,
Long...queueID) throws Exception {
ManagementRemotingConnection fakeConnection = new ManagementRemotingConnection();
final String validatedUser = server.validateUser(user, password, fakeConnection, null);
ServerSession serverSession = server.createSession("management::" + UUIDGenerator.getInstance().generateStringUUID(), user, password,
Integer.MAX_VALUE, fakeConnection,
true, true, false,
false, address.toString(), fakeConnection.callback,
false, new DummyOperationContext(), Collections.emptyMap(), null);
false, new DummyOperationContext(), Collections.emptyMap(), null, validatedUser);
try {
CoreMessage message = new CoreMessage(storageManager.generateID(), 50);
if (headers != null) {

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
@ -59,17 +60,16 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public CompositeData getTarget(String key) throws Exception {
Target target = balancer.getTarget(key);
if (target != null) {
TargetResult result = balancer.getTarget(key);
if (TargetResult.Status.OK == result.status) {
CompositeData connectorData = null;
TransportConfiguration connector = target.getConnector();
TransportConfiguration connector = result.target.getConnector();
if (connector != null) {
TabularData paramsData = new TabularDataSupport(getParametersType());
for (Map.Entry<String, Object> param : connector.getParams().entrySet()) {
paramsData.put(new CompositeDataSupport(getParameterType(), new String[]{"key", "value"},
new Object[]{param.getKey(), param == null ? param : param.getValue().toString()}));
new Object[]{param.getKey(), param.getValue() == null ? null : param.getValue().toString()}));
}
connectorData = new CompositeDataSupport(getTransportConfigurationType(),
@ -77,11 +77,9 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
new Object[]{connector.getName(), connector.getFactoryClassName(), paramsData});
}
CompositeData targetData = new CompositeDataSupport(getTargetCompositeType(),
new String[]{"nodeID", "local", "connector"},
new Object[]{target.getNodeID(), target.isLocal(), connectorData});
return targetData;
return new CompositeDataSupport(getTargetCompositeType(),
new String[]{"nodeID", "local", "connector"},
new Object[]{result.target.getNodeID(), result.target.isLocal(), connectorData});
}
return null;
@ -89,14 +87,13 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public String getTargetAsJSON(String key) {
Target target = balancer.getTarget(key);
if (target != null) {
TransportConfiguration connector = target.getConnector();
TargetResult result = balancer.getTarget(key);
if (TargetResult.Status.OK == result.status) {
TransportConfiguration connector = result.target.getConnector();
JsonObjectBuilder targetDataBuilder = JsonLoader.createObjectBuilder()
.add("nodeID", target.getNodeID())
.add("local", target.isLocal());
.add("nodeID", result.target.getNodeID())
.add("local", result.target.isLocal());
if (connector == null) {
targetDataBuilder.addNull("connector");

View File

@ -163,10 +163,6 @@ public class ActiveMQPacketHandler implements ChannelHandler {
connection.setClientID(((CreateSessionMessage_V2) request).getClientID());
}
if (connection.getTransportConnection().getRedirectTo() != null) {
protocolManager.getRedirectHandler().redirect(connection, request);
}
Channel channel = connection.getChannel(request.getSessionChannelID(), request.getWindowSize());
ActiveMQPrincipal activeMQPrincipal = null;
@ -175,12 +171,17 @@ public class ActiveMQPacketHandler implements ChannelHandler {
activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
}
final String validatedUser = server.validateUser(activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), connection, protocolManager.getSecurityDomain());
if (connection.getTransportConnection().getRedirectTo() != null) {
protocolManager.getRedirectHandler().redirect(connection, request);
}
OperationContext sessionOperationContext = server.newOperationContext();
Map<SimpleString, RoutingType> routingTypeMap = protocolManager.getPrefixes();
CoreSessionCallback sessionCallback = new CoreSessionCallback(request.getName(), protocolManager, channel, connection);
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain());
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), sessionCallback, true, sessionOperationContext, routingTypeMap, protocolManager.getSecurityDomain(), validatedUser);
ServerProducer serverProducer = new ServerProducerImpl(session.getName(), "CORE", request.getDefaultAddress());
session.addProducer(serverProducer);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(server, session, channel);

View File

@ -40,7 +40,12 @@ public class ActiveMQRedirectHandler extends RedirectHandler<ActiveMQRedirectCon
@Override
public void cannotRedirect(ActiveMQRedirectContext context) throws Exception {
throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
switch (context.getResult().status) {
case REFUSED_UNAVAILABLE:
throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
case REFUSED_USE_ANOTHER:
throw ActiveMQMessageBundle.BUNDLE.balancerReject();
}
}
@Override

View File

@ -190,7 +190,7 @@ public class SecurityStoreImpl implements SecurityStore, HierarchicalRepositoryC
authenticationFailed(user, connection);
}
if (AuditLogger.isAnyLoggingEnabled() && connection != null) {
if (connection != null) {
connection.setAuditSubject(subject);
}
if (AuditLogger.isResourceLoggingEnabled()) {

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQRedirectedException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
@ -518,4 +519,7 @@ public interface ActiveMQMessageBundle {
@Message(id = 229239, value = "There is no retention configured. In order to use the replay method you must specify journal-retention-directory element on the broker.xml")
IllegalArgumentException noRetention();
@Message(id = 229240, value = "Balancer rejected the connection")
ActiveMQRemoteDisconnectException balancerReject();
}

View File

@ -338,7 +338,7 @@ public interface ActiveMQServer extends ServiceComponent {
boolean autoCreateQueues,
OperationContext context,
Map<SimpleString, RoutingType> prefixes,
String securityDomain) throws Exception;
String securityDomain, String validatedUser) throws Exception;
/** This is to be used in places where security is bypassed, like internal sessions, broker connections, etc... */
ServerSession createInternalSession(String name,
@ -959,4 +959,6 @@ public interface ActiveMQServer extends ServiceComponent {
void reloadConfigurationFile() throws Exception;
BrokerBalancerManager getBalancerManager();
}
String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception;
}

View File

@ -26,6 +26,7 @@ import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
@ -46,7 +47,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final TargetKeyResolver targetKeyResolver;
private final Target localTarget;
private final TargetResult localTarget;
private final Pattern localTargetFilter;
@ -54,7 +55,7 @@ public class BrokerBalancer implements ActiveMQComponent {
private final Policy policy;
private final Cache<String, Target> cache;
private final Cache<String, TargetResult> cache;
private volatile boolean started = false;
@ -67,7 +68,7 @@ public class BrokerBalancer implements ActiveMQComponent {
}
public Target getLocalTarget() {
return localTarget;
return localTarget.target;
}
public String getLocalTargetFilter() {
@ -82,7 +83,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return policy;
}
public Cache<String, Target> getCache() {
public Cache<String, TargetResult> getCache() {
return cache;
}
@ -99,7 +100,7 @@ public class BrokerBalancer implements ActiveMQComponent {
this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter);
this.localTarget = localTarget;
this.localTarget = new TargetResult(localTarget);
this.localTargetFilter = localTargetFilter != null ? Pattern.compile(localTargetFilter) : null;
@ -134,7 +135,7 @@ public class BrokerBalancer implements ActiveMQComponent {
}
}
public Target getTarget(Connection connection, String clientID, String username) {
public TargetResult getTarget(Connection connection, String clientID, String username) {
if (clientID != null && clientID.startsWith(BrokerBalancer.CLIENT_ID_PREFIX)) {
if (logger.isDebugEnabled()) {
logger.debug("The clientID [" + clientID + "] starts with BrokerBalancer.CLIENT_ID_PREFIX");
@ -146,7 +147,7 @@ public class BrokerBalancer implements ActiveMQComponent {
return getTarget(targetKeyResolver.resolve(connection, clientID, username));
}
public Target getTarget(String key) {
public TargetResult getTarget(String key) {
if (this.localTargetFilter != null && this.localTargetFilter.matcher(key).matches()) {
if (logger.isDebugEnabled()) {
@ -157,45 +158,47 @@ public class BrokerBalancer implements ActiveMQComponent {
}
if (pool == null) {
return null;
return TargetResult.REFUSED_USE_ANOTHER_RESULT;
}
Target target = null;
TargetResult result = null;
if (cache != null) {
target = cache.getIfPresent(key);
result = cache.getIfPresent(key);
}
if (target != null) {
if (pool.isTargetReady(target)) {
if (result != null) {
if (pool.isTargetReady(result.target)) {
if (logger.isDebugEnabled()) {
logger.debug("The cache returns [" + target + "] ready for " + targetKey + "[" + key + "]");
logger.debug("The cache returns [" + result.target + "] ready for " + targetKey + "[" + key + "]");
}
return target;
return result;
}
if (logger.isDebugEnabled()) {
logger.debug("The cache returns [" + target + "] not ready for " + targetKey + "[" + key + "]");
logger.debug("The cache returns [" + result.target + "] not ready for " + targetKey + "[" + key + "]");
}
}
List<Target> targets = pool.getTargets();
target = policy.selectTarget(targets, key);
Target target = policy.selectTarget(targets, key);
if (logger.isDebugEnabled()) {
logger.debug("The policy selects [" + target + "] from " + targets + " for " + targetKey + "[" + key + "]");
}
if (target != null && cache != null) {
if (logger.isDebugEnabled()) {
logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
if (target != null) {
result = new TargetResult(target);
if (cache != null) {
if (logger.isDebugEnabled()) {
logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
}
cache.put(key, result);
}
cache.put(key, target);
}
return target;
return result != null ? result : TargetResult.REFUSED_UNAVAILABLE_RESULT;
}
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.artemis.core.server.balancing;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
public class RedirectContext {
@ -27,7 +28,7 @@ public class RedirectContext {
private final String username;
private Target target;
private TargetResult result;
public RemotingConnection getConnection() {
return connection;
@ -42,11 +43,15 @@ public class RedirectContext {
}
public Target getTarget() {
return target;
return result.target;
}
public void setTarget(Target target) {
this.target = target;
public TargetResult getResult() {
return result;
}
public void setResult(TargetResult result) {
this.result = result;
}
public RedirectContext(RemotingConnection connection, String clientID, String username) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.balancing;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
public abstract class RedirectHandler<T extends RedirectContext> {
@ -51,9 +52,9 @@ public abstract class RedirectHandler<T extends RedirectContext> {
return true;
}
context.setTarget(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
if (context.getTarget() == null) {
if (TargetResult.Status.OK != context.getResult().status) {
ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection);
cannotRedirect(context);

View File

@ -56,4 +56,5 @@ public interface Target {
<T> T getAttribute(String resourceName, String attributeName, Class<T> attributeClass, int timeout) throws Exception;
<T> T invokeOperation(String resourceName, String operationName, Object[] operationParams, Class<T> operationClass, int timeout) throws Exception;
}

View File

@ -18,7 +18,7 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
public enum TargetKey {
CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME;
CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME, ROLE_NAME;
public static final String validValues;
@ -46,6 +46,8 @@ public enum TargetKey {
return SOURCE_IP;
case "USER_NAME":
return USER_NAME;
case "ROLE_NAME":
return ROLE_NAME;
default:
throw new IllegalStateException("Invalid RedirectKey:" + type + " valid Types: " + validValues);
}

View File

@ -17,7 +17,10 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
import javax.security.auth.Subject;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.jboss.logging.Logger;
import java.util.regex.Matcher;
@ -80,6 +83,35 @@ public class TargetKeyResolver {
case USER_NAME:
keyValue = username;
break;
case ROLE_NAME:
if (connection != null && connection.getProtocolConnection() != null) {
Subject subject = connection.getProtocolConnection().getAuditSubject();
if (subject != null) {
for (RolePrincipal candidateRole : subject.getPrincipals(RolePrincipal.class)) {
String roleName = candidateRole.getName();
if (roleName != null) {
if (keyFilter != null) {
Matcher keyMatcher = keyFilter.matcher(roleName);
if (keyMatcher.find()) {
keyValue = keyMatcher.group();
if (logger.isDebugEnabled()) {
logger.debugf("role match for %s via %s", roleName, keyMatcher);
}
return keyValue;
}
} else {
// with no filter, first role is the candidate
keyValue = roleName;
if (logger.isDebugEnabled()) {
logger.debugf("first role match: %s", roleName);
}
return keyValue;
}
}
}
}
}
break;
default:
throw new IllegalStateException("Unexpected value: " + key);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
public class TargetResult {
public static final TargetResult REFUSED_UNAVAILABLE_RESULT = new TargetResult(Status.REFUSED_UNAVAILABLE);
public static final TargetResult REFUSED_USE_ANOTHER_RESULT = new TargetResult(Status.REFUSED_USE_ANOTHER);
public Status status;
public Target target;
public TargetResult(Target t) {
this.target = t;
this.status = Status.OK;
}
private TargetResult(Status s) {
this.status = s;
}
public enum Status {
OK,
REFUSED_UNAVAILABLE, // pool is not yet ready, possibly transient
REFUSED_USE_ANOTHER // rejected, go else where, non-transient
}
}

View File

@ -1674,12 +1674,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean autoCreateQueues,
final OperationContext context,
final Map<SimpleString, RoutingType> prefixes,
final String securityDomain) throws Exception {
String validatedUser = "";
if (securityStore != null) {
validatedUser = securityStore.authenticate(username, password, connection, securityDomain);
}
final String securityDomain,
String validatedUser) throws Exception {
checkSessionLimit(validatedUser);
@ -1693,6 +1689,16 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return session;
}
@Override
public String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception {
String validatedUser = "";
if (securityStore != null) {
validatedUser = securityStore.authenticate(username, password, connection, securityDomain);
}
return validatedUser;
}
@Override
public ServerSession createInternalSession(String name,
int minLargeMessageSize,

View File

@ -18,7 +18,6 @@
package org.apache.activemq.artemis.core.server.balancing;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
@ -29,6 +28,7 @@ import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -74,8 +74,8 @@ public class BrokerBalancerTest {
@Test
public void getTarget() {
assertEquals( localTarget, underTest.getTarget("FOO_EE"));
assertNotEquals( localTarget, underTest.getTarget("BAR_EE"));
assertEquals( localTarget, underTest.getTarget("FOO_EE").target);
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
}
}

View File

@ -17,7 +17,15 @@
package org.apache.activemq.artemis.core.server.balancing.targets;
import javax.security.auth.Subject;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal;
import org.apache.commons.collections.set.ListOrderedSet;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -107,4 +115,65 @@ public class TargetKeyResolverTest {
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(null, null, null));
}
@Test
public void testRoleNameKeyWithFilter() throws Exception {
TargetKeyResolver targetKeyResolver = new TargetKeyResolver(TargetKey.ROLE_NAME, "B");
Connection connection = Mockito.mock(Connection.class);
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class);
Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection);
Subject subject = Mockito.mock(Subject.class);
Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject);
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
Set<RolePrincipal> rolePrincipals = new HashSet<>();
Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals);
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
rolePrincipals.add(new RolePrincipal("A"));
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
rolePrincipals.add(new RolePrincipal("B"));
Assert.assertEquals("B", targetKeyResolver.resolve(connection, null, null));
}
@Test
public void testRoleNameKeyWithoutFilter() throws Exception {
TargetKeyResolver targetKeyResolver = new TargetKeyResolver(TargetKey.ROLE_NAME, null);
Connection connection = Mockito.mock(Connection.class);
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
RemotingConnection protocolConnection = Mockito.mock(RemotingConnection.class);
Mockito.when(connection.getProtocolConnection()).thenReturn(protocolConnection);
Subject subject = Mockito.mock(Subject.class);
Mockito.when(protocolConnection.getAuditSubject()).thenReturn(subject);
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
Set<RolePrincipal> rolePrincipals = new ListOrderedSet();
Mockito.when(subject.getPrincipals(RolePrincipal.class)).thenReturn(rolePrincipals);
Assert.assertEquals(TargetKeyResolver.DEFAULT_KEY_VALUE, targetKeyResolver.resolve(connection, null, null));
final RolePrincipal roleA = new RolePrincipal("A");
rolePrincipals.add(roleA);
Assert.assertEquals("A", targetKeyResolver.resolve(connection, null, null));
rolePrincipals.add(new RolePrincipal("B"));
Assert.assertEquals("A", targetKeyResolver.resolve(connection, null, null));
rolePrincipals.remove(roleA);
// with no filter, the first entry matches
Assert.assertEquals("B", targetKeyResolver.resolve(connection, null, null));
}
}

View File

@ -13,10 +13,11 @@ The remote target is another reachable broker.
## Target Key
The broker balancer uses a target key to select a target broker.
It is a string retrieved from an incoming client connection, the supported values are:
* `CLIENT_ID` is the JMS client ID;
* `SNI_HOST` is the hostname indicated by the client in the SNI extension of the TLS protocol;
* `SOURCE_IP` is the source IP address of the client;
* `CLIENT_ID` is the JMS client ID.
* `SNI_HOST` is the hostname indicated by the client in the SNI extension of the TLS protocol.
* `SOURCE_IP` is the source IP address of the client.
* `USER_NAME` is the username indicated by the client.
* `ROLE_NAME` is a role associated with the authenticated user of the connection.
## Pools
The pool is a group of target brokers with periodic checks on their state.
@ -108,7 +109,7 @@ for more details about setting the `cache-timeout` parameter.
## Defining broker balancers
A broker balancer is defined by the `broker-balancer` element, it includes the following items:
* the `name` attribute defines the name of the broker balancer and is used to reference the balancer from an acceptor;
* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
* the `target-key` element defines what key to select a target broker, the supported values are: `CLIENT_ID`, `SNI_HOST`, `SOURCE_IP`, `USER_NAME`, `ROLE_NAME`, default is `SOURCE_IP`, see [target key](#target-key) for further details;
* the `target-key-filter` element defines a regular expression to filter the resolved keys;
* the `local-target-filter` element defines a regular expression to match the keys that have to return a local target;
* the `cache-timeout` element is the time period for a target broker to remain in the cache, measured in milliseconds, setting `0` will disable the cache, default is `-1`, meaning no expiration;

View File

@ -207,7 +207,7 @@
<activemq-surefire-argline>-Dorg.apache.commons.logging.Log=org.apache.activemq.artemis.logs.JBossLoggingApacheLoggerBridge -Dorg.apache.activemq.artemis.utils.RetryRule.retry=${retryTests} -Dbrokerconfig.maxDiskUsage=100 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_QUIET_PERIOD=0 -Dorg.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_SHUTDOWN_TIMEOUT=0 -Djava.util.logging.manager=org.jboss.logmanager.LogManager
-Dlogging.configuration="file:${activemq.basedir}/tests/config/${logging.config}"
-Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost -Dorg.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory.localBindAddress=localhost
-Djava.library.path="${activemq.basedir}/target/bin/lib/linux-x86_64:${activemq.basedir}/target/bin/lib/linux-i686" -Djgroups.bind_addr=localhost
-Djava.net.preferIPv4Stack=true -Dbasedir=${basedir}
</activemq-surefire-argline>
<activemq.basedir>${project.basedir}</activemq.basedir>

View File

@ -189,4 +189,62 @@ public class AmqpRedirectTest extends BalancingTestBase {
stopServers(0, 1);
}
@Test
public void testBalancerRejectionUseAnother() throws Exception {
setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
// only accepts users with RoleName==B so will reject
setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "B", null);
startServers(0);
URI uri = new URI("tcp://localhost:" + TransportConstants.DEFAULT_PORT);
AmqpClient client = new AmqpClient(uri, "admin", "admin");
AmqpConnection connection = client.createConnection();
connection.setContainerId(getName());
connection.setStateInspector(new AmqpValidator() {
@Override
public void inspectOpenedResource(Connection connection) {
if (!connection.getRemoteProperties().containsKey(AmqpSupport.CONNECTION_OPEN_FAILED)) {
markAsInvalid("Broker did not set connection establishment failed hint");
}
}
@Override
public void inspectClosedResource(Connection connection) {
ErrorCondition remoteError = connection.getRemoteCondition();
if (remoteError == null || remoteError.getCondition() == null) {
markAsInvalid("Broker did not add error condition for connection");
return;
}
if (!remoteError.getCondition().equals(ConnectionError.CONNECTION_FORCED)) {
markAsInvalid("Broker did not set condition to " + ConnectionError.CONNECTION_FORCED);
return;
}
String expectedDescription = "Broker balancer " + BROKER_BALANCER_NAME + ", rejected this connection";
String actualDescription = remoteError.getDescription();
if (!expectedDescription.equals(actualDescription)) {
markAsInvalid("Broker did not set description as expected, was: " + actualDescription);
return;
}
}
});
try {
connection.connect();
fail("Expected connection to fail, without redirect");
} catch (Exception e) {
// Expected
}
connection.getStateInspector().assertValid();
connection.close();
stopServers(0);
}
}

View File

@ -137,7 +137,25 @@ public class BalancingTestBase extends ClusterTestBase {
acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME);
}
protected void setupBalancerServerWithLocalTarget(final int node, final TargetKey targetKey, final String targetKeyFilter, final String localTargetFilter) {
Configuration configuration = getServer(node).getConfiguration();
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration().setName(BROKER_BALANCER_NAME);
brokerBalancerConfiguration.setTargetKey(targetKey).setLocalTargetFilter(localTargetFilter).setTargetKeyFilter(targetKeyFilter);
configuration.setBalancerConfigurations(Collections.singletonList(brokerBalancerConfiguration));
TransportConfiguration acceptor = getDefaultServerAcceptor(node);
acceptor.getParams().put("redirect-to", BROKER_BALANCER_NAME);
}
protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password) throws Exception {
return createFactory(protocol, sslEnabled, host, port, clientID, user, password, -1);
}
protected ConnectionFactory createFactory(String protocol, boolean sslEnabled, String host, int port, String clientID, String user, String password, int retries) throws Exception {
switch (protocol) {
case CORE_PROTOCOL: {
StringBuilder urlBuilder = new StringBuilder();
@ -146,7 +164,7 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(host);
urlBuilder.append(":");
urlBuilder.append(port);
urlBuilder.append("?ha=true&reconnectAttempts=30");
urlBuilder.append("?ha=true&reconnectAttempts=10&initialConnectAttempts=" + retries);
urlBuilder.append("&sniHost=");
urlBuilder.append(host);
@ -197,8 +215,10 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
urlBuilder.append("?failover.startupMaxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("?jms.clientID=");
urlBuilder.append("&jms.clientID=");
urlBuilder.append(clientID);
}
@ -223,8 +243,10 @@ public class BalancingTestBase extends ClusterTestBase {
urlBuilder.append(")");
}
urlBuilder.append("?startupMaxReconnectAttempts=" + retries + "&maxReconnectAttempts=" + retries);
if (clientID != null) {
urlBuilder.append("?jms.clientID=");
urlBuilder.append("&jms.clientID=");
urlBuilder.append(clientID);
}

View File

@ -24,8 +24,11 @@ import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.apache.activemq.artemis.utils.Wait;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
@ -37,6 +40,8 @@ import org.junit.Test;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;
import java.lang.management.ManagementFactory;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@ -44,7 +49,16 @@ import java.util.concurrent.TimeUnit;
public class MQTTRedirectTest extends BalancingTestBase {
private final boolean discovery = true;
static {
String path = System.getProperty("java.security.auth.login.config");
if (path == null) {
URL resource = SecurityTest.class.getClassLoader().getResource("login.config");
if (resource != null) {
path = resource.getFile();
System.setProperty("java.security.auth.login.config", path);
}
}
}
@Test
public void testSimpleRedirect() throws Exception {
@ -52,11 +66,7 @@ public class MQTTRedirectTest extends BalancingTestBase {
setupLiveServerWithDiscovery(0, GROUP_ADDRESS, GROUP_PORT, true, true, false);
setupLiveServerWithDiscovery(1, GROUP_ADDRESS, GROUP_PORT, true, true, false);
if (discovery) {
setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1);
} else {
setupBalancerServerWithStaticConnectors(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1, 1);
}
setupBalancerServerWithDiscovery(0, TargetKey.USER_NAME, FirstElementPolicy.NAME, null, false, null, 1);
startServers(0, 1);
@ -94,7 +104,7 @@ public class MQTTRedirectTest extends BalancingTestBase {
CompositeData hostData = targetConnectorParams.get(new Object[]{TransportConstants.HOST_PROP_NAME});
CompositeData portData = targetConnectorParams.get(new Object[]{TransportConstants.PORT_PROP_NAME});
String host = hostData != null ? (String)hostData.get("value") : TransportConstants.DEFAULT_HOST;
int port = portData != null ? Integer.valueOf((String)portData.get("value")) : TransportConstants.DEFAULT_PORT;
int port = portData != null ? Integer.parseInt((String)portData.get("value")) : TransportConstants.DEFAULT_PORT;
CountDownLatch latch = new CountDownLatch(1);
List<MqttMessage> messages = new ArrayList<>();
@ -119,7 +129,40 @@ public class MQTTRedirectTest extends BalancingTestBase {
client1.close();
Assert.assertEquals(0, queueControl0.countMessages());
Wait.assertEquals(0, () -> queueControl1.countMessages());
Wait.assertEquals(0, (Wait.LongCondition) queueControl1::countMessages);
}
@Test
public void testRoleNameKeyLocalTarget() throws Exception {
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager("PropertiesLogin");
servers[0] = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(true).setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false));
setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "b", "b");
startServers(0);
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
connOpts.setUserName("a");
connOpts.setPassword("a".toCharArray());
MqttClient client0 = new MqttClient("tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_PORT, "TEST", new MemoryPersistence());
try {
client0.connect(connOpts);
fail("Expect to be rejected as not in role b");
} catch (MqttException e) {
Assert.assertEquals(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER, MqttConnectReturnCode.valueOf((byte) e.getReasonCode()));
}
client0.close();
MqttClient client1 = new MqttClient("tcp://" + TransportConstants.DEFAULT_HOST + ":" + TransportConstants.DEFAULT_PORT, "TEST", new MemoryPersistence());
connOpts.setUserName("b");
connOpts.setPassword("b".toCharArray());
// expect to be accepted, b has role b
client1.connect(connOpts);
client1.disconnect();
client1.close();
}
}

View File

@ -18,12 +18,17 @@
package org.apache.activemq.artemis.tests.integration.balancing;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.security.SecurityTest;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
@ -33,12 +38,16 @@ import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@RunWith(Parameterized.class)
public class TargetKeyTest extends BalancingTestBase {
@ -57,6 +66,17 @@ public class TargetKeyTest extends BalancingTestBase {
}
static {
String path = System.getProperty("java.security.auth.login.config");
if (path == null) {
URL resource = SecurityTest.class.getClassLoader().getResource("login.config");
if (resource != null) {
path = resource.getFile();
System.setProperty("java.security.auth.login.config", path);
}
}
}
private final String protocol;
private final List<String> keys = new ArrayList<>();
@ -174,6 +194,42 @@ public class TargetKeyTest extends BalancingTestBase {
Assert.assertEquals("admin", keys.get(0));
}
@Test
public void testRoleNameKeyLocalTarget() throws Exception {
ActiveMQJAASSecurityManager securityManager = new ActiveMQJAASSecurityManager("PropertiesLogin");
servers[0] = addServer(ActiveMQServers.newActiveMQServer(createDefaultConfig(true).setSecurityEnabled(true), ManagementFactory.getPlatformMBeanServer(), securityManager, false));
setupBalancerServerWithLocalTarget(0, TargetKey.ROLE_NAME, "b", "b");
// ensure advisory permission is present for openwire connection creation by 'b'
HierarchicalRepository<Set<Role>> securityRepository = servers[0].getSecurityRepository();
Role role = new Role("b", true, true, true, true, true, true, false, false, true, true);
Set<Role> roles = new HashSet<>();
roles.add(role);
securityRepository.addMatch("ActiveMQ.Advisory.#", roles);
startServers(0);
final int noRetriesSuchThatWeGetAnErrorOnRejection = 0;
ConnectionFactory connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
TransportConstants.DEFAULT_PORT + 0, null, "a", "a", noRetriesSuchThatWeGetAnErrorOnRejection);
// expect disconnect/reject as not role b
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
fail("Expect to be rejected as not in role b");
} catch (Exception expectedButNotSpecificDueToDifferentProtocolsInPlay) {
}
connectionFactory = createFactory(protocol, false, TransportConstants.DEFAULT_HOST,
TransportConstants.DEFAULT_PORT + 0, null, "b", "b");
// expect to be accepted, b has role b
try (Connection connection = connectionFactory.createConnection()) {
connection.start();
}
}
private boolean checkLocalHostname(String host) {
try {
return InetAddress.getByName(host).isLoopbackAddress();