From c6f73b0c0a3156d4c83c481f82f1c12b015a653d Mon Sep 17 00:00:00 2001 From: Andy Taylor Date: Tue, 28 Feb 2017 10:05:39 +0000 Subject: [PATCH] ARTEMIS-1042 - support amqp failover list https://issues.apache.org/jira/browse/ARTEMIS-1042 --- .../core/client/impl/TopologyMemberImpl.java | 18 +++ .../amqp/broker/AMQPConnectionCallback.java | 13 ++ .../amqp/proton/AMQPConnectionContext.java | 24 +++ .../protocol/amqp/proton/AmqpSupport.java | 6 + .../amqp/AmqpNettyFailoverTest.java | 140 ++++++++++++++++++ 5 files changed, 201 insertions(+) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index cf62e17ec7..c82a939ba7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.core.client.impl; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Map; import org.apache.activemq.artemis.api.core.Pair; @@ -129,6 +131,22 @@ public final class TopologyMemberImpl implements TopologyMember { return "tcp://" + host + ":" + port; } + public URI toBackupURI() { + TransportConfiguration backupConnector = getBackup(); + if (backupConnector == null) { + return null; + } + Map props = backupConnector.getParams(); + String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", props); + int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 0, props); + boolean sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, false, props); + try { + return new URI("tcp://" + host + ":" + port + "?" + TransportConstants.SSL_ENABLED_PROP_NAME + "=" + sslEnabled); + } catch (URISyntaxException e) { + return null; + } + } + @Override public String toString() { return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]"; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java index 850671a642..7e7dc60c26 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPConnectionCallback.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.protocol.amqp.broker; +import java.net.URI; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -30,10 +31,13 @@ import io.netty.channel.ChannelFutureListener; import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl; import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; @@ -249,4 +253,13 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener { return new XidImpl("amqp".getBytes(), 1, bytes); } + public URI getFailoverList() { + ClusterManager clusterManager = server.getClusterManager(); + ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null); + if (clusterConnection != null) { + TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString()); + return member.toBackupURI(); + } + return null; + } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index bac3e7e9bf..d6cab99c9c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.net.URI; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.UUID; @@ -24,6 +26,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; @@ -47,6 +50,12 @@ import org.jboss.logging.Logger; import io.netty.buffer.ByteBuf; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT; +import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME; + public class AMQPConnectionContext extends ProtonInitializable { private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); @@ -206,6 +215,21 @@ public class AMQPConnectionContext extends ProtonInitializable { } public Symbol[] getConnectionCapabilitiesOffered() { + URI tc = connectionCallback.getFailoverList(); + if (tc != null) { + Map hostDetails = new HashMap<>(); + hostDetails.put(NETWORK_HOST, tc.getHost()); + boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true"); + if (isSSL) { + hostDetails.put(SCHEME, "amqps"); + } else { + hostDetails.put(SCHEME, "amqp"); + } + hostDetails.put(HOSTNAME, tc.getHost()); + hostDetails.put(PORT, tc.getPort()); + + connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails)); + } return ExtCapability.getCapabilities(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index fa77ad5d2f..227ee5ded4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -56,6 +56,12 @@ public class AmqpSupport { public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted"); public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced"); public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS"); + static final Symbol NETWORK_HOST = Symbol.valueOf("network-host"); + static final Symbol PORT = Symbol.valueOf("port"); + static final Symbol SCHEME = Symbol.valueOf("scheme"); + static final Symbol HOSTNAME = Symbol.valueOf("hostname"); + + static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list"); // Symbols used in configuration of newly opened links. diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java new file mode 100644 index 0000000000..f264269f85 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpNettyFailoverTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants; +import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +@RunWith(Parameterized.class) +public class AmqpNettyFailoverTest extends FailoverTest { + + + // this will ensure that all tests in this class are run twice, + // once with "true" passed to the class' constructor and once with "false" + @Parameterized.Parameters(name = "{0}") + public static Collection getParameters() { + + // these 3 are for comparison + return Arrays.asList(new Object[][]{{"NON_SSL", 0}, {"SSL", 1}}); + } + + + private final int protocol; + + public AmqpNettyFailoverTest(String name, int protocol) { + this.protocol = protocol; + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return getNettyAcceptorTransportConfig(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return getNettyConnectorTransportConfig(live); + } + + + @Test + public void testFailoverListWithAMQP() throws Exception { + JmsConnectionFactory factory = getJmsConnectionFactory(); + try (Connection connection = factory.createConnection()) { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + javax.jms.Queue queue = session.createQueue(ADDRESS.toString()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("hello before failover")); + liveServer.crash(true, true); + producer.send(session.createTextMessage("hello after failover")); + MessageConsumer consumer = session.createConsumer(queue); + connection.start(); + TextMessage receive = (TextMessage) consumer.receive(5000); + Assert.assertNotNull(receive); + Assert.assertEquals("hello before failover", receive.getText()); + receive = (TextMessage) consumer.receive(5000); + Assert.assertEquals("hello after failover", receive.getText()); + Assert.assertNotNull(receive); + } + } + + private JmsConnectionFactory getJmsConnectionFactory() { + if (protocol == 0) { + return new JmsConnectionFactory("failover:(amqp://localhost:61616)"); + } else { + String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile(); + String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile(); + // return new JmsConnectionFactory("amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false"); + return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)"); + } + } + + private TransportConfiguration getNettyAcceptorTransportConfig(final boolean live) { + Map server1Params = new HashMap<>(); + if (protocol == 1) { + server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true"); + + server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "server-side-keystore.jks"); + server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample"); + server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks"); + server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample"); + //server1Params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true); + + } + if (live) { + return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); + } + + + server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); + + return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params); + } + + private TransportConfiguration getNettyConnectorTransportConfig(final boolean live) { + Map server1Params = new HashMap<>(); + if (protocol == 1) { + server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true"); + server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true); + server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "client-side-truststore.jks"); + server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample"); + server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks"); + server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample"); + } + if (live) { + return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); + } + server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1); + return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params); + } + +}