ARTEMIS-2971: add ANONYMOUS and EXTERNAL SASL mechanism support for outgoing AMQP server connections
This commit is contained in:
parent
34976f090b
commit
5ff075b7ff
|
@ -29,6 +29,8 @@ import java.security.Principal;
|
||||||
|
|
||||||
public class CertificateUtil {
|
public class CertificateUtil {
|
||||||
|
|
||||||
|
private static final String SSL_HANDLER_NAME = "ssl";
|
||||||
|
|
||||||
public static X509Certificate[] getCertsFromConnection(RemotingConnection remotingConnection) {
|
public static X509Certificate[] getCertsFromConnection(RemotingConnection remotingConnection) {
|
||||||
X509Certificate[] certificates = null;
|
X509Certificate[] certificates = null;
|
||||||
if (remotingConnection != null) {
|
if (remotingConnection != null) {
|
||||||
|
@ -46,7 +48,7 @@ public class CertificateUtil {
|
||||||
Connection transportConnection = remotingConnection.getTransportConnection();
|
Connection transportConnection = remotingConnection.getTransportConnection();
|
||||||
if (transportConnection instanceof NettyConnection) {
|
if (transportConnection instanceof NettyConnection) {
|
||||||
NettyConnection nettyConnection = (NettyConnection) transportConnection;
|
NettyConnection nettyConnection = (NettyConnection) transportConnection;
|
||||||
ChannelHandler channelHandler = nettyConnection.getChannel().pipeline().get("ssl");
|
ChannelHandler channelHandler = nettyConnection.getChannel().pipeline().get(SSL_HANDLER_NAME);
|
||||||
if (channelHandler != null && channelHandler instanceof SslHandler) {
|
if (channelHandler != null && channelHandler instanceof SslHandler) {
|
||||||
SslHandler sslHandler = (SslHandler) channelHandler;
|
SslHandler sslHandler = (SslHandler) channelHandler;
|
||||||
try {
|
try {
|
||||||
|
@ -59,4 +61,15 @@ public class CertificateUtil {
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static Principal getLocalPrincipalFromConnection(NettyConnection nettyConnection) {
|
||||||
|
Principal result = null;
|
||||||
|
ChannelHandler handler = nettyConnection.getChannel().pipeline().get(SSL_HANDLER_NAME);
|
||||||
|
if (handler instanceof SslHandler) {
|
||||||
|
SslHandler sslHandler = (SslHandler) handler;
|
||||||
|
result = sslHandler.engine().getSession().getLocalPrincipal();
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect;
|
||||||
|
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
|
||||||
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
|
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
|
||||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||||
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
|
||||||
|
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
|
||||||
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
|
||||||
|
@ -239,21 +241,12 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
senders.clear();
|
senders.clear();
|
||||||
receivers.clear();
|
receivers.clear();
|
||||||
|
|
||||||
ClientSASLFactory saslFactory = null;
|
ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);
|
||||||
|
|
||||||
if (brokerConnectConfiguration.getUser() != null && brokerConnectConfiguration.getPassword() != null) {
|
|
||||||
saslFactory = availableMechanims -> {
|
|
||||||
if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
|
|
||||||
return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword());
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
|
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
|
||||||
server.getRemotingService().addConnectionEntry(connection, entry);
|
server.getRemotingService().addConnectionEntry(connection, entry);
|
||||||
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
|
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
|
||||||
|
|
||||||
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
|
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
|
||||||
|
|
||||||
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
|
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
|
||||||
|
@ -585,6 +578,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
protonRemotingConnection.flush();
|
protonRemotingConnection.flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final String EXTERNAL = "EXTERNAL";
|
||||||
|
private static final String PLAIN = "PLAIN";
|
||||||
|
private static final String ANONYMOUS = "ANONYMOUS";
|
||||||
|
private static final byte[] EMPTY = new byte[0];
|
||||||
|
|
||||||
private static class PlainSASLMechanism implements ClientSASL {
|
private static class PlainSASLMechanism implements ClientSASL {
|
||||||
|
|
||||||
private final byte[] initialResponse;
|
private final byte[] initialResponse;
|
||||||
|
@ -600,7 +598,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getName() {
|
public String getName() {
|
||||||
return "PLAIN";
|
return PLAIN;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -610,7 +608,81 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public byte[] getResponse(byte[] challenge) {
|
public byte[] getResponse(byte[] challenge) {
|
||||||
return new byte[0];
|
return EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isApplicable(final String username, final String password) {
|
||||||
|
return username != null && username.length() > 0 && password != null && password.length() > 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class AnonymousSASLMechanism implements ClientSASL {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return ANONYMOUS;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getInitialResponse() {
|
||||||
|
return EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getResponse(byte[] challenge) {
|
||||||
|
return EMPTY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ExternalSASLMechanism implements ClientSASL {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getName() {
|
||||||
|
return EXTERNAL;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getInitialResponse() {
|
||||||
|
return EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public byte[] getResponse(byte[] challenge) {
|
||||||
|
return EMPTY;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static boolean isApplicable(final NettyConnection connection) {
|
||||||
|
return CertificateUtil.getLocalPrincipalFromConnection(connection) != null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class SaslFactory implements ClientSASLFactory {
|
||||||
|
|
||||||
|
private final NettyConnection connection;
|
||||||
|
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
|
||||||
|
|
||||||
|
SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration brokerConnectConfiguration) {
|
||||||
|
this.connection = connection;
|
||||||
|
this.brokerConnectConfiguration = brokerConnectConfiguration;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ClientSASL chooseMechanism(String[] offeredMechanims) {
|
||||||
|
List<String> availableMechanisms = offeredMechanims == null ? Collections.emptyList() : Arrays.asList(offeredMechanims);
|
||||||
|
|
||||||
|
if (availableMechanisms.contains(EXTERNAL) && ExternalSASLMechanism.isApplicable(connection)) {
|
||||||
|
return new ExternalSASLMechanism();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (availableMechanisms.contains(PLAIN) && PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword())) {
|
||||||
|
return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (availableMechanisms.contains(ANONYMOUS)) {
|
||||||
|
return new AnonymousSASLMechanism();
|
||||||
|
}
|
||||||
|
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
1
pom.xml
1
pom.xml
|
@ -133,6 +133,7 @@
|
||||||
|
|
||||||
<!-- used on tests -->
|
<!-- used on tests -->
|
||||||
<groovy.version>2.5.10</groovy.version>
|
<groovy.version>2.5.10</groovy.version>
|
||||||
|
<vertx.version>3.9.4</vertx.version>
|
||||||
|
|
||||||
<owasp.version>1.4.3</owasp.version>
|
<owasp.version>1.4.3</owasp.version>
|
||||||
<spring.version>5.1.7.RELEASE</spring.version>
|
<spring.version>5.1.7.RELEASE</spring.version>
|
||||||
|
|
|
@ -417,6 +417,13 @@
|
||||||
<artifactId>netty-tcnative-boringssl-static</artifactId>
|
<artifactId>netty-tcnative-boringssl-static</artifactId>
|
||||||
<version>${netty-tcnative-version}</version>
|
<version>${netty-tcnative-version}</version>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>io.vertx</groupId>
|
||||||
|
<artifactId>vertx-proton</artifactId>
|
||||||
|
<version>${vertx.version}</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
@ -541,3 +548,4 @@
|
||||||
</profile>
|
</profile>
|
||||||
</profiles>
|
</profiles>
|
||||||
</project>
|
</project>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,299 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp.connect;
|
||||||
|
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
|
||||||
|
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||||
|
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
|
||||||
|
import org.apache.qpid.proton.engine.Sasl;
|
||||||
|
import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
|
||||||
|
import org.apache.qpid.proton.engine.Transport;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import io.vertx.core.Handler;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
|
import io.vertx.core.http.ClientAuth;
|
||||||
|
import io.vertx.core.net.JksOptions;
|
||||||
|
import io.vertx.core.net.NetSocket;
|
||||||
|
import io.vertx.proton.ProtonConnection;
|
||||||
|
import io.vertx.proton.ProtonServerOptions;
|
||||||
|
import io.vertx.proton.sasl.ProtonSaslAuthenticator;
|
||||||
|
|
||||||
|
public class AMQPConnectSaslTest extends AmqpClientTestSupport {
|
||||||
|
|
||||||
|
private static final int BROKER_PORT_NUM = AMQP_PORT + 1;
|
||||||
|
|
||||||
|
private static final String SERVER_KEYSTORE_NAME = "keystore1.jks";
|
||||||
|
private static final String SERVER_KEYSTORE_PASSWORD = "changeit";
|
||||||
|
private static final String CLIENT_KEYSTORE_NAME = "client_not_revoked.jks";
|
||||||
|
private static final String CLIENT_KEYSTORE_PASSWORD = "changeit";
|
||||||
|
private static final String TRUSTSTORE_NAME = "truststore.jks";
|
||||||
|
private static final String TRUSTSTORE_PASSWORD = "changeit";
|
||||||
|
|
||||||
|
private static final String USER = "MY_USER";
|
||||||
|
private static final String PASSWD = "PASSWD_VALUE";
|
||||||
|
|
||||||
|
private static final String PLAIN = "PLAIN";
|
||||||
|
private static final String ANONYMOUS = "ANONYMOUS";
|
||||||
|
private static final String EXTERNAL = "EXTERNAL";
|
||||||
|
|
||||||
|
private Vertx vertx;
|
||||||
|
private MockServer mockServer;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ActiveMQServer createServer() throws Exception {
|
||||||
|
// Creates the broker used to make the outgoing connection. The port passed is for
|
||||||
|
// that brokers acceptor. The test server connected to by the broker binds to a random port.
|
||||||
|
return createServer(BROKER_PORT_NUM, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
@Override
|
||||||
|
public void setUp() throws Exception {
|
||||||
|
super.setUp();
|
||||||
|
vertx = Vertx.vertx();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
@Override
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
try {
|
||||||
|
super.tearDown();
|
||||||
|
} finally {
|
||||||
|
if (mockServer != null) {
|
||||||
|
mockServer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
CountDownLatch closeLatch = new CountDownLatch(1);
|
||||||
|
vertx.close(x -> closeLatch.countDown());
|
||||||
|
assertTrue("Vert.x instant not closed in alotted time", closeLatch.await(5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testConnectsWithAnonymous() throws Exception {
|
||||||
|
CountDownLatch serverConnectionOpen = new CountDownLatch(1);
|
||||||
|
TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS);
|
||||||
|
|
||||||
|
mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> {
|
||||||
|
serverConnection.openHandler(serverSender -> {
|
||||||
|
serverConnectionOpen.countDown();
|
||||||
|
serverConnection.closeHandler(x -> serverConnection.close());
|
||||||
|
serverConnection.open();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered
|
||||||
|
AMQPBrokerConnectConfiguration amqpConnection =
|
||||||
|
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort());
|
||||||
|
amqpConnection.setReconnectAttempts(0);// No reconnects
|
||||||
|
|
||||||
|
server.getConfiguration().addAMQPConnection(amqpConnection);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS);
|
||||||
|
assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen);
|
||||||
|
|
||||||
|
assertEquals(ANONYMOUS, authenticator.getChosenMech());
|
||||||
|
assertArrayEquals(new byte[0], authenticator.getInitialResponse());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testConnectsWithPlain() throws Exception {
|
||||||
|
CountDownLatch serverConnectionOpen = new CountDownLatch(1);
|
||||||
|
TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS);
|
||||||
|
|
||||||
|
mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> {
|
||||||
|
serverConnection.openHandler(serverSender -> {
|
||||||
|
serverConnectionOpen.countDown();
|
||||||
|
serverConnection.closeHandler(x -> serverConnection.close());
|
||||||
|
serverConnection.open();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// User and pass are given, it will select PLAIN
|
||||||
|
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort());
|
||||||
|
amqpConnection.setReconnectAttempts(0);// No reconnects
|
||||||
|
amqpConnection.setUser(USER);
|
||||||
|
amqpConnection.setPassword(PASSWD);
|
||||||
|
|
||||||
|
server.getConfiguration().addAMQPConnection(amqpConnection);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS);
|
||||||
|
assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen);
|
||||||
|
|
||||||
|
assertEquals(PLAIN, authenticator.getChosenMech());
|
||||||
|
assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testConnectsWithExternal() throws Exception {
|
||||||
|
doConnectWithExternalTestImpl(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 20000)
|
||||||
|
public void testExternalIgnoredWhenNoClientCertSupplied() throws Exception {
|
||||||
|
doConnectWithExternalTestImpl(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doConnectWithExternalTestImpl(boolean requireClientCert) throws ExecutionException, InterruptedException, Exception {
|
||||||
|
CountDownLatch serverConnectionOpen = new CountDownLatch(1);
|
||||||
|
// The test server always offers EXTERNAL, i.e sometimes mistakenly, to verify that the broker only selects it when it actually
|
||||||
|
// has a client-cert. Real servers shouldnt actually offer the mechanism to a client that didnt have to provide a cert.
|
||||||
|
TestAuthenticator authenticator = new TestAuthenticator(true, EXTERNAL, PLAIN);
|
||||||
|
|
||||||
|
final String keyStorePath = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
|
||||||
|
JksOptions jksKeyStoreOptions = new JksOptions().setPath(keyStorePath).setPassword(SERVER_KEYSTORE_PASSWORD);
|
||||||
|
|
||||||
|
ProtonServerOptions serverOptions = new ProtonServerOptions();
|
||||||
|
serverOptions.setSsl(true);
|
||||||
|
serverOptions.setKeyStoreOptions(jksKeyStoreOptions);
|
||||||
|
|
||||||
|
if (requireClientCert) {
|
||||||
|
final String trustStorePath = this.getClass().getClassLoader().getResource(TRUSTSTORE_NAME).getFile();
|
||||||
|
JksOptions jksTrustStoreOptions = new JksOptions().setPath(trustStorePath).setPassword(TRUSTSTORE_PASSWORD);
|
||||||
|
|
||||||
|
serverOptions.setTrustStoreOptions(jksTrustStoreOptions);
|
||||||
|
serverOptions.setClientAuth(ClientAuth.REQUIRED);
|
||||||
|
}
|
||||||
|
|
||||||
|
mockServer = new MockServer(vertx, serverOptions, () -> authenticator, serverConnection -> {
|
||||||
|
serverConnection.openHandler(serverSender -> {
|
||||||
|
serverConnectionOpen.countDown();
|
||||||
|
serverConnection.closeHandler(x -> serverConnection.close());
|
||||||
|
serverConnection.open();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
String amqpServerConnectionURI = "tcp://localhost:" + mockServer.actualPort() +
|
||||||
|
"?sslEnabled=true;trustStorePath=" + TRUSTSTORE_NAME + ";trustStorePassword=" + TRUSTSTORE_PASSWORD;
|
||||||
|
if (requireClientCert) {
|
||||||
|
amqpServerConnectionURI += ";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD;
|
||||||
|
}
|
||||||
|
|
||||||
|
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("testSimpleConnect", amqpServerConnectionURI);
|
||||||
|
amqpConnection.setReconnectAttempts(0);// No reconnects
|
||||||
|
amqpConnection.setUser(USER); // Wont matter if EXTERNAL is offered and a client-certificate is provided, but will otherwise.
|
||||||
|
amqpConnection.setPassword(PASSWD);
|
||||||
|
|
||||||
|
server.getConfiguration().addAMQPConnection(amqpConnection);
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS);
|
||||||
|
assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen);
|
||||||
|
|
||||||
|
if (requireClientCert) {
|
||||||
|
assertEquals(EXTERNAL, authenticator.getChosenMech());
|
||||||
|
assertArrayEquals(new byte[0], authenticator.getInitialResponse());
|
||||||
|
} else {
|
||||||
|
assertEquals(PLAIN, authenticator.getChosenMech());
|
||||||
|
assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static byte[] expectedPlainInitialResponse(String username, String password) {
|
||||||
|
Objects.requireNonNull(username);
|
||||||
|
Objects.requireNonNull(password);
|
||||||
|
if (username.isEmpty() || password.isEmpty()) {
|
||||||
|
throw new IllegalArgumentException("Must provide at least 1 character in user and pass");
|
||||||
|
}
|
||||||
|
|
||||||
|
byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
|
||||||
|
byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2];
|
||||||
|
System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
|
||||||
|
System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final class TestAuthenticator implements ProtonSaslAuthenticator {
|
||||||
|
private Sasl sasl;
|
||||||
|
private boolean succeed;
|
||||||
|
private String[] offeredMechs;
|
||||||
|
String chosenMech = null;
|
||||||
|
byte[] initialResponse = null;
|
||||||
|
boolean done = false;
|
||||||
|
|
||||||
|
TestAuthenticator(boolean succeed, String... offeredMechs) {
|
||||||
|
if (offeredMechs.length == 0) {
|
||||||
|
throw new IllegalArgumentException("Must provide at least 1 mechanism to offer");
|
||||||
|
}
|
||||||
|
|
||||||
|
this.offeredMechs = offeredMechs;
|
||||||
|
this.succeed = succeed;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) {
|
||||||
|
this.sasl = transport.sasl();
|
||||||
|
sasl.server();
|
||||||
|
sasl.allowSkip(false);
|
||||||
|
sasl.setMechanisms(offeredMechs);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void process(Handler<Boolean> processComplete) {
|
||||||
|
if (!done) {
|
||||||
|
String[] remoteMechanisms = sasl.getRemoteMechanisms();
|
||||||
|
if (remoteMechanisms.length > 0) {
|
||||||
|
chosenMech = remoteMechanisms[0];
|
||||||
|
|
||||||
|
initialResponse = new byte[sasl.pending()];
|
||||||
|
sasl.recv(initialResponse, 0, initialResponse.length);
|
||||||
|
|
||||||
|
if (succeed) {
|
||||||
|
sasl.done(SaslOutcome.PN_SASL_OK);
|
||||||
|
} else {
|
||||||
|
sasl.done(SaslOutcome.PN_SASL_AUTH);
|
||||||
|
}
|
||||||
|
|
||||||
|
done = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
processComplete.handle(done);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean succeeded() {
|
||||||
|
return succeed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getChosenMech() {
|
||||||
|
return chosenMech;
|
||||||
|
}
|
||||||
|
|
||||||
|
public byte[] getInitialResponse() {
|
||||||
|
return initialResponse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,81 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
* <p>
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
* <p>
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.tests.integration.amqp.connect;
|
||||||
|
|
||||||
|
import io.vertx.core.Handler;
|
||||||
|
import io.vertx.core.Vertx;
|
||||||
|
import io.vertx.proton.ProtonConnection;
|
||||||
|
import io.vertx.proton.ProtonServer;
|
||||||
|
import io.vertx.proton.ProtonServerOptions;
|
||||||
|
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
|
||||||
|
|
||||||
|
import java.util.Objects;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
public class MockServer {
|
||||||
|
private ProtonServer server;
|
||||||
|
|
||||||
|
public MockServer(Vertx vertx, Handler<ProtonConnection> connectionHandler) throws ExecutionException, InterruptedException {
|
||||||
|
this(vertx, new ProtonServerOptions(), null, connectionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockServer(Vertx vertx, ProtonSaslAuthenticatorFactory authFactory, Handler<ProtonConnection> connectionHandler) throws ExecutionException, InterruptedException {
|
||||||
|
this(vertx, new ProtonServerOptions(), authFactory, connectionHandler);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MockServer(Vertx vertx, ProtonServerOptions options, ProtonSaslAuthenticatorFactory authFactory, Handler<ProtonConnection> connectionHandler) throws ExecutionException, InterruptedException {
|
||||||
|
Objects.requireNonNull(options, "options must not be null");
|
||||||
|
|
||||||
|
server = ProtonServer.create(vertx, options);
|
||||||
|
server.connectHandler(connectionHandler);
|
||||||
|
if (authFactory != null) {
|
||||||
|
server.saslAuthenticatorFactory(authFactory);
|
||||||
|
}
|
||||||
|
|
||||||
|
AtomicReference<Throwable> failure = new AtomicReference<>();
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
// Passing port 0 to have the server choose port at bind.
|
||||||
|
// Use actualPort() to discover port used.
|
||||||
|
server.listen(0, res -> {
|
||||||
|
if (!res.succeeded()) {
|
||||||
|
failure.set(res.cause());
|
||||||
|
}
|
||||||
|
latch.countDown();
|
||||||
|
});
|
||||||
|
|
||||||
|
latch.await();
|
||||||
|
|
||||||
|
if (failure.get() != null) {
|
||||||
|
throw new ExecutionException(failure.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public int actualPort() {
|
||||||
|
return server.actualPort();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void close() {
|
||||||
|
server.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
ProtonServer getProtonServer() {
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue