diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java index 57c2309ef9..e2ebad6de3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/CertificateUtil.java @@ -29,6 +29,8 @@ import java.security.Principal; public class CertificateUtil { + private static final String SSL_HANDLER_NAME = "ssl"; + public static X509Certificate[] getCertsFromConnection(RemotingConnection remotingConnection) { X509Certificate[] certificates = null; if (remotingConnection != null) { @@ -46,7 +48,7 @@ public class CertificateUtil { Connection transportConnection = remotingConnection.getTransportConnection(); if (transportConnection instanceof NettyConnection) { NettyConnection nettyConnection = (NettyConnection) transportConnection; - ChannelHandler channelHandler = nettyConnection.getChannel().pipeline().get("ssl"); + ChannelHandler channelHandler = nettyConnection.getChannel().pipeline().get(SSL_HANDLER_NAME); if (channelHandler != null && channelHandler instanceof SslHandler) { SslHandler sslHandler = (SslHandler) channelHandler; try { @@ -59,4 +61,15 @@ public class CertificateUtil { return result; } + + public static Principal getLocalPrincipalFromConnection(NettyConnection nettyConnection) { + Principal result = null; + ChannelHandler handler = nettyConnection.getChannel().pipeline().get(SSL_HANDLER_NAME); + if (handler instanceof SslHandler) { + SslHandler sslHandler = (SslHandler) handler; + result = sslHandler.engine().getSession().getLocalPrincipal(); + } + + return result; + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java index dcac1bea6b..304eaf07d0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/AMQPBrokerConnection.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.connect; import java.nio.charset.StandardCharsets; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -39,6 +40,7 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.remoting.CertificateUtil; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection; import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector; import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; @@ -239,21 +241,12 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, senders.clear(); receivers.clear(); - ClientSASLFactory saslFactory = null; - - if (brokerConnectConfiguration.getUser() != null && brokerConnectConfiguration.getPassword() != null) { - saslFactory = availableMechanims -> { - if (availableMechanims != null && Arrays.asList(availableMechanims).contains("PLAIN")) { - return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword()); - } else { - return null; - } - }; - } + ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration); ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory); server.getRemotingService().addConnectionEntry(connection, entry); protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection; + connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor())); session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session(); @@ -585,6 +578,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, protonRemotingConnection.flush(); } + private static final String EXTERNAL = "EXTERNAL"; + private static final String PLAIN = "PLAIN"; + private static final String ANONYMOUS = "ANONYMOUS"; + private static final byte[] EMPTY = new byte[0]; + private static class PlainSASLMechanism implements ClientSASL { private final byte[] initialResponse; @@ -600,7 +598,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, @Override public String getName() { - return "PLAIN"; + return PLAIN; } @Override @@ -610,7 +608,81 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener, @Override public byte[] getResponse(byte[] challenge) { - return new byte[0]; + return EMPTY; + } + + public static boolean isApplicable(final String username, final String password) { + return username != null && username.length() > 0 && password != null && password.length() > 0; + } + } + + private static class AnonymousSASLMechanism implements ClientSASL { + + @Override + public String getName() { + return ANONYMOUS; + } + + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + @Override + public byte[] getResponse(byte[] challenge) { + return EMPTY; + } + } + + private static class ExternalSASLMechanism implements ClientSASL { + + @Override + public String getName() { + return EXTERNAL; + } + + @Override + public byte[] getInitialResponse() { + return EMPTY; + } + + @Override + public byte[] getResponse(byte[] challenge) { + return EMPTY; + } + + public static boolean isApplicable(final NettyConnection connection) { + return CertificateUtil.getLocalPrincipalFromConnection(connection) != null; + } + } + + private static final class SaslFactory implements ClientSASLFactory { + + private final NettyConnection connection; + private final AMQPBrokerConnectConfiguration brokerConnectConfiguration; + + SaslFactory(NettyConnection connection, AMQPBrokerConnectConfiguration brokerConnectConfiguration) { + this.connection = connection; + this.brokerConnectConfiguration = brokerConnectConfiguration; + } + + @Override + public ClientSASL chooseMechanism(String[] offeredMechanims) { + List availableMechanisms = offeredMechanims == null ? Collections.emptyList() : Arrays.asList(offeredMechanims); + + if (availableMechanisms.contains(EXTERNAL) && ExternalSASLMechanism.isApplicable(connection)) { + return new ExternalSASLMechanism(); + } + + if (availableMechanisms.contains(PLAIN) && PlainSASLMechanism.isApplicable(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword())) { + return new PlainSASLMechanism(brokerConnectConfiguration.getUser(), brokerConnectConfiguration.getPassword()); + } + + if (availableMechanisms.contains(ANONYMOUS)) { + return new AnonymousSASLMechanism(); + } + + return null; } } diff --git a/pom.xml b/pom.xml index 11b9190362..af9ae750e0 100644 --- a/pom.xml +++ b/pom.xml @@ -133,6 +133,7 @@ 2.5.10 + 3.9.4 1.4.3 5.1.7.RELEASE diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 151f656900..c4ccfb7b9b 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -417,6 +417,13 @@ netty-tcnative-boringssl-static ${netty-tcnative-version} + + + io.vertx + vertx-proton + ${vertx.version} + test + @@ -541,3 +548,4 @@ + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java new file mode 100644 index 0000000000..43c938d40c --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/AMQPConnectSaslTest.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import java.nio.charset.StandardCharsets; +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sasl.SaslOutcome; +import org.apache.qpid.proton.engine.Transport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.core.http.ClientAuth; +import io.vertx.core.net.JksOptions; +import io.vertx.core.net.NetSocket; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonServerOptions; +import io.vertx.proton.sasl.ProtonSaslAuthenticator; + +public class AMQPConnectSaslTest extends AmqpClientTestSupport { + + private static final int BROKER_PORT_NUM = AMQP_PORT + 1; + + private static final String SERVER_KEYSTORE_NAME = "keystore1.jks"; + private static final String SERVER_KEYSTORE_PASSWORD = "changeit"; + private static final String CLIENT_KEYSTORE_NAME = "client_not_revoked.jks"; + private static final String CLIENT_KEYSTORE_PASSWORD = "changeit"; + private static final String TRUSTSTORE_NAME = "truststore.jks"; + private static final String TRUSTSTORE_PASSWORD = "changeit"; + + private static final String USER = "MY_USER"; + private static final String PASSWD = "PASSWD_VALUE"; + + private static final String PLAIN = "PLAIN"; + private static final String ANONYMOUS = "ANONYMOUS"; + private static final String EXTERNAL = "EXTERNAL"; + + private Vertx vertx; + private MockServer mockServer; + + @Override + protected ActiveMQServer createServer() throws Exception { + // Creates the broker used to make the outgoing connection. The port passed is for + // that brokers acceptor. The test server connected to by the broker binds to a random port. + return createServer(BROKER_PORT_NUM, false); + } + + @Before + @Override + public void setUp() throws Exception { + super.setUp(); + vertx = Vertx.vertx(); + } + + @After + @Override + public void tearDown() throws Exception { + try { + super.tearDown(); + } finally { + if (mockServer != null) { + mockServer.close(); + } + + CountDownLatch closeLatch = new CountDownLatch(1); + vertx.close(x -> closeLatch.countDown()); + assertTrue("Vert.x instant not closed in alotted time", closeLatch.await(5, TimeUnit.SECONDS)); + } + } + + @Test(timeout = 20000) + public void testConnectsWithAnonymous() throws Exception { + CountDownLatch serverConnectionOpen = new CountDownLatch(1); + TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS); + + mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> { + serverConnection.openHandler(serverSender -> { + serverConnectionOpen.countDown(); + serverConnection.closeHandler(x -> serverConnection.close()); + serverConnection.open(); + }); + }); + + // No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered + AMQPBrokerConnectConfiguration amqpConnection = + new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + + server.getConfiguration().addAMQPConnection(amqpConnection); + + server.start(); + + boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); + assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); + + assertEquals(ANONYMOUS, authenticator.getChosenMech()); + assertArrayEquals(new byte[0], authenticator.getInitialResponse()); + } + + @Test(timeout = 20000) + public void testConnectsWithPlain() throws Exception { + CountDownLatch serverConnectionOpen = new CountDownLatch(1); + TestAuthenticator authenticator = new TestAuthenticator(true, PLAIN, ANONYMOUS); + + mockServer = new MockServer(vertx, () -> authenticator, serverConnection -> { + serverConnection.openHandler(serverSender -> { + serverConnectionOpen.countDown(); + serverConnection.closeHandler(x -> serverConnection.close()); + serverConnection.open(); + }); + }); + + // User and pass are given, it will select PLAIN + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("testSimpleConnect", "tcp://localhost:" + mockServer.actualPort()); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser(USER); + amqpConnection.setPassword(PASSWD); + + server.getConfiguration().addAMQPConnection(amqpConnection); + + server.start(); + + boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); + assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); + + assertEquals(PLAIN, authenticator.getChosenMech()); + assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse()); + } + + @Test(timeout = 20000) + public void testConnectsWithExternal() throws Exception { + doConnectWithExternalTestImpl(true); + } + + @Test(timeout = 20000) + public void testExternalIgnoredWhenNoClientCertSupplied() throws Exception { + doConnectWithExternalTestImpl(false); + } + + private void doConnectWithExternalTestImpl(boolean requireClientCert) throws ExecutionException, InterruptedException, Exception { + CountDownLatch serverConnectionOpen = new CountDownLatch(1); + // The test server always offers EXTERNAL, i.e sometimes mistakenly, to verify that the broker only selects it when it actually + // has a client-cert. Real servers shouldnt actually offer the mechanism to a client that didnt have to provide a cert. + TestAuthenticator authenticator = new TestAuthenticator(true, EXTERNAL, PLAIN); + + final String keyStorePath = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile(); + JksOptions jksKeyStoreOptions = new JksOptions().setPath(keyStorePath).setPassword(SERVER_KEYSTORE_PASSWORD); + + ProtonServerOptions serverOptions = new ProtonServerOptions(); + serverOptions.setSsl(true); + serverOptions.setKeyStoreOptions(jksKeyStoreOptions); + + if (requireClientCert) { + final String trustStorePath = this.getClass().getClassLoader().getResource(TRUSTSTORE_NAME).getFile(); + JksOptions jksTrustStoreOptions = new JksOptions().setPath(trustStorePath).setPassword(TRUSTSTORE_PASSWORD); + + serverOptions.setTrustStoreOptions(jksTrustStoreOptions); + serverOptions.setClientAuth(ClientAuth.REQUIRED); + } + + mockServer = new MockServer(vertx, serverOptions, () -> authenticator, serverConnection -> { + serverConnection.openHandler(serverSender -> { + serverConnectionOpen.countDown(); + serverConnection.closeHandler(x -> serverConnection.close()); + serverConnection.open(); + }); + }); + + String amqpServerConnectionURI = "tcp://localhost:" + mockServer.actualPort() + + "?sslEnabled=true;trustStorePath=" + TRUSTSTORE_NAME + ";trustStorePassword=" + TRUSTSTORE_PASSWORD; + if (requireClientCert) { + amqpServerConnectionURI += ";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD; + } + + AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("testSimpleConnect", amqpServerConnectionURI); + amqpConnection.setReconnectAttempts(0);// No reconnects + amqpConnection.setUser(USER); // Wont matter if EXTERNAL is offered and a client-certificate is provided, but will otherwise. + amqpConnection.setPassword(PASSWD); + + server.getConfiguration().addAMQPConnection(amqpConnection); + + server.start(); + + boolean awaitConnectionOpen = serverConnectionOpen.await(10, TimeUnit.SECONDS); + assertTrue("Broker did not open connection in alotted time", awaitConnectionOpen); + + if (requireClientCert) { + assertEquals(EXTERNAL, authenticator.getChosenMech()); + assertArrayEquals(new byte[0], authenticator.getInitialResponse()); + } else { + assertEquals(PLAIN, authenticator.getChosenMech()); + assertArrayEquals(expectedPlainInitialResponse(USER, PASSWD), authenticator.getInitialResponse()); + } + } + + private static byte[] expectedPlainInitialResponse(String username, String password) { + Objects.requireNonNull(username); + Objects.requireNonNull(password); + if (username.isEmpty() || password.isEmpty()) { + throw new IllegalArgumentException("Must provide at least 1 character in user and pass"); + } + + byte[] usernameBytes = username.getBytes(StandardCharsets.UTF_8); + byte[] passwordBytes = password.getBytes(StandardCharsets.UTF_8); + + byte[] data = new byte[usernameBytes.length + passwordBytes.length + 2]; + System.arraycopy(usernameBytes, 0, data, 1, usernameBytes.length); + System.arraycopy(passwordBytes, 0, data, 2 + usernameBytes.length, passwordBytes.length); + + return data; + } + + private static final class TestAuthenticator implements ProtonSaslAuthenticator { + private Sasl sasl; + private boolean succeed; + private String[] offeredMechs; + String chosenMech = null; + byte[] initialResponse = null; + boolean done = false; + + TestAuthenticator(boolean succeed, String... offeredMechs) { + if (offeredMechs.length == 0) { + throw new IllegalArgumentException("Must provide at least 1 mechanism to offer"); + } + + this.offeredMechs = offeredMechs; + this.succeed = succeed; + } + + @Override + public void init(NetSocket socket, ProtonConnection protonConnection, Transport transport) { + this.sasl = transport.sasl(); + sasl.server(); + sasl.allowSkip(false); + sasl.setMechanisms(offeredMechs); + } + + @Override + public void process(Handler processComplete) { + if (!done) { + String[] remoteMechanisms = sasl.getRemoteMechanisms(); + if (remoteMechanisms.length > 0) { + chosenMech = remoteMechanisms[0]; + + initialResponse = new byte[sasl.pending()]; + sasl.recv(initialResponse, 0, initialResponse.length); + + if (succeed) { + sasl.done(SaslOutcome.PN_SASL_OK); + } else { + sasl.done(SaslOutcome.PN_SASL_AUTH); + } + + done = true; + } + } + + processComplete.handle(done); + } + + @Override + public boolean succeeded() { + return succeed; + } + + public String getChosenMech() { + return chosenMech; + } + + public byte[] getInitialResponse() { + return initialResponse; + } + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java new file mode 100644 index 0000000000..b142940eb3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/connect/MockServer.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp.connect; + +import io.vertx.core.Handler; +import io.vertx.core.Vertx; +import io.vertx.proton.ProtonConnection; +import io.vertx.proton.ProtonServer; +import io.vertx.proton.ProtonServerOptions; +import io.vertx.proton.sasl.ProtonSaslAuthenticatorFactory; + +import java.util.Objects; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicReference; + +public class MockServer { + private ProtonServer server; + + public MockServer(Vertx vertx, Handler connectionHandler) throws ExecutionException, InterruptedException { + this(vertx, new ProtonServerOptions(), null, connectionHandler); + } + + public MockServer(Vertx vertx, ProtonSaslAuthenticatorFactory authFactory, Handler connectionHandler) throws ExecutionException, InterruptedException { + this(vertx, new ProtonServerOptions(), authFactory, connectionHandler); + } + + public MockServer(Vertx vertx, ProtonServerOptions options, ProtonSaslAuthenticatorFactory authFactory, Handler connectionHandler) throws ExecutionException, InterruptedException { + Objects.requireNonNull(options, "options must not be null"); + + server = ProtonServer.create(vertx, options); + server.connectHandler(connectionHandler); + if (authFactory != null) { + server.saslAuthenticatorFactory(authFactory); + } + + AtomicReference failure = new AtomicReference<>(); + CountDownLatch latch = new CountDownLatch(1); + + // Passing port 0 to have the server choose port at bind. + // Use actualPort() to discover port used. + server.listen(0, res -> { + if (!res.succeeded()) { + failure.set(res.cause()); + } + latch.countDown(); + }); + + latch.await(); + + if (failure.get() != null) { + throw new ExecutionException(failure.get()); + } + } + + public int actualPort() { + return server.actualPort(); + } + + public void close() { + server.close(); + } + + ProtonServer getProtonServer() { + return server; + } +}