ARTEMIS-4744 Fully support multple host broker connections URIs

Create a new NettyConnector for each connection attempt that is configured from
distinct broker connection URIs which allows for differing TLS configuration
per remote connection configuration.
This commit is contained in:
Timothy Bish 2024-04-25 13:01:00 -04:00 committed by Robbie Gemmell
parent 934fe24e5c
commit ee7a2c0944
4 changed files with 276 additions and 44 deletions

View File

@ -49,8 +49,11 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirror
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -65,10 +68,13 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.ActiveMQProtonRemotingConnection;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManagerFactory;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnectionManager.ClientProtocolManagerWithAMQP;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationPolicySupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerAggregation;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.AMQPMirrorControllerSource;
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceIDSupplier;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPLargeMessageWriter;
@ -120,10 +126,13 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
*/
public static final boolean DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED = true;
private static final NettyConnectorFactory CONNECTOR_FACTORY = new NettyConnectorFactory().setServerConnector(true);
private final ProtonProtocolManagerFactory protonProtocolManagerFactory;
private final ReferenceIDSupplier referenceIdSupplier;
private final AMQPBrokerConnectConfiguration brokerConnectConfiguration;
private final ProtonProtocolManager protonProtocolManager;
private final ActiveMQServer server;
private final NettyConnector bridgesConnector;
private final List<TransportConfiguration> configurations;
private NettyConnection connection;
private Session session;
private AMQPSessionContext sessionContext;
@ -134,6 +143,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
private AMQPFederationSource brokerFederation;
private int retryCounter = 0;
private int lastRetryCounter;
private int connectionTimeout;
private boolean connecting = false;
private volatile ScheduledFuture<?> reconnectFuture;
private final Set<Queue> senders = new HashSet<>();
@ -153,16 +163,16 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
public AMQPBrokerConnection(AMQPBrokerConnectionManager bridgeManager,
AMQPBrokerConnectConfiguration brokerConnectConfiguration,
ProtonProtocolManager protonProtocolManager,
ActiveMQServer server,
NettyConnector bridgesConnector) {
ProtonProtocolManagerFactory protonProtocolManagerFactory,
ActiveMQServer server) throws Exception {
this.bridgeManager = bridgeManager;
this.brokerConnectConfiguration = brokerConnectConfiguration;
this.protonProtocolManager = protonProtocolManager;
this.server = server;
this.bridgesConnector = bridgesConnector;
connectExecutor = server.getExecutorFactory().getExecutor();
scheduledExecutorService = server.getScheduledPool();
this.configurations = brokerConnectConfiguration.getTransportConfigurations();
this.connectExecutor = server.getExecutorFactory().getExecutor();
this.scheduledExecutorService = server.getScheduledPool();
this.protonProtocolManagerFactory = protonProtocolManagerFactory;
this.referenceIdSupplier = new ReferenceIDSupplier(server);
}
@Override
@ -190,7 +200,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
public int getConnectionTimeout() {
return bridgesConnector.getConnectTimeoutMillis();
return connectionTimeout;
}
@Override
@ -340,19 +350,32 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
try {
connecting = true;
List<TransportConfiguration> configurationList = brokerConnectConfiguration.getTransportConfigurations();
TransportConfiguration configuration = configurations.get(retryCounter % configurations.size());
host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, configuration.getParams());
port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, configuration.getParams());
TransportConfiguration tpConfig = configurationList.get(retryCounter % configurationList.size());
ProtonProtocolManager protonProtocolManager =
(ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getExtraParams(), null, null);
NettyConnector connector = (NettyConnector)CONNECTOR_FACTORY.createConnector(
configuration.getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
connector.start();
String hostOnParameter = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, tpConfig.getParams());
int portOnParameter = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, tpConfig.getParams());
this.host = hostOnParameter;
this.port = portOnParameter;
connection = bridgesConnector.createConnection(null, hostOnParameter, portOnParameter);
logger.debug("Connecting {}", configuration);
if (connection == null) {
retryConnection();
return;
connectionTimeout = connector.getConnectTimeoutMillis();
try {
connection = (NettyConnection) connector.createConnection();
if (connection == null) {
retryConnection();
return;
}
} finally {
if (connection == null) {
try {
connector.close();
} catch (Exception ex) {
}
}
}
lastRetryCounter = retryCounter;
@ -368,12 +391,15 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
ClientSASLFactory saslFactory = new SaslFactory(connection, brokerConnectConfiguration);
NettyConnectorCloseHandler connectorCloseHandler = new NettyConnectorCloseHandler(connector, connectExecutor);
ConnectionEntry entry = protonProtocolManager.createOutgoingConnectionEntry(connection, saslFactory);
server.getRemotingService().addConnectionEntry(connection, entry);
protonRemotingConnection = (ActiveMQProtonRemotingConnection) entry.connection;
protonRemotingConnection.getAmqpConnection().addLinkRemoteCloseListener(getName(), this::linkClosed);
protonRemotingConnection.addCloseListener(connectorCloseHandler);
protonRemotingConnection.addFailureListener(connectorCloseHandler);
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(bridgesConnector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
connection.getChannel().pipeline().addLast(new AMQPBrokerConnectionChannelHandler(connector.getChannelGroup(), protonRemotingConnection.getAmqpConnection().getHandler(), this, server.getExecutorFactory().getExecutor()));
session = protonRemotingConnection.getAmqpConnection().getHandler().getConnection().session();
sessionContext = protonRemotingConnection.getAmqpConnection().getSessionExtension(session);
@ -531,7 +557,7 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
throw new IllegalAccessException("Cannot start replica");
}
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(protonProtocolManager, snfQueue, server, replicaConfig, this);
AMQPMirrorControllerSource newPartition = new AMQPMirrorControllerSource(referenceIdSupplier, snfQueue, server, replicaConfig, this);
this.mirrorControllerSource = newPartition;
@ -702,11 +728,11 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
AtomicBoolean cancelled = new AtomicBoolean(false);
if (bridgesConnector.getConnectTimeoutMillis() > 0) {
if (getConnectionTimeout() > 0) {
futureTimeout = server.getScheduledPool().schedule(() -> {
cancelled.set(true);
error(ActiveMQAMQPProtocolMessageBundle.BUNDLE.brokerConnectionTimeout(), lastRetryCounter);
}, bridgesConnector.getConnectTimeoutMillis(), TimeUnit.MILLISECONDS);
}, getConnectionTimeout(), TimeUnit.MILLISECONDS);
} else {
futureTimeout = null;
}
@ -1059,4 +1085,39 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
return DEFAULT_CORE_MESSAGE_TUNNELING_ENABLED;
}
}
public static class NettyConnectorCloseHandler implements FailureListener, CloseListener {
private final NettyConnector connector;
private final Executor connectionExecutor;
public NettyConnectorCloseHandler(NettyConnector connector, Executor connectionExecutor) {
this.connector = connector;
this.connectionExecutor = connectionExecutor;
}
@Override
public void connectionClosed() {
doCloseConnector();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
doCloseConnector();
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
doCloseConnector();
}
private void doCloseConnector() {
connectionExecutor.execute(() -> {
try {
connector.close();
} catch (Exception ex) {
}
});
}
}
}

View File

@ -33,8 +33,6 @@ import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBroker
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
@ -60,8 +58,6 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
private final Map<String, AMQPBrokerConnectConfiguration> amqpConnectionsConfig;
private final Map<String, AMQPBrokerConnection> amqpBrokerConnections = new HashMap<>();
private ProtonProtocolManager protonProtocolManager;
public AMQPBrokerConnectionManager(ProtonProtocolManagerFactory factory, List<AMQPBrokerConnectConfiguration> amqpConnectionsConfig, ActiveMQServer server) {
this.amqpConnectionsConfig =
amqpConnectionsConfig.stream()
@ -71,10 +67,6 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
this.protonProtocolManagerFactory = factory;
}
public ProtonProtocolManagerFactory getProtocolManagerFactory() {
return protonProtocolManagerFactory;
}
@Override
public void start() throws Exception {
if (!started) {
@ -94,14 +86,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
}
private void createBrokerConnection(AMQPBrokerConnectConfiguration configuration, boolean start) throws Exception {
NettyConnectorFactory factory = new NettyConnectorFactory().setServerConnector(true);
protonProtocolManager = (ProtonProtocolManager)protonProtocolManagerFactory.createProtocolManager(server, configuration.getTransportConfigurations().get(0).getExtraParams(), null, null);
NettyConnector bridgesConnector = (NettyConnector)factory.createConnector(configuration.getTransportConfigurations().get(0).getParams(), null, this, server.getExecutorFactory().getExecutor(), server.getThreadPool(), server.getScheduledPool(), new ClientProtocolManagerWithAMQP(protonProtocolManager));
bridgesConnector.start();
logger.debug("Connecting {}", configuration);
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManager, server, bridgesConnector);
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
server.registerBrokerConnection(amqpBrokerConnection);

View File

@ -49,7 +49,6 @@ import org.apache.activemq.artemis.core.transaction.TransactionOperationAbstract
import org.apache.activemq.artemis.core.transaction.TransactionPropertyIndexes;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessageBrokerAccessor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.connect.AMQPBrokerConnection;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
@ -164,7 +163,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
return started;
}
public AMQPMirrorControllerSource(ProtonProtocolManager protonProtocolManager, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
public AMQPMirrorControllerSource(ReferenceIDSupplier referenceIdSupplier, Queue snfQueue, ActiveMQServer server, AMQPMirrorBrokerConnectionElement replicaConfig,
AMQPBrokerConnection brokerConnection) {
super(server);
assert snfQueue != null;
@ -175,7 +174,7 @@ public class AMQPMirrorControllerSource extends BasicMirrorController<Sender> im
snfQueue.setInternalQueue(true); // to avoid redistribution kicking in
}
this.server = server;
this.idSupplier = protonProtocolManager.getReferenceIDSupplier();
this.idSupplier = referenceIdSupplier;
this.addQueues = replicaConfig.isQueueCreation();
this.deleteQueues = replicaConfig.isQueueRemoval();
this.addressFilter = new MirrorAddressFilter(replicaConfig.getAddressFilter());

View File

@ -40,6 +40,7 @@ public class AMQPConnectSaslTest extends AmqpClientTestSupport {
private static final int BROKER_PORT_NUM = AMQP_PORT + 1;
private static final String SERVER_KEYSTORE_NAME = "server-keystore.jks";
private static final String UNKNOWN_SERVER_KEYSTORE_NAME = "unknown-server-keystore.jks";
private static final String SERVER_KEYSTORE_PASSWORD = "securepass";
private static final String CLIENT_KEYSTORE_NAME = "client-keystore.jks";
private static final String CLIENT_KEYSTORE_PASSWORD = "securepass";
@ -194,7 +195,8 @@ public class AMQPConnectSaslTest extends AmqpClientTestSupport {
logger.debug("Connect test started, peer listening on: {}", remoteURI);
String amqpServerConnectionURI = "tcp://localhost:" + remoteURI.getPort() +
"?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME + ";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
"?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
if (requireClientCert) {
amqpServerConnectionURI +=
";keyStorePath=" + CLIENT_KEYSTORE_NAME + ";keyStorePassword=" + CLIENT_KEYSTORE_PASSWORD;
@ -214,4 +216,189 @@ public class AMQPConnectSaslTest extends AmqpClientTestSupport {
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test(timeout = 20_000)
public void testReconnectConnectsWithVerifyHostOffOnSecondURI() throws Exception {
final String keyStorePath = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
server1Options.setSecure(true);
server1Options.setKeyStoreLocation(keyStorePath);
server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
server1Options.setVerifyHost(false);
ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
server2Options.setSecure(true);
server2Options.setKeyStoreLocation(keyStorePath);
server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
server2Options.setVerifyHost(false);
try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
firstPeer.expectConnectionToDrop();
firstPeer.start();
secondPeer.expectSASLHeader().respondWithSASLHeader();
secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
secondPeer.expectSaslInit().withMechanism(PLAIN).withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
secondPeer.expectAMQPHeader().respondWithAMQPHeader();
secondPeer.expectOpen().respond();
secondPeer.expectBegin().respond();
secondPeer.start();
final URI firstPeerURI = firstPeer.getServerURI();
logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
final URI secondPeerURI = secondPeer.getServerURI();
logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
// First connection fails because we use a server certificate with whose common name
// doesn't match the host, second connection should work as we disable host verification
String amqpServerConnectionURI =
"tcp://localhost:" + firstPeerURI.getPort() + "?verifyHost=true" +
";sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD +
"#tcp://localhost:" + secondPeerURI.getPort() + "?verifyHost=false";
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
amqpConnection.setReconnectAttempts(20); // Allow reconnects
amqpConnection.setRetryInterval(100); // Allow reconnects
amqpConnection.setUser(USER);
amqpConnection.setPassword(PASSWD);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test(timeout = 20_000)
public void testReconnectionUsesConfigurationToReconnectToSecondHostAfterFirstFails() throws Exception {
final String keyStore1Path = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
final String keyStore2Path = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
server1Options.setSecure(true);
server1Options.setKeyStoreLocation(keyStore1Path);
server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
server1Options.setVerifyHost(false);
ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
server2Options.setSecure(true);
server2Options.setKeyStoreLocation(keyStore2Path);
server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
server2Options.setVerifyHost(false);
try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
firstPeer.expectConnectionToDrop();
firstPeer.start();
secondPeer.expectSASLHeader().respondWithSASLHeader();
secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
secondPeer.expectSaslInit().withMechanism(PLAIN)
.withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
secondPeer.expectAMQPHeader().respondWithAMQPHeader();
secondPeer.expectOpen().respond();
secondPeer.expectBegin().respond();
secondPeer.start();
final URI firstPeerURI = firstPeer.getServerURI();
logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
final URI secondPeerURI = secondPeer.getServerURI();
logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
String amqpServerConnectionURI =
"tcp://127.0.0.1:" + firstPeerURI.getPort() + "?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD +
"#tcp://localhost:" + secondPeerURI.getPort();
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
amqpConnection.setReconnectAttempts(20); // Allow reconnects
amqpConnection.setRetryInterval(100); // Allow reconnects
amqpConnection.setUser(USER);
amqpConnection.setPassword(PASSWD);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
@Test(timeout = 20_000)
public void testReconnectionUsesHostSpecificConfigurationToReconnectToSecondHostAfterFirstFails() throws Exception {
final String keyStore1Path = this.getClass().getClassLoader().getResource(UNKNOWN_SERVER_KEYSTORE_NAME).getFile();
final String keyStore2Path = this.getClass().getClassLoader().getResource(SERVER_KEYSTORE_NAME).getFile();
ProtonTestServerOptions server1Options = new ProtonTestServerOptions();
server1Options.setSecure(true);
server1Options.setKeyStoreLocation(keyStore1Path);
server1Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
server1Options.setVerifyHost(false);
ProtonTestServerOptions server2Options = new ProtonTestServerOptions();
server2Options.setSecure(true);
server2Options.setKeyStoreLocation(keyStore2Path);
server2Options.setKeyStorePassword(SERVER_KEYSTORE_PASSWORD);
server2Options.setVerifyHost(false);
try (ProtonTestServer firstPeer = new ProtonTestServer(server1Options);
ProtonTestServer secondPeer = new ProtonTestServer(server2Options)) {
firstPeer.expectConnectionToDrop();
firstPeer.start();
secondPeer.expectSASLHeader().respondWithSASLHeader();
secondPeer.remoteSaslMechanisms().withMechanisms(EXTERNAL, PLAIN).queue();
secondPeer.expectSaslInit().withMechanism(PLAIN)
.withInitialResponse(secondPeer.saslPlainInitialResponse(USER, PASSWD));
secondPeer.remoteSaslOutcome().withCode(SaslCode.OK).queue();
secondPeer.expectAMQPHeader().respondWithAMQPHeader();
secondPeer.expectOpen().respond();
secondPeer.expectBegin().respond();
secondPeer.start();
final URI firstPeerURI = firstPeer.getServerURI();
logger.debug("Connect test started, first peer listening on: {}", firstPeerURI);
final URI secondPeerURI = secondPeer.getServerURI();
logger.debug("Connect test started, second peer listening on: {}", secondPeerURI);
// First connection fails because we use the wrong trust store for the TLS handshake
String amqpServerConnectionURI =
"tcp://localhost:" + firstPeerURI.getPort() +
"?sslEnabled=true;trustStorePath=" + CLIENT_TRUSTSTORE_NAME +
";trustStorePassword=" + CLIENT_TRUSTSTORE_PASSWORD +
"#tcp://localhost:" + secondPeerURI.getPort() +
"?sslEnabled=true;trustStorePath=" + SERVER_TRUSTSTORE_NAME +
";trustStorePassword=" + SERVER_TRUSTSTORE_PASSWORD;
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), amqpServerConnectionURI);
amqpConnection.setReconnectAttempts(20); // Allow reconnects
amqpConnection.setRetryInterval(100); // Allow reconnects
amqpConnection.setUser(USER);
amqpConnection.setPassword(PASSWD);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
firstPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
secondPeer.waitForScriptToComplete(5, TimeUnit.SECONDS);
}
}
}