ARTEMIS-1454: Support SASL in outgoing AMQP

Update ProtonHandler to allow for both client and server side SASL
and other related changes to allow for setting of client side mechanism
This commit is contained in:
Robert Godfrey 2017-09-27 23:45:14 +02:00 committed by rgodfrey
parent 30ba65a082
commit cc8a0cb90e
10 changed files with 372 additions and 138 deletions

View File

@ -175,4 +175,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
public String getClientID() { public String getClientID() {
return amqpConnection.getContainer(); return amqpConnection.getContainer();
} }
public void open() {
amqpConnection.open();
}
} }

View File

@ -124,7 +124,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
String id = server.getConfiguration().getName(); String id = server.getConfiguration().getName();
boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming(); boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool()); AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, null, null);
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants; import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
@ -49,19 +50,18 @@ public class AMQPClientConnectionFactory {
this.useCoreSubscriptionNaming = false; this.useCoreSubscriptionNaming = false;
} }
public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) { public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server); AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
Executor executor = server.getExecutorFactory().getExecutor(); Executor executor = server.getExecutorFactory().getExecutor();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool()); AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), false, clientSASLFactory, connectionProperties);
eventHandler.ifPresent(amqpConnection::addEventHandler); eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor); ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
connectionCallback.setProtonConnectionDelegate(delegate); connectionCallback.setProtonConnectionDelegate(delegate);
amqpConnection.open(connectionProperties);
return delegate; return delegate;
} }
} }

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection; import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager; import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener; import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler; import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@ -40,16 +41,19 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class); private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class);
private final AMQPClientConnectionFactory connectionFactory; private final AMQPClientConnectionFactory connectionFactory;
private final Optional<EventHandler> eventHandler; private final Optional<EventHandler> eventHandler;
private final ClientSASLFactory clientSASLFactory;
public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler) { public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
this.connectionFactory = connectionFactory; this.connectionFactory = connectionFactory;
this.eventHandler = eventHandler; this.eventHandler = eventHandler;
this.clientSASLFactory = clientSASLFactory;
} }
@Override @Override
public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) { public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler); ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler, clientSASLFactory);
connectionMap.put(connection.getID(), amqpConnection); connectionMap.put(connection.getID(), amqpConnection);
amqpConnection.open();
log.info("Connection " + amqpConnection.getRemoteAddress() + " created"); log.info("Connection " + amqpConnection.getRemoteAddress() + " created");
} }
@ -60,6 +64,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) { if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " destroyed"); log.info("Connection " + connection.getRemoteAddress() + " destroyed");
connection.disconnect(false); connection.disconnect(false);
} else {
log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
} }
} }
@ -69,6 +75,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) { if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage()); log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage());
connection.fail(me); connection.fail(me);
} else {
log.error("Connection with id " + connectionID + " not found in connectionException");
} }
} }
@ -78,6 +86,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) { if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " ready"); log.info("Connection " + connection.getRemoteAddress() + " ready");
connection.getTransportConnection().fireReady(true); connection.getTransportConnection().fireReady(true);
} else {
log.error("Connection with id " + connectionID + " not found in connectionReadyForWrites()!");
} }
} }
@ -92,6 +102,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
RemotingConnection connection = connectionMap.get(connectionID); RemotingConnection connection = connectionMap.get(connectionID);
if (connection != null) { if (connection != null) {
connection.bufferReceived(connectionID, buffer); connection.bufferReceived(connectionID, buffer);
} else {
log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
} }
} }
} }

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.ByteUtil;
@ -68,6 +69,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
protected AMQPConnectionCallback connectionCallback; protected AMQPConnectionCallback connectionCallback;
private final String containerId; private final String containerId;
private final boolean isIncomingConnection;
private final ClientSASLFactory saslClientFactory;
private final Map<Symbol, Object> connectionProperties = new HashMap<>(); private final Map<Symbol, Object> connectionProperties = new HashMap<>();
private final ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
@ -84,19 +87,28 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
int maxFrameSize, int maxFrameSize,
int channelMax, int channelMax,
boolean useCoreSubscriptionNaming, boolean useCoreSubscriptionNaming,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool,
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties) {
this.protocolManager = protocolManager; this.protocolManager = protocolManager;
this.connectionCallback = connectionSP; this.connectionCallback = connectionSP;
this.useCoreSubscriptionNaming = useCoreSubscriptionNaming; this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString(); this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
this.isIncomingConnection = isIncomingConnection;
this.saslClientFactory = saslClientFactory;
connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis"); this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion()); this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
if (connectionProperties != null) {
this.connectionProperties.putAll(connectionProperties);
}
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this); connectionCallback.setConnection(this);
this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor()); this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
handler.addEventHandler(this); handler.addEventHandler(this);
Transport transport = handler.getTransport(); Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false); transport.setEmitFlowEventOnSend(false);
@ -106,6 +118,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
transport.setChannelMax(channelMax); transport.setChannelMax(channelMax);
transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize()); transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
transport.setMaxFrameSize(maxFrameSize); transport.setMaxFrameSize(maxFrameSize);
if (!isIncomingConnection && saslClientFactory != null) {
handler.createClientSASL();
}
}
public boolean isIncomingConnection() {
return isIncomingConnection;
}
public ClientSASLFactory getSaslClientFactory() {
return saslClientFactory;
} }
protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException { protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
@ -232,7 +255,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return ExtCapability.getCapabilities(); return ExtCapability.getCapabilities();
} }
public void open(Map<Symbol, Object> connectionProperties) { public void open() {
handler.open(containerId, connectionProperties); handler.open(containerId, connectionProperties);
} }
@ -270,56 +293,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return useCoreSubscriptionNaming; return useCoreSubscriptionNaming;
} }
@Override
public void onInit(Connection connection) throws Exception {
}
@Override
public void onLocalOpen(Connection connection) throws Exception {
}
@Override
public void onLocalClose(Connection connection) throws Exception {
}
@Override
public void onFinal(Connection connection) throws Exception {
}
@Override
public void onInit(Session session) throws Exception {
}
@Override
public void onFinal(Session session) throws Exception {
}
@Override
public void onInit(Link link) throws Exception {
}
@Override
public void onLocalOpen(Link link) throws Exception {
}
@Override
public void onLocalClose(Link link) throws Exception {
}
@Override
public void onFinal(Link link) throws Exception {
}
@Override @Override
public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
if (sasl) { if (sasl) {
@ -343,6 +316,25 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
handler.setChosenMechanism(connectionCallback.getServerSASL(mech)); handler.setChosenMechanism(connectionCallback.getServerSASL(mech));
} }
@Override
public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) {
if (saslClientFactory != null) {
handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms));
}
}
@Override
public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
connectionCallback.close();
handler.close(null);
}
@Override
public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) {
connection.open();
flush();
}
@Override @Override
public void onTransport(Transport transport) { public void onTransport(Transport transport) {
handler.flushBytes(); handler.flushBytes();
@ -437,10 +429,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
} }
} }
@Override
public void onLocalClose(Session session) throws Exception {
}
@Override @Override
public void onRemoteClose(Session session) throws Exception { public void onRemoteClose(Session session) throws Exception {
lock(); lock();

View File

@ -29,58 +29,66 @@ import org.apache.qpid.proton.engine.Transport;
*/ */
public interface EventHandler { public interface EventHandler {
void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl); default void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { }
void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech); default void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) { }
void onInit(Connection connection) throws Exception; default void onAuthFailed(ProtonHandler protonHandler, Connection connection) { }
void onLocalOpen(Connection connection) throws Exception; default void onAuthSuccess(ProtonHandler protonHandler, Connection connection) { }
void onRemoteOpen(Connection connection) throws Exception; default void onSaslMechanismsOffered(ProtonHandler handler, String[] mechanisms) { }
void onLocalClose(Connection connection) throws Exception; default void onInit(Connection connection) throws Exception { }
void onRemoteClose(Connection connection) throws Exception; default void onLocalOpen(Connection connection) throws Exception { }
void onFinal(Connection connection) throws Exception; default void onRemoteOpen(Connection connection) throws Exception { }
void onInit(Session session) throws Exception; default void onLocalClose(Connection connection) throws Exception { }
void onLocalOpen(Session session) throws Exception; default void onRemoteClose(Connection connection) throws Exception { }
void onRemoteOpen(Session session) throws Exception; default void onFinal(Connection connection) throws Exception { }
void onLocalClose(Session session) throws Exception; default void onInit(Session session) throws Exception { }
void onRemoteClose(Session session) throws Exception; default void onLocalOpen(Session session) throws Exception { }
void onFinal(Session session) throws Exception; default void onRemoteOpen(Session session) throws Exception { }
void onInit(Link link) throws Exception; default void onLocalClose(Session session) throws Exception { }
void onLocalOpen(Link link) throws Exception; default void onRemoteClose(Session session) throws Exception { }
void onRemoteOpen(Link link) throws Exception; default void onFinal(Session session) throws Exception { }
void onLocalClose(Link link) throws Exception; default void onInit(Link link) throws Exception { }
void onRemoteClose(Link link) throws Exception; default void onLocalOpen(Link link) throws Exception { }
void onFlow(Link link) throws Exception; default void onRemoteOpen(Link link) throws Exception { }
void onFinal(Link link) throws Exception; default void onLocalClose(Link link) throws Exception { }
void onRemoteDetach(Link link) throws Exception; default void onRemoteClose(Link link) throws Exception { }
void onLocalDetach(Link link) throws Exception; default void onFlow(Link link) throws Exception { }
void onDelivery(Delivery delivery) throws Exception; default void onFinal(Link link) throws Exception { }
void onTransport(Transport transport) throws Exception; default void onRemoteDetach(Link link) throws Exception { }
void pushBytes(ByteBuf bytes); default void onLocalDetach(Link link) throws Exception { }
boolean flowControl(ReadyListener readyListener); default void onDelivery(Delivery delivery) throws Exception { }
default void onTransport(Transport transport) throws Exception { }
default void pushBytes(ByteBuf bytes) { }
default boolean flowControl(ReadyListener readyListener) {
return true;
}
} }

View File

@ -16,8 +16,10 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton.handler; package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import javax.security.auth.Subject;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
@ -25,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable; import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL; import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -60,14 +63,17 @@ public class ProtonHandler extends ProtonInitializable {
private List<EventHandler> handlers = new ArrayList<>(); private List<EventHandler> handlers = new ArrayList<>();
private Sasl serverSasl; private Sasl sasl;
private ServerSASL chosenMechanism; private ServerSASL chosenMechanism;
private ClientSASL clientSASLMechanism;
private final ReentrantLock lock = new ReentrantLock(); private final ReentrantLock lock = new ReentrantLock();
private final long creationTime; private final long creationTime;
private final boolean isServer;
private SASLResult saslResult; private SASLResult saslResult;
protected volatile boolean dataReceived; protected volatile boolean dataReceived;
@ -80,12 +86,13 @@ public class ProtonHandler extends ProtonInitializable {
boolean inDispatch = false; boolean inDispatch = false;
public ProtonHandler(Executor flushExecutor) { public ProtonHandler(Executor flushExecutor, boolean isServer) {
this.flushExecutor = flushExecutor; this.flushExecutor = flushExecutor;
this.readyListener = () -> flushExecutor.execute(() -> { this.readyListener = () -> flushExecutor.execute(() -> {
flush(); flush();
}); });
this.creationTime = System.currentTimeMillis(); this.creationTime = System.currentTimeMillis();
this.isServer = isServer;
transport.bind(connection); transport.bind(connection);
connection.collect(collector); connection.collect(collector);
} }
@ -157,9 +164,9 @@ public class ProtonHandler extends ProtonInitializable {
} }
public void createServerSASL(String[] mechanisms) { public void createServerSASL(String[] mechanisms) {
this.serverSasl = transport.sasl(); this.sasl = transport.sasl();
this.serverSasl.server(); this.sasl.server();
serverSasl.setMechanisms(mechanisms); sasl.setMechanisms(mechanisms);
} }
public void flushBytes() { public void flushBytes() {
@ -210,7 +217,11 @@ public class ProtonHandler extends ProtonInitializable {
try { try {
byte auth = buffer.getByte(4); byte auth = buffer.getByte(4);
if (auth == SASL || auth == BARE) { if (auth == SASL || auth == BARE) {
dispatchAuth(auth == SASL); if (isServer) {
dispatchAuth(auth == SASL);
} else if (auth == BARE && clientSASLMechanism == null) {
dispatchAuthSuccess();
}
/* /*
* there is a chance that if SASL Handshake has been carried out that the capacity may change. * there is a chance that if SASL Handshake has been carried out that the capacity may change.
* */ * */
@ -260,7 +271,7 @@ public class ProtonHandler extends ProtonInitializable {
lock.lock(); lock.lock();
try { try {
transport.process(); transport.process();
checkServerSASL(); checkSASL();
} finally { } finally {
lock.unlock(); lock.unlock();
} }
@ -282,52 +293,131 @@ public class ProtonHandler extends ProtonInitializable {
flush(); flush();
} }
protected void checkServerSASL() { protected void checkSASL() {
if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) { if (isServer) {
if (sasl != null && sasl.getRemoteMechanisms().length > 0) {
if (chosenMechanism == null) { if (chosenMechanism == null) {
if (log.isTraceEnabled()) { if (log.isTraceEnabled()) {
log.trace("SASL chosenMechanism: " + serverSasl.getRemoteMechanisms()[0]); log.trace("SASL chosenMechanism: " + sasl.getRemoteMechanisms()[0]);
}
dispatchRemoteMechanismChosen(serverSasl.getRemoteMechanisms()[0]);
}
if (chosenMechanism != null) {
byte[] dataSASL = new byte[serverSasl.pending()];
serverSasl.recv(dataSASL, 0, dataSASL.length);
if (log.isTraceEnabled()) {
log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
}
byte[] response = chosenMechanism.processSASL(dataSASL);
if (response != null) {
serverSasl.send(response, 0, response.length);
}
saslResult = chosenMechanism.result();
if (saslResult != null) {
if (saslResult.isSuccess()) {
saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
} else {
saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
} }
dispatchRemoteMechanismChosen(sasl.getRemoteMechanisms()[0]);
}
if (chosenMechanism != null) {
byte[] dataSASL = new byte[sasl.pending()];
sasl.recv(dataSASL, 0, dataSASL.length);
if (log.isTraceEnabled()) {
log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
}
byte[] response = chosenMechanism.processSASL(dataSASL);
if (response != null) {
sasl.send(response, 0, response.length);
}
saslResult = chosenMechanism.result();
if (saslResult != null) {
if (saslResult.isSuccess()) {
saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
} else {
saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
}
}
} else {
// no auth available, system error
saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
}
}
} else {
if (sasl != null) {
switch (sasl.getState()) {
case PN_SASL_IDLE:
if (sasl.getRemoteMechanisms().length != 0) {
dispatchMechanismsOffered(sasl.getRemoteMechanisms());
if (clientSASLMechanism == null) {
log.infof("Outbound connection failed - unknown mechanism, offered mechanisms: %s",
Arrays.asList(sasl.getRemoteMechanisms()));
sasl = null;
dispatchAuthFailed();
} else {
sasl.setMechanisms(clientSASLMechanism.getName());
byte[] initialResponse = clientSASLMechanism.getInitialResponse();
if (initialResponse != null) {
sasl.send(initialResponse, 0, initialResponse.length);
}
}
}
break;
case PN_SASL_STEP:
int challengeSize = sasl.pending();
byte[] challenge = new byte[challengeSize];
sasl.recv(challenge, 0, challengeSize);
byte[] response = clientSASLMechanism.getResponse(challenge);
sasl.send(response, 0, response.length);
break;
case PN_SASL_FAIL:
log.info("Outbound connection failed, authentication failure");
sasl = null;
dispatchAuthFailed();
break;
case PN_SASL_PASS:
log.debug("Outbound connection succeeded");
saslResult = new SASLResult() {
@Override
public String getUser() {
return null;
}
@Override
public Subject getSubject() {
return null;
}
@Override
public boolean isSuccess() {
return true;
}
};
sasl = null;
dispatchAuthSuccess();
break;
case PN_SASL_CONF:
// do nothing
break;
} }
} else {
// no auth available, system error
saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
} }
} }
} }
private void saslComplete(Sasl.SaslOutcome saslOutcome) { private void saslComplete(Sasl.SaslOutcome saslOutcome) {
serverSasl.done(saslOutcome); sasl.done(saslOutcome);
serverSasl = null; sasl = null;
if (chosenMechanism != null) { if (chosenMechanism != null) {
chosenMechanism.done(); chosenMechanism.done();
} }
} }
private void dispatchAuthFailed() {
for (EventHandler h : handlers) {
h.onAuthFailed(this, getConnection());
}
}
private void dispatchAuthSuccess() {
for (EventHandler h : handlers) {
h.onAuthSuccess(this, getConnection());
}
}
private void dispatchMechanismsOffered(final String[] mechs) {
for (EventHandler h : handlers) {
h.onSaslMechanismsOffered(this, mechs);
}
}
private void dispatchAuth(boolean sasl) { private void dispatchAuth(boolean sasl) {
for (EventHandler h : handlers) { for (EventHandler h : handlers) {
h.onAuthInit(this, getConnection(), sasl); h.onAuthInit(this, getConnection(), sasl);
@ -393,4 +483,13 @@ public class ProtonHandler extends ProtonInitializable {
public void setChosenMechanism(ServerSASL chosenMechanism) { public void setChosenMechanism(ServerSASL chosenMechanism) {
this.chosenMechanism = chosenMechanism; this.chosenMechanism = chosenMechanism;
} }
public void setClientMechanism(final ClientSASL saslClientMech) {
this.clientSASLMechanism = saslClientMech;
}
public void createClientSASL() {
this.sasl = transport.sasl();
this.sasl.client();
}
} }

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.sasl;
public interface ClientSASL {
String getName();
byte[] getInitialResponse();
byte[] getResponse(byte[] challenge);
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.sasl;
public interface ClientSASLFactory {
ClientSASL chooseMechanism(String[] availableMechanims);
}

View File

@ -16,10 +16,13 @@
*/ */
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import java.util.Optional; import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@ -28,16 +31,37 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFac
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory; import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager; import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.tests.util.Wait; import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test; import org.junit.Test;
public class AmqpOutboundConnectionTest extends AmqpClientTestSupport { public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
private boolean securityEnabled;
@Test(timeout = 60000) @Test(timeout = 60000)
public void testOutboundConnection() throws Throwable { public void testOutboundConnection() throws Throwable {
final ActiveMQServer remote = createServer(AMQP_PORT + 1); runOutboundConnectionTest(false);
remote.start(); }
@Test(timeout = 60000)
public void testOutboundConnectionWithSecurity() throws Throwable {
runOutboundConnectionTest(true);
}
private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
final ActiveMQServer remote;
try {
securityEnabled = withSecurity;
remote = createServer(AMQP_PORT + 1);
} finally {
securityEnabled = false;
}
try { try {
Wait.waitFor(remote::isActive); Wait.waitFor(remote::isActive);
} catch (Exception e) { } catch (Exception e) {
@ -45,10 +69,30 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
throw e; throw e;
} }
final Map<String, Object> config = new LinkedHashMap<>(); final Map<String, Object> config = new LinkedHashMap<>(); config.put(TransportConstants.HOST_PROP_NAME, "localhost");
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1)); config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty()); final ClientSASLFactory clientSASLFactory;
if (withSecurity) {
clientSASLFactory = availableMechanims -> {
if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
return new PlainSASLMechanism(fullUser, fullPass);
} else {
return null;
}
};
} else {
clientSASLFactory = null;
}
final AtomicBoolean connectionOpened = new AtomicBoolean();
EventHandler eventHandler = new EventHandler() {
@Override
public void onRemoteOpen(Connection connection) throws Exception {
connectionOpened.set(true);
}
};
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.of(eventHandler), clientSASLFactory);
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server); ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager); NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
connector.start(); connector.start();
@ -57,7 +101,8 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
try { try {
Wait.waitFor(() -> remote.getConnectionCount() > 0); Wait.waitFor(() -> remote.getConnectionCount() > 0);
assertEquals(1, remote.getConnectionCount()); assertEquals(1, remote.getConnectionCount());
Wait.waitFor(connectionOpened::get);
assertTrue("Remote connection was not opened - authentication error?", connectionOpened.get());
lifeCycleListener.stop(); lifeCycleListener.stop();
Wait.waitFor(() -> remote.getConnectionCount() == 0); Wait.waitFor(() -> remote.getConnectionCount() == 0);
@ -67,4 +112,38 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
remote.stop(); remote.stop();
} }
} }
@Override
protected boolean isSecurityEnabled() {
return securityEnabled;
}
private static class PlainSASLMechanism implements ClientSASL {
private final byte[] initialResponse;
PlainSASLMechanism(String username, String password) {
byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2];
System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length);
System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length);
initialResponse = encoded;
}
@Override
public String getName() {
return "PLAIN";
}
@Override
public byte[] getInitialResponse() {
return initialResponse;
}
@Override
public byte[] getResponse(byte[] challenge) {
return new byte[0];
}
}
} }