This commit is contained in:
Martyn Taylor 2017-10-09 13:29:49 +01:00
commit 88e1fdc789
10 changed files with 372 additions and 138 deletions

View File

@ -175,4 +175,8 @@ public class ActiveMQProtonRemotingConnection extends AbstractRemotingConnection
public String getClientID() {
return amqpConnection.getContainer();
}
public void open() {
amqpConnection.open();
}
}

View File

@ -124,7 +124,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
String id = server.getConfiguration().getName();
boolean useCoreSubscriptionNaming = server.getConfiguration().isAmqpUseCoreSubscriptionNaming();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(this, connectionCallback, id, (int) ttl, getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), true, null, null);
Executor executor = server.getExecutorFactory().getExecutor();

View File

@ -23,6 +23,7 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.qpid.proton.amqp.Symbol;
@ -49,19 +50,18 @@ public class AMQPClientConnectionFactory {
this.useCoreSubscriptionNaming = false;
}
public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler) {
public ActiveMQProtonRemotingConnection createConnection(ProtonProtocolManager protocolManager, Connection connection, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
AMQPConnectionCallback connectionCallback = new AMQPConnectionCallback(protocolManager, connection, server.getExecutorFactory().getExecutor(), server);
Executor executor = server.getExecutorFactory().getExecutor();
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool());
AMQPConnectionContext amqpConnection = new AMQPConnectionContext(protocolManager, connectionCallback, containerId, ttl, protocolManager.getMaxFrameSize(), AMQPConstants.Connection.DEFAULT_CHANNEL_MAX, useCoreSubscriptionNaming, server.getScheduledPool(), false, clientSASLFactory, connectionProperties);
eventHandler.ifPresent(amqpConnection::addEventHandler);
ActiveMQProtonRemotingConnection delegate = new ActiveMQProtonRemotingConnection(protocolManager, amqpConnection, connection, executor);
connectionCallback.setProtonConnectionDelegate(delegate);
amqpConnection.open(connectionProperties);
return delegate;
}
}

View File

@ -22,6 +22,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.BaseConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
@ -40,16 +41,19 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
private static final Logger log = Logger.getLogger(ProtonClientConnectionManager.class);
private final AMQPClientConnectionFactory connectionFactory;
private final Optional<EventHandler> eventHandler;
private final ClientSASLFactory clientSASLFactory;
public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler) {
public ProtonClientConnectionManager(AMQPClientConnectionFactory connectionFactory, Optional<EventHandler> eventHandler, ClientSASLFactory clientSASLFactory) {
this.connectionFactory = connectionFactory;
this.eventHandler = eventHandler;
this.clientSASLFactory = clientSASLFactory;
}
@Override
public void connectionCreated(ActiveMQComponent component, Connection connection, ProtonProtocolManager protocolManager) {
ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler);
ActiveMQProtonRemotingConnection amqpConnection = connectionFactory.createConnection(protocolManager, connection, eventHandler, clientSASLFactory);
connectionMap.put(connection.getID(), amqpConnection);
amqpConnection.open();
log.info("Connection " + amqpConnection.getRemoteAddress() + " created");
}
@ -60,6 +64,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " destroyed");
connection.disconnect(false);
} else {
log.error("Connection with id " + connectionID + " not found in connectionDestroyed");
}
}
@ -69,6 +75,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " exception: " + me.getMessage());
connection.fail(me);
} else {
log.error("Connection with id " + connectionID + " not found in connectionException");
}
}
@ -78,6 +86,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
if (connection != null) {
log.info("Connection " + connection.getRemoteAddress() + " ready");
connection.getTransportConnection().fireReady(true);
} else {
log.error("Connection with id " + connectionID + " not found in connectionReadyForWrites()!");
}
}
@ -92,6 +102,8 @@ public class ProtonClientConnectionManager implements BaseConnectionLifeCycleLis
RemotingConnection connection = connectionMap.get(connectionID);
if (connection != null) {
connection.bufferReceived(connectionID, buffer);
} else {
log.error("Connection with id " + connectionID + " not found in bufferReceived()!");
}
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.AnonymousServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
@ -68,6 +69,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
protected AMQPConnectionCallback connectionCallback;
private final String containerId;
private final boolean isIncomingConnection;
private final ClientSASLFactory saslClientFactory;
private final Map<Symbol, Object> connectionProperties = new HashMap<>();
private final ScheduledExecutorService scheduledPool;
@ -84,19 +87,28 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
int maxFrameSize,
int channelMax,
boolean useCoreSubscriptionNaming,
ScheduledExecutorService scheduledPool) {
ScheduledExecutorService scheduledPool,
boolean isIncomingConnection,
ClientSASLFactory saslClientFactory,
Map<Symbol, Object> connectionProperties) {
this.protocolManager = protocolManager;
this.connectionCallback = connectionSP;
this.useCoreSubscriptionNaming = useCoreSubscriptionNaming;
this.containerId = (containerId != null) ? containerId : UUID.randomUUID().toString();
this.isIncomingConnection = isIncomingConnection;
this.saslClientFactory = saslClientFactory;
connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
this.connectionProperties.put(AmqpSupport.PRODUCT, "apache-activemq-artemis");
this.connectionProperties.put(AmqpSupport.VERSION, VersionLoader.getVersion().getFullVersion());
if (connectionProperties != null) {
this.connectionProperties.putAll(connectionProperties);
}
this.scheduledPool = scheduledPool;
connectionCallback.setConnection(this);
this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor());
this.handler = new ProtonHandler(protocolManager.getServer().getExecutorFactory().getExecutor(), isIncomingConnection);
handler.addEventHandler(this);
Transport transport = handler.getTransport();
transport.setEmitFlowEventOnSend(false);
@ -106,6 +118,17 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
transport.setChannelMax(channelMax);
transport.setInitialRemoteMaxFrameSize(protocolManager.getInitialRemoteMaxFrameSize());
transport.setMaxFrameSize(maxFrameSize);
if (!isIncomingConnection && saslClientFactory != null) {
handler.createClientSASL();
}
}
public boolean isIncomingConnection() {
return isIncomingConnection;
}
public ClientSASLFactory getSaslClientFactory() {
return saslClientFactory;
}
protected AMQPSessionContext newSessionExtension(Session realSession) throws ActiveMQAMQPException {
@ -232,7 +255,7 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return ExtCapability.getCapabilities();
}
public void open(Map<Symbol, Object> connectionProperties) {
public void open() {
handler.open(containerId, connectionProperties);
}
@ -270,56 +293,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
return useCoreSubscriptionNaming;
}
@Override
public void onInit(Connection connection) throws Exception {
}
@Override
public void onLocalOpen(Connection connection) throws Exception {
}
@Override
public void onLocalClose(Connection connection) throws Exception {
}
@Override
public void onFinal(Connection connection) throws Exception {
}
@Override
public void onInit(Session session) throws Exception {
}
@Override
public void onFinal(Session session) throws Exception {
}
@Override
public void onInit(Link link) throws Exception {
}
@Override
public void onLocalOpen(Link link) throws Exception {
}
@Override
public void onLocalClose(Link link) throws Exception {
}
@Override
public void onFinal(Link link) throws Exception {
}
@Override
public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) {
if (sasl) {
@ -343,6 +316,25 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
handler.setChosenMechanism(connectionCallback.getServerSASL(mech));
}
@Override
public void onSaslMechanismsOffered(final ProtonHandler handler, final String[] mechanisms) {
if (saslClientFactory != null) {
handler.setClientMechanism(saslClientFactory.chooseMechanism(mechanisms));
}
}
@Override
public void onAuthFailed(final ProtonHandler protonHandler, final Connection connection) {
connectionCallback.close();
handler.close(null);
}
@Override
public void onAuthSuccess(final ProtonHandler protonHandler, final Connection connection) {
connection.open();
flush();
}
@Override
public void onTransport(Transport transport) {
handler.flushBytes();
@ -437,10 +429,6 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
}
}
@Override
public void onLocalClose(Session session) throws Exception {
}
@Override
public void onRemoteClose(Session session) throws Exception {
lock();

View File

@ -29,58 +29,66 @@ import org.apache.qpid.proton.engine.Transport;
*/
public interface EventHandler {
void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl);
default void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { }
void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech);
default void onSaslRemoteMechanismChosen(ProtonHandler handler, String mech) { }
void onInit(Connection connection) throws Exception;
default void onAuthFailed(ProtonHandler protonHandler, Connection connection) { }
void onLocalOpen(Connection connection) throws Exception;
default void onAuthSuccess(ProtonHandler protonHandler, Connection connection) { }
void onRemoteOpen(Connection connection) throws Exception;
default void onSaslMechanismsOffered(ProtonHandler handler, String[] mechanisms) { }
void onLocalClose(Connection connection) throws Exception;
default void onInit(Connection connection) throws Exception { }
void onRemoteClose(Connection connection) throws Exception;
default void onLocalOpen(Connection connection) throws Exception { }
void onFinal(Connection connection) throws Exception;
default void onRemoteOpen(Connection connection) throws Exception { }
void onInit(Session session) throws Exception;
default void onLocalClose(Connection connection) throws Exception { }
void onLocalOpen(Session session) throws Exception;
default void onRemoteClose(Connection connection) throws Exception { }
void onRemoteOpen(Session session) throws Exception;
default void onFinal(Connection connection) throws Exception { }
void onLocalClose(Session session) throws Exception;
default void onInit(Session session) throws Exception { }
void onRemoteClose(Session session) throws Exception;
default void onLocalOpen(Session session) throws Exception { }
void onFinal(Session session) throws Exception;
default void onRemoteOpen(Session session) throws Exception { }
void onInit(Link link) throws Exception;
default void onLocalClose(Session session) throws Exception { }
void onLocalOpen(Link link) throws Exception;
default void onRemoteClose(Session session) throws Exception { }
void onRemoteOpen(Link link) throws Exception;
default void onFinal(Session session) throws Exception { }
void onLocalClose(Link link) throws Exception;
default void onInit(Link link) throws Exception { }
void onRemoteClose(Link link) throws Exception;
default void onLocalOpen(Link link) throws Exception { }
void onFlow(Link link) throws Exception;
default void onRemoteOpen(Link link) throws Exception { }
void onFinal(Link link) throws Exception;
default void onLocalClose(Link link) throws Exception { }
void onRemoteDetach(Link link) throws Exception;
default void onRemoteClose(Link link) throws Exception { }
void onLocalDetach(Link link) throws Exception;
default void onFlow(Link link) throws Exception { }
void onDelivery(Delivery delivery) throws Exception;
default void onFinal(Link link) throws Exception { }
void onTransport(Transport transport) throws Exception;
default void onRemoteDetach(Link link) throws Exception { }
void pushBytes(ByteBuf bytes);
default void onLocalDetach(Link link) throws Exception { }
boolean flowControl(ReadyListener readyListener);
default void onDelivery(Delivery delivery) throws Exception { }
default void onTransport(Transport transport) throws Exception { }
default void pushBytes(ByteBuf bytes) { }
default boolean flowControl(ReadyListener readyListener) {
return true;
}
}

View File

@ -16,8 +16,10 @@
*/
package org.apache.activemq.artemis.protocol.amqp.proton.handler;
import javax.security.auth.Subject;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@ -25,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonInitializable;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -60,14 +63,17 @@ public class ProtonHandler extends ProtonInitializable {
private List<EventHandler> handlers = new ArrayList<>();
private Sasl serverSasl;
private Sasl sasl;
private ServerSASL chosenMechanism;
private ClientSASL clientSASLMechanism;
private final ReentrantLock lock = new ReentrantLock();
private final long creationTime;
private final boolean isServer;
private SASLResult saslResult;
protected volatile boolean dataReceived;
@ -80,12 +86,13 @@ public class ProtonHandler extends ProtonInitializable {
boolean inDispatch = false;
public ProtonHandler(Executor flushExecutor) {
public ProtonHandler(Executor flushExecutor, boolean isServer) {
this.flushExecutor = flushExecutor;
this.readyListener = () -> flushExecutor.execute(() -> {
flush();
});
this.creationTime = System.currentTimeMillis();
this.isServer = isServer;
transport.bind(connection);
connection.collect(collector);
}
@ -157,9 +164,9 @@ public class ProtonHandler extends ProtonInitializable {
}
public void createServerSASL(String[] mechanisms) {
this.serverSasl = transport.sasl();
this.serverSasl.server();
serverSasl.setMechanisms(mechanisms);
this.sasl = transport.sasl();
this.sasl.server();
sasl.setMechanisms(mechanisms);
}
public void flushBytes() {
@ -210,7 +217,11 @@ public class ProtonHandler extends ProtonInitializable {
try {
byte auth = buffer.getByte(4);
if (auth == SASL || auth == BARE) {
dispatchAuth(auth == SASL);
if (isServer) {
dispatchAuth(auth == SASL);
} else if (auth == BARE && clientSASLMechanism == null) {
dispatchAuthSuccess();
}
/*
* there is a chance that if SASL Handshake has been carried out that the capacity may change.
* */
@ -260,7 +271,7 @@ public class ProtonHandler extends ProtonInitializable {
lock.lock();
try {
transport.process();
checkServerSASL();
checkSASL();
} finally {
lock.unlock();
}
@ -282,52 +293,131 @@ public class ProtonHandler extends ProtonInitializable {
flush();
}
protected void checkServerSASL() {
if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) {
protected void checkSASL() {
if (isServer) {
if (sasl != null && sasl.getRemoteMechanisms().length > 0) {
if (chosenMechanism == null) {
if (log.isTraceEnabled()) {
log.trace("SASL chosenMechanism: " + serverSasl.getRemoteMechanisms()[0]);
}
dispatchRemoteMechanismChosen(serverSasl.getRemoteMechanisms()[0]);
}
if (chosenMechanism != null) {
byte[] dataSASL = new byte[serverSasl.pending()];
serverSasl.recv(dataSASL, 0, dataSASL.length);
if (log.isTraceEnabled()) {
log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
}
byte[] response = chosenMechanism.processSASL(dataSASL);
if (response != null) {
serverSasl.send(response, 0, response.length);
}
saslResult = chosenMechanism.result();
if (saslResult != null) {
if (saslResult.isSuccess()) {
saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
} else {
saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
if (chosenMechanism == null) {
if (log.isTraceEnabled()) {
log.trace("SASL chosenMechanism: " + sasl.getRemoteMechanisms()[0]);
}
dispatchRemoteMechanismChosen(sasl.getRemoteMechanisms()[0]);
}
if (chosenMechanism != null) {
byte[] dataSASL = new byte[sasl.pending()];
sasl.recv(dataSASL, 0, dataSASL.length);
if (log.isTraceEnabled()) {
log.trace("Working on sasl::" + (dataSASL != null && dataSASL.length > 0 ? ByteUtil.bytesToHex(dataSASL, 2) : "Anonymous"));
}
byte[] response = chosenMechanism.processSASL(dataSASL);
if (response != null) {
sasl.send(response, 0, response.length);
}
saslResult = chosenMechanism.result();
if (saslResult != null) {
if (saslResult.isSuccess()) {
saslComplete(Sasl.SaslOutcome.PN_SASL_OK);
} else {
saslComplete(Sasl.SaslOutcome.PN_SASL_AUTH);
}
}
} else {
// no auth available, system error
saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
}
}
} else {
if (sasl != null) {
switch (sasl.getState()) {
case PN_SASL_IDLE:
if (sasl.getRemoteMechanisms().length != 0) {
dispatchMechanismsOffered(sasl.getRemoteMechanisms());
if (clientSASLMechanism == null) {
log.infof("Outbound connection failed - unknown mechanism, offered mechanisms: %s",
Arrays.asList(sasl.getRemoteMechanisms()));
sasl = null;
dispatchAuthFailed();
} else {
sasl.setMechanisms(clientSASLMechanism.getName());
byte[] initialResponse = clientSASLMechanism.getInitialResponse();
if (initialResponse != null) {
sasl.send(initialResponse, 0, initialResponse.length);
}
}
}
break;
case PN_SASL_STEP:
int challengeSize = sasl.pending();
byte[] challenge = new byte[challengeSize];
sasl.recv(challenge, 0, challengeSize);
byte[] response = clientSASLMechanism.getResponse(challenge);
sasl.send(response, 0, response.length);
break;
case PN_SASL_FAIL:
log.info("Outbound connection failed, authentication failure");
sasl = null;
dispatchAuthFailed();
break;
case PN_SASL_PASS:
log.debug("Outbound connection succeeded");
saslResult = new SASLResult() {
@Override
public String getUser() {
return null;
}
@Override
public Subject getSubject() {
return null;
}
@Override
public boolean isSuccess() {
return true;
}
};
sasl = null;
dispatchAuthSuccess();
break;
case PN_SASL_CONF:
// do nothing
break;
}
} else {
// no auth available, system error
saslComplete(Sasl.SaslOutcome.PN_SASL_SYS);
}
}
}
private void saslComplete(Sasl.SaslOutcome saslOutcome) {
serverSasl.done(saslOutcome);
serverSasl = null;
sasl.done(saslOutcome);
sasl = null;
if (chosenMechanism != null) {
chosenMechanism.done();
}
}
private void dispatchAuthFailed() {
for (EventHandler h : handlers) {
h.onAuthFailed(this, getConnection());
}
}
private void dispatchAuthSuccess() {
for (EventHandler h : handlers) {
h.onAuthSuccess(this, getConnection());
}
}
private void dispatchMechanismsOffered(final String[] mechs) {
for (EventHandler h : handlers) {
h.onSaslMechanismsOffered(this, mechs);
}
}
private void dispatchAuth(boolean sasl) {
for (EventHandler h : handlers) {
h.onAuthInit(this, getConnection(), sasl);
@ -393,4 +483,13 @@ public class ProtonHandler extends ProtonInitializable {
public void setChosenMechanism(ServerSASL chosenMechanism) {
this.chosenMechanism = chosenMechanism;
}
public void setClientMechanism(final ClientSASL saslClientMech) {
this.clientSASLMechanism = saslClientMech;
}
public void createClientSASL() {
this.sasl = transport.sasl();
this.sasl.client();
}
}

View File

@ -0,0 +1,23 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.sasl;
public interface ClientSASL {
String getName();
byte[] getInitialResponse();
byte[] getResponse(byte[] challenge);
}

View File

@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.sasl;
public interface ClientSASLFactory {
ClientSASL chooseMechanism(String[] availableMechanims);
}

View File

@ -16,10 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@ -28,16 +31,37 @@ import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFac
import org.apache.activemq.artemis.protocol.amqp.client.AMQPClientConnectionFactory;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientConnectionManager;
import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.engine.Connection;
import org.junit.Test;
public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
private boolean securityEnabled;
@Test(timeout = 60000)
public void testOutboundConnection() throws Throwable {
final ActiveMQServer remote = createServer(AMQP_PORT + 1);
remote.start();
runOutboundConnectionTest(false);
}
@Test(timeout = 60000)
public void testOutboundConnectionWithSecurity() throws Throwable {
runOutboundConnectionTest(true);
}
private void runOutboundConnectionTest(boolean withSecurity) throws Exception {
final ActiveMQServer remote;
try {
securityEnabled = withSecurity;
remote = createServer(AMQP_PORT + 1);
} finally {
securityEnabled = false;
}
try {
Wait.waitFor(remote::isActive);
} catch (Exception e) {
@ -45,10 +69,30 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
throw e;
}
final Map<String, Object> config = new LinkedHashMap<>();
config.put(TransportConstants.HOST_PROP_NAME, "localhost");
final Map<String, Object> config = new LinkedHashMap<>(); config.put(TransportConstants.HOST_PROP_NAME, "localhost");
config.put(TransportConstants.PORT_PROP_NAME, String.valueOf(AMQP_PORT + 1));
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.empty());
final ClientSASLFactory clientSASLFactory;
if (withSecurity) {
clientSASLFactory = availableMechanims -> {
if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
return new PlainSASLMechanism(fullUser, fullPass);
} else {
return null;
}
};
} else {
clientSASLFactory = null;
}
final AtomicBoolean connectionOpened = new AtomicBoolean();
EventHandler eventHandler = new EventHandler() {
@Override
public void onRemoteOpen(Connection connection) throws Exception {
connectionOpened.set(true);
}
};
ProtonClientConnectionManager lifeCycleListener = new ProtonClientConnectionManager(new AMQPClientConnectionFactory(server, "myid", Collections.singletonMap(Symbol.getSymbol("myprop"), "propvalue"), 5000), Optional.of(eventHandler), clientSASLFactory);
ProtonClientProtocolManager protocolManager = new ProtonClientProtocolManager(new ProtonProtocolManagerFactory(), server);
NettyConnector connector = new NettyConnector(config, lifeCycleListener, lifeCycleListener, server.getExecutorFactory().getExecutor(), server.getExecutorFactory().getExecutor(), server.getScheduledPool(), protocolManager);
connector.start();
@ -57,7 +101,8 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
try {
Wait.waitFor(() -> remote.getConnectionCount() > 0);
assertEquals(1, remote.getConnectionCount());
Wait.waitFor(connectionOpened::get);
assertTrue("Remote connection was not opened - authentication error?", connectionOpened.get());
lifeCycleListener.stop();
Wait.waitFor(() -> remote.getConnectionCount() == 0);
@ -67,4 +112,38 @@ public class AmqpOutboundConnectionTest extends AmqpClientTestSupport {
remote.stop();
}
}
@Override
protected boolean isSecurityEnabled() {
return securityEnabled;
}
private static class PlainSASLMechanism implements ClientSASL {
private final byte[] initialResponse;
PlainSASLMechanism(String username, String password) {
byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
byte[] encoded = new byte[usernameBytes.length + passwordBytes.length + 2];
System.arraycopy(usernameBytes, 0, encoded, 1, usernameBytes.length);
System.arraycopy(passwordBytes, 0, encoded, usernameBytes.length + 2, passwordBytes.length);
initialResponse = encoded;
}
@Override
public String getName() {
return "PLAIN";
}
@Override
public byte[] getInitialResponse() {
return initialResponse;
}
@Override
public byte[] getResponse(byte[] challenge) {
return new byte[0];
}
}
}