ARTEMIS-3569 Validate users on AMQP remote open
This commit is contained in:
parent
b979189187
commit
f8472fd736
|
@ -58,7 +58,6 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
|
|||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerSenderContext;
|
||||
import org.apache.activemq.artemis.protocol.amqp.proton.transaction.ProtonTransactionHandler;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.PlainSASLResult;
|
||||
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.Connection;
|
||||
|
@ -194,30 +193,20 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
||||
|
||||
String user = null;
|
||||
String passcode = null;
|
||||
if (saslResult != null) {
|
||||
user = saslResult.getUser();
|
||||
if (saslResult instanceof PlainSASLResult) {
|
||||
passcode = ((PlainSASLResult) saslResult).getPassword();
|
||||
}
|
||||
}
|
||||
|
||||
if (connection.isBridgeConnection()) {
|
||||
serverSession = manager.getServer().createInternalSession(name, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
|
||||
false, // boolean autoCommitSends
|
||||
false, // boolean autoCommitAcks,
|
||||
false, // boolean preAcknowledge,
|
||||
true, //boolean xa,
|
||||
(String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
|
||||
null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain());
|
||||
} else {
|
||||
final String validatedUser = manager.getServer().validateUser(user, passcode, protonSPI.getProtonConnectionDelegate(), manager.getSecurityDomain());
|
||||
serverSession = manager.getServer().createSession(name, user, passcode, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
|
||||
serverSession = manager.getServer().createSession(name, connection.getUser(), connection.getPassword(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection,
|
||||
false, // boolean autoCommitSends
|
||||
false, // boolean autoCommitAcks,
|
||||
false, // boolean preAcknowledge,
|
||||
true, //boolean xa,
|
||||
(String) null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), validatedUser);
|
||||
null, this, true, operationContext, manager.getPrefixes(), manager.getSecurityDomain(), connection.getValidatedUser());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
|
|||
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
|
@ -61,6 +62,7 @@ import org.apache.qpid.proton.amqp.Symbol;
|
|||
import org.apache.qpid.proton.amqp.messaging.Source;
|
||||
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
|
||||
import org.apache.qpid.proton.amqp.transaction.Coordinator;
|
||||
import org.apache.qpid.proton.amqp.transport.AmqpError;
|
||||
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
|
||||
import org.apache.qpid.proton.engine.Connection;
|
||||
import org.apache.qpid.proton.engine.Delivery;
|
||||
|
@ -109,6 +111,10 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
private final ScheduleOperator scheduleOp = new ScheduleOperator(new ScheduleRunnable());
|
||||
private final AtomicReference<Future<?>> scheduledFutureRef = new AtomicReference(VOID_FUTURE);
|
||||
|
||||
private String user;
|
||||
private String password;
|
||||
private String validatedUser;
|
||||
|
||||
public AMQPConnectionContext(ProtonProtocolManager protocolManager,
|
||||
AMQPConnectionCallback connectionSP,
|
||||
String containerId,
|
||||
|
@ -238,6 +244,18 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
return handler;
|
||||
}
|
||||
|
||||
public String getUser() {
|
||||
return user;
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return password;
|
||||
}
|
||||
|
||||
public String getValidatedUser() {
|
||||
return validatedUser;
|
||||
}
|
||||
|
||||
public void destroy() {
|
||||
handler.runLater(() -> connectionCallback.close());
|
||||
}
|
||||
|
@ -530,8 +548,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
log.error("Error init connection", e);
|
||||
}
|
||||
|
||||
if ((connectionCallback.getTransportConnection().getRedirectTo() != null && protocolManager.getRedirectHandler()
|
||||
.redirect(this, connection)) || !validateConnection(connection)) {
|
||||
if (!validateUser(connection) || (connectionCallback.getTransportConnection().getRedirectTo() != null
|
||||
&& protocolManager.getRedirectHandler().redirect(this, connection)) || !validateConnection(connection)) {
|
||||
connection.close();
|
||||
} else {
|
||||
connection.setContext(AMQPConnectionContext.this);
|
||||
|
@ -557,6 +575,37 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
|
|||
}
|
||||
}
|
||||
|
||||
private boolean validateUser(Connection connection) throws Exception {
|
||||
user = null;
|
||||
password = null;
|
||||
validatedUser = null;
|
||||
|
||||
SASLResult saslResult = getSASLResult();
|
||||
if (saslResult != null) {
|
||||
user = saslResult.getUser();
|
||||
if (saslResult instanceof PlainSASLResult) {
|
||||
password = ((PlainSASLResult) saslResult).getPassword();
|
||||
}
|
||||
}
|
||||
|
||||
if (isIncomingConnection() && saslClientFactory == null && !isBridgeConnection()) {
|
||||
try {
|
||||
validatedUser = protocolManager.getServer().validateUser(user, password, connectionCallback.getProtonConnectionDelegate(), protocolManager.getSecurityDomain());
|
||||
} catch (ActiveMQSecurityException e) {
|
||||
log.warn(e.getMessage(), e);
|
||||
ErrorCondition error = new ErrorCondition();
|
||||
error.setCondition(AmqpError.UNAUTHORIZED_ACCESS);
|
||||
error.setDescription(e.getMessage() == null ? e.getClass().getSimpleName() : e.getMessage());
|
||||
connection.setCondition(error);
|
||||
connection.setProperties(Collections.singletonMap(AmqpSupport.CONNECTION_OPEN_FAILED, true));
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
class ScheduleOperator implements UnaryOperator<Future<?>> {
|
||||
|
||||
private long delay;
|
||||
|
|
|
@ -44,7 +44,7 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
|
|||
protected void cannotRedirect(AMQPRedirectContext context) {
|
||||
ErrorCondition error = new ErrorCondition();
|
||||
error.setCondition(ConnectionError.CONNECTION_FORCED);
|
||||
switch (context.getResult().status) {
|
||||
switch (context.getResult().getStatus()) {
|
||||
case REFUSED_USE_ANOTHER:
|
||||
error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo()));
|
||||
break;
|
||||
|
|
|
@ -37,7 +37,7 @@ public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
|
|||
|
||||
@Override
|
||||
protected void cannotRedirect(MQTTRedirectContext context) {
|
||||
switch (context.getResult().status) {
|
||||
switch (context.getResult().getStatus()) {
|
||||
case REFUSED_USE_ANOTHER:
|
||||
context.getMQTTSession().getProtocolHandler().sendConnack(MqttConnectReturnCode.CONNECTION_REFUSED_USE_ANOTHER_SERVER);
|
||||
break;
|
||||
|
|
|
@ -61,9 +61,9 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
|
|||
@Override
|
||||
public CompositeData getTarget(String key) throws Exception {
|
||||
TargetResult result = balancer.getTarget(key);
|
||||
if (TargetResult.Status.OK == result.status) {
|
||||
if (TargetResult.Status.OK == result.getStatus()) {
|
||||
CompositeData connectorData = null;
|
||||
TransportConfiguration connector = result.target.getConnector();
|
||||
TransportConfiguration connector = result.getTarget().getConnector();
|
||||
|
||||
if (connector != null) {
|
||||
TabularData paramsData = new TabularDataSupport(getParametersType());
|
||||
|
@ -79,7 +79,7 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
|
|||
|
||||
return new CompositeDataSupport(getTargetCompositeType(),
|
||||
new String[]{"nodeID", "local", "connector"},
|
||||
new Object[]{result.target.getNodeID(), result.target.isLocal(), connectorData});
|
||||
new Object[]{result.getTarget().getNodeID(), result.getTarget().isLocal(), connectorData});
|
||||
}
|
||||
|
||||
return null;
|
||||
|
@ -88,12 +88,12 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
|
|||
@Override
|
||||
public String getTargetAsJSON(String key) {
|
||||
TargetResult result = balancer.getTarget(key);
|
||||
if (TargetResult.Status.OK == result.status) {
|
||||
TransportConfiguration connector = result.target.getConnector();
|
||||
if (TargetResult.Status.OK == result.getStatus()) {
|
||||
TransportConfiguration connector = result.getTarget().getConnector();
|
||||
|
||||
JsonObjectBuilder targetDataBuilder = JsonLoader.createObjectBuilder()
|
||||
.add("nodeID", result.target.getNodeID())
|
||||
.add("local", result.target.isLocal());
|
||||
.add("nodeID", result.getTarget().getNodeID())
|
||||
.add("local", result.getTarget().isLocal());
|
||||
|
||||
if (connector == null) {
|
||||
targetDataBuilder.addNull("connector");
|
||||
|
|
|
@ -40,7 +40,7 @@ public class ActiveMQRedirectHandler extends RedirectHandler<ActiveMQRedirectCon
|
|||
|
||||
@Override
|
||||
public void cannotRedirect(ActiveMQRedirectContext context) throws Exception {
|
||||
switch (context.getResult().status) {
|
||||
switch (context.getResult().getStatus()) {
|
||||
case REFUSED_UNAVAILABLE:
|
||||
throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
|
||||
case REFUSED_USE_ANOTHER:
|
||||
|
|
|
@ -68,7 +68,7 @@ public class BrokerBalancer implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
public Target getLocalTarget() {
|
||||
return localTarget.target;
|
||||
return localTarget.getTarget();
|
||||
}
|
||||
|
||||
public String getLocalTargetFilter() {
|
||||
|
@ -168,16 +168,16 @@ public class BrokerBalancer implements ActiveMQComponent {
|
|||
}
|
||||
|
||||
if (result != null) {
|
||||
if (pool.isTargetReady(result.target)) {
|
||||
if (pool.isTargetReady(result.getTarget())) {
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("The cache returns [" + result.target + "] ready for " + targetKey + "[" + key + "]");
|
||||
logger.debug("The cache returns [" + result.getTarget() + "] ready for " + targetKey + "[" + key + "]");
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("The cache returns [" + result.target + "] not ready for " + targetKey + "[" + key + "]");
|
||||
logger.debug("The cache returns [" + result.getTarget() + "] not ready for " + targetKey + "[" + key + "]");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ public class RedirectContext {
|
|||
}
|
||||
|
||||
public Target getTarget() {
|
||||
return result.target;
|
||||
return result.getTarget();
|
||||
}
|
||||
|
||||
public TargetResult getResult() {
|
||||
|
|
|
@ -54,7 +54,7 @@ public abstract class RedirectHandler<T extends RedirectContext> {
|
|||
|
||||
context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
|
||||
|
||||
if (TargetResult.Status.OK != context.getResult().status) {
|
||||
if (TargetResult.Status.OK != context.getResult().getStatus()) {
|
||||
ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection);
|
||||
|
||||
cannotRedirect(context);
|
||||
|
|
|
@ -22,16 +22,25 @@ public class TargetResult {
|
|||
public static final TargetResult REFUSED_UNAVAILABLE_RESULT = new TargetResult(Status.REFUSED_UNAVAILABLE);
|
||||
public static final TargetResult REFUSED_USE_ANOTHER_RESULT = new TargetResult(Status.REFUSED_USE_ANOTHER);
|
||||
|
||||
public Status status;
|
||||
public Target target;
|
||||
private final Status status;
|
||||
private final Target target;
|
||||
|
||||
public TargetResult(Target t) {
|
||||
this.target = t;
|
||||
public Status getStatus() {
|
||||
return status;
|
||||
}
|
||||
|
||||
public Target getTarget() {
|
||||
return target;
|
||||
}
|
||||
|
||||
public TargetResult(Target target) {
|
||||
this.status = Status.OK;
|
||||
this.target = target;
|
||||
}
|
||||
|
||||
private TargetResult(Status s) {
|
||||
this.status = s;
|
||||
this.target = null;
|
||||
}
|
||||
|
||||
public enum Status {
|
||||
|
|
|
@ -1676,6 +1676,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final Map<SimpleString, RoutingType> prefixes,
|
||||
final String securityDomain,
|
||||
String validatedUser) throws Exception {
|
||||
if (validatedUser == null) {
|
||||
validatedUser = validateUser(username, password, connection, securityDomain);
|
||||
}
|
||||
|
||||
checkSessionLimit(validatedUser);
|
||||
|
||||
|
|
|
@ -74,7 +74,7 @@ public class BrokerBalancerTest {
|
|||
|
||||
@Test
|
||||
public void getTarget() {
|
||||
assertEquals( localTarget, underTest.getTarget("FOO_EE").target);
|
||||
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
|
||||
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue