This commit is contained in:
Clebert Suconic 2020-11-05 11:32:35 -05:00
commit c377801150
6 changed files with 488 additions and 14 deletions

View File

@ -29,6 +29,8 @@ import java.security.Principal;
public class CertificateUtil {
private static final String SSL_HANDLER_NAME = "ssl";
public static X509Certificate[] getCertsFromConnection(RemotingConnection remotingConnection) {
X509Certificate[] certificates = null;
if (remotingConnection != null) {
@ -46,7 +48,7 @@ public class CertificateUtil {
Connection transportConnection = remotingConnection.getTransportConnection();
if (transportConnection instanceof NettyConnection) {
NettyConnection nettyConnection = (NettyConnection) transportConnection;
ChannelHandler channelHandler = nettyConnection.getChannel().pipeline().get("ssl");
ChannelHandler channelHandler = nettyConnection.getChannel().pipeline().get(SSL_HANDLER_NAME);
if (channelHandler != null && channelHandler instanceof SslHandler) {
SslHandler sslHandler = (SslHandler) channelHandler;
try {
@ -59,4 +61,15 @@ public class CertificateUtil {
return result;
}
public static Principal getLocalPrincipalFromConnection(NettyConnection nettyConnection) {
Principal result = null;
ChannelHandler handler = nettyConnection.getChannel().pipeline().get(SSL_HANDLER_NAME);
if (handler instanceof SslHandler) {
SslHandler sslHandler = (SslHandler) handler;
result = sslHandler.engine().getSession().getLocalPrincipal();
}
return result;
}
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@ -239,21 +241,12 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
senders.clear();
receivers.clear();
ClientSASLFactory saslFactory = null;
if (brokerConnectConfiguration.getUser() != null && brokerConnectConfiguration.getPassword() != null) {
saslFactory = availableMechanims -> {
if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) {
return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword());
} else {
return null;
}
};
}
ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
@ -585,6 +578,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
protonRemotingConnection.flush();
}
private static final String EXTERNAL = "EXTERNAL";
private static final String PLAIN = "PLAIN";
private static final String ANONYMOUS = "ANONYMOUS";
private static final byte[] EMPTY = new byte[0];
private static class PlainSASLMechanism implements ClientSASL {
private final byte[] initialResponse;
@ -600,7 +598,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override
public String getName() {
return "PLAIN";
return PLAIN;
}
@Override
@ -610,7 +608,81 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
@Override
public byte[] getResponse(byte[] challenge) {
return new byte[0];
return EMPTY;
}
public static boolean isApplicable(final String username, final String password) {
return username != null && username.length() > 0 && password != null && password.length() > 0;
}
}
private static class AnonymousSASLMechanism implements ClientSASL {
@Override
public String getName() {
return ANONYMOUS;
}
@Override
public byte[] getInitialResponse() {
return EMPTY;
}
@Override
public byte[] getResponse(byte[] challenge) {
return EMPTY;
}
}
private static class ExternalSASLMechanism implements ClientSASL {
@Override
public String getName() {
return EXTERNAL;
}
@Override
public byte[] getInitialResponse() {
return EMPTY;
}
@Override
public byte[] getResponse(byte[] challenge) {
return EMPTY;
}
public static boolean isApplicable(final NettyConnection connection) {
return CertificateUtil.getLocalPrincipalFromConnection(connection) != null;
}
}
private static final class SaslFactory implements ClientSASLFactory {
private final NettyConnection connection;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration brokerConnectConfiguration) {
this.connection = connection;
this.brokerConnectConfiguration = brokerConnectConfiguration;
}
@Override
public ClientSASL chooseMechanism(String[] offeredMechanims) {
List<String> availableMechanisms = offeredMechanims == null ? Collections.emptyList() : Arrays.asList(offeredMechanims);
if (availableMechanisms.contains(EXTERNAL) && ExternalSASLMechanism.isApplicable(connection)) {
return new ExternalSASLMechanism();
}
if (availableMechanisms.contains(PLAIN) && PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword())) {
return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword());
}
if (availableMechanisms.contains(ANONYMOUS)) {
return new AnonymousSASLMechanism();
}
return null;
}
}

View File

@ -133,6 +133,7 @@
<!-- used on tests -->
<groovy.version>2.5.10</groovy.version>
<vertx.version>3.9.4</vertx.version>
<owasp.version>1.4.3</owasp.version>
<spring.version>5.1.7.RELEASE</spring.version>

View File

@ -417,6 +417,13 @@
<artifactId>netty-tcnative-boringssl-static</artifactId>
<version>${netty-tcnative-version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>${vertx.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
@ -541,3 +548,4 @@
</profile>
</profiles>
</project>

View File

@ -0,0 +1,299 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.qpid.proton.engine.Sasl;
import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
import org.apache.qpid.proton.engine.Transport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.ClientAuth;
import io.vertx.core.net.JksOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.sasl.ProtonSaslAuthenticator;
public class AMQPConnectSaslTest extends AmqpClientTestSupport {
private static final int BROKER_PORT_NUM = AMQP_PORT + 1;
private static final String SERVER_KEYSTORE_NAME = "keystore1.jks";
private static final String SERVER_KEYSTORE_PASSWORD = "changeit";
private static final String CLIENT_KEYSTORE_NAME = "client_not_revoked.jks";
private static final String CLIENT_KEYSTORE_PASSWORD = "changeit";
private static final String TRUSTSTORE_NAME = "truststore.jks";
private static final String TRUSTSTORE_PASSWORD = "changeit";
private static final String USER = "MY_USER";
private static final String PASSWD = "PASSWD_VALUE";
private static final String PLAIN = "PLAIN";
private static final String ANONYMOUS = "ANONYMOUS";
private static final String EXTERNAL = "EXTERNAL";
private Vertx vertx;
private MockServer mockServer;
@Override
protected ActiveMQServer createServer() throws Exception {
// Creates the broker used to make the outgoing connection. The port passed is for
// that brokers acceptor. The test server connected to by the broker binds to a random port.
return createServer(BROKER_PORT_NUM, false);
}
@Before
@Override
public void setUp() throws Exception {
super.setUp();
vertx = Vertx.vertx();
}
@After
@Override
public void tearDown() throws Exception {
try {
super.tearDown();
} finally {
if (mockServer != null) {
mockServer.close();
}
CountDownLatch closeLatch = new CountDownLatch(1);
vertx.close(x -> closeLatch.countDown());
assertTrue("Vert.x instant not closed in alotted time", closeLatch.await(5, TimeUnit.SECONDS));
}
}
@Test(timeout = 20000)
public void testConnectsWithAnonymous() throws Exception {
CountDownLatch serverConnectionOpen = new CountDownLatch(1);
TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS);
mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> {
serverConnection.openHandler(serverSender -> {
serverConnectionOpen.countDown();
serverConnection.closeHandler(x -> serverConnection.close());
serverConnection.open();
});
});
// No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS);
assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen);
assertEquals(ANONYMOUS, authenticator.getChosenMech());
assertArrayEquals(new byte[0], authenticator.getInitialResponse());
}
@Test(timeout = 20000)
public void testConnectsWithPlain() throws Exception {
CountDownLatch serverConnectionOpen = new CountDownLatch(1);
TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS);
mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> {
serverConnection.openHandler(serverSender -> {
serverConnectionOpen.countDown();
serverConnection.closeHandler(x -> serverConnection.close());
serverConnection.open();
});
});
// User and pass are given, it will select PLAIN
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setUser(USER);
amqpConnection.setPassword(PASSWD);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS);
assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen);
assertEquals(PLAIN, authenticator.getChosenMech());
assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse());
}
@Test(timeout = 20000)
public void testConnectsWithExternal() throws Exception {
doConnectWithExternalTestImpl(true);
}
@Test(timeout = 20000)
public void testExternalIgnoredWhenNoClientCertSupplied() throws Exception {
doConnectWithExternalTestImpl(false);
}
private void doConnectWithExternalTestImpl(boolean requireClientCert) throws ExecutionException, InterruptedException, Exception {
CountDownLatch serverConnectionOpen = new CountDownLatch(1);
// The test server always offers EXTERNAL, i.e sometimes mistakenly, to verify that the broker only selects it when it actually
// has a client-cert. Real servers shouldnt actually offer the mechanism to a client that didnt have to provide a cert.
TestAuthenticator authenticator = new TestAuthenticator(true, EXTERNAL, PLAIN);
final String keyStorePath = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
JksOptions jksKeyStoreOptions = new JksOptions().setPath(keyStorePath).setPassword(SERVER_KEYSTORE_PASSWORD);
ProtonServerOptions serverOptions = new ProtonServerOptions();
serverOptions.setSsl(true);
serverOptions.setKeyStoreOptions(jksKeyStoreOptions);
if (requireClientCert) {
final String trustStorePath = this.getClass().getClassLoader().getResource(TRUSTSTORE_NAME).getFile();
JksOptions jksTrustStoreOptions = new JksOptions().setPath(trustStorePath).setPassword(TRUSTSTORE_PASSWORD);
serverOptions.setTrustStoreOptions(jksTrustStoreOptions);
serverOptions.setClientAuth(ClientAuth.REQUIRED);
}
mockServer = new MockServer(vertx, serverOptions, () -> authenticator, serverConnection -> {
serverConnection.openHandler(serverSender -> {
serverConnectionOpen.countDown();
serverConnection.closeHandler(x -> serverConnection.close());
serverConnection.open();
});
});
String amqpServerConnectionURI = "tcp://localhost:" + mockServer.actualPort() +
"?sslEnabled=true;trustStorePath=" + TRUSTSTORE_NAME + ";trustStorePassword=" + TRUSTSTORE_PASSWORD;
if (requireClientCert) {
amqpServerConnectionURI += ";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD;
}
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("testSimpleConnect", amqpServerConnectionURI);
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setUser(USER); // Wont matter if EXTERNAL is offered and a client-certificate is provided, but will otherwise.
amqpConnection.setPassword(PASSWD);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS);
assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen);
if (requireClientCert) {
assertEquals(EXTERNAL, authenticator.getChosenMech());
assertArrayEquals(new byte[0], authenticator.getInitialResponse());
} else {
assertEquals(PLAIN, authenticator.getChosenMech());
assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse());
}
}
private static byte[] expectedPlainInitialResponse(String username, String password) {
Objects.requireNonNull(username);
Objects.requireNonNull(password);
if (username.isEmpty() || password.isEmpty()) {
throw new IllegalArgumentException("Must provide at least 1 character in user and pass");
}
byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8);
byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8);
byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2];
System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length);
System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length);
return data;
}
private static final class TestAuthenticator implements ProtonSaslAuthenticator {
private Sasl sasl;
private boolean succeed;
private String[] offeredMechs;
String chosenMech = null;
byte[] initialResponse = null;
boolean done = false;
TestAuthenticator(boolean succeed, String... offeredMechs) {
if (offeredMechs.length == 0) {
throw new IllegalArgumentException("Must provide at least 1 mechanism to offer");
}
this.offeredMechs = offeredMechs;
this.succeed = succeed;
}
@Override
public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) {
this.sasl = transport.sasl();
sasl.server();
sasl.allowSkip(false);
sasl.setMechanisms(offeredMechs);
}
@Override
public void process(Handler<Boolean> processComplete) {
if (!done) {
String[] remoteMechanisms = sasl.getRemoteMechanisms();
if (remoteMechanisms.length > 0) {
chosenMech = remoteMechanisms[0];
initialResponse = new byte[sasl.pending()];
sasl.recv(initialResponse, 0, initialResponse.length);
if (succeed) {
sasl.done(SaslOutcome.PN_SASL_OK);
} else {
sasl.done(SaslOutcome.PN_SASL_AUTH);
}
done = true;
}
}
processComplete.handle(done);
}
@Override
public boolean succeeded() {
return succeed;
}
public String getChosenMech() {
return chosenMech;
}
public byte[] getInitialResponse() {
return initialResponse;
}
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonServer;
import io.vertx.proton.ProtonServerOptions;
import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
public class MockServer {
private ProtonServer server;
public MockServer(Vertx vertx, Handler<ProtonConnection> connectionHandler) throws ExecutionException, InterruptedException {
this(vertx, new ProtonServerOptions(), null, connectionHandler);
}
public MockServer(Vertx vertx, ProtonSaslAuthenticatorFactory authFactory, Handler<ProtonConnection> connectionHandler) throws ExecutionException, InterruptedException {
this(vertx, new ProtonServerOptions(), authFactory, connectionHandler);
}
public MockServer(Vertx vertx, ProtonServerOptions options, ProtonSaslAuthenticatorFactory authFactory, Handler<ProtonConnection> connectionHandler) throws ExecutionException, InterruptedException {
Objects.requireNonNull(options, "options must not be null");
server = ProtonServer.create(vertx, options);
server.connectHandler(connectionHandler);
if (authFactory != null) {
server.saslAuthenticatorFactory(authFactory);
}
AtomicReference<Throwable> failure = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
// Passing port 0 to have the server choose port at bind.
// Use actualPort() to discover port used.
server.listen(0, res -> {
if (!res.succeeded()) {
failure.set(res.cause());
}
latch.countDown();
});
latch.await();
if (failure.get() != null) {
throw new ExecutionException(failure.get());
}
}
public int actualPort() {
return server.actualPort();
}
public void close() {
server.close();
}
ProtonServer getProtonServer() {
return server;
}
}