This commit is contained in:
Clebert Suconic 2017-03-17 15:08:58 -04:00
commit d858858117
5 changed files with 201 additions and 0 deletions

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.client.impl; package org.apache.activemq.artemis.core.client.impl;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.Pair;
@ -129,6 +131,22 @@ public final class TopologyMemberImpl implements TopologyMember {
return "tcp://" + host + ":" + port; return "tcp://" + host + ":" + port;
} }
public URI toBackupURI() {
TransportConfiguration backupConnector = getBackup();
if (backupConnector == null) {
return null;
}
Map<String, Object> props = backupConnector.getParams();
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, "localhost", props);
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, 0, props);
boolean sslEnabled = ConfigurationHelper.getBooleanProperty(TransportConstants.SSL_ENABLED_PROP_NAME, false, props);
try {
return new URI("tcp://" + host + ":" + port + "?" + TransportConstants.SSL_ENABLED_PROP_NAME + "=" + sslEnabled);
} catch (URISyntaxException e) {
return null;
}
}
@Override @Override
public String toString() { public String toString() {
return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]"; return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";

View File

@ -16,6 +16,7 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.broker; package org.apache.activemq.artemis.protocol.amqp.broker;
import java.net.URI;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -30,10 +31,13 @@ import io.netty.channel.ChannelFutureListener;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper; import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.client.impl.TopologyMemberImpl;
import org.apache.activemq.artemis.core.remoting.CloseListener; import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener; import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl; import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -249,4 +253,13 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
return new XidImpl("amqp".getBytes(), 1, bytes); return new XidImpl("amqp".getBytes(), 1, bytes);
} }
public URI getFailoverList() {
ClusterManager clusterManager = server.getClusterManager();
ClusterConnection clusterConnection = clusterManager.getDefaultConnection(null);
if (clusterConnection != null) {
TopologyMemberImpl member = clusterConnection.getTopology().getMember(server.getNodeID().toString());
return member.toBackupURI();
}
return null;
}
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.protocol.amqp.proton; package org.apache.activemq.artemis.protocol.amqp.proton;
import java.net.URI;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.UUID; import java.util.UUID;
@ -24,6 +26,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
@ -47,6 +50,12 @@ import org.jboss.logging.Logger;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.FAILOVER_SERVER_LIST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.HOSTNAME;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NETWORK_HOST;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.PORT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.SCHEME;
public class AMQPConnectionContext extends ProtonInitializable { public class AMQPConnectionContext extends ProtonInitializable {
private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); private static final Logger log = Logger.getLogger(AMQPConnectionContext.class);
@ -206,6 +215,21 @@ public class AMQPConnectionContext extends ProtonInitializable {
} }
public Symbol[] getConnectionCapabilitiesOffered() { public Symbol[] getConnectionCapabilitiesOffered() {
URI tc = connectionCallback.getFailoverList();
if (tc != null) {
Map<Symbol,Object> hostDetails = new HashMap<>();
hostDetails.put(NETWORK_HOST, tc.getHost());
boolean isSSL = tc.getQuery().contains(TransportConstants.SSL_ENABLED_PROP_NAME + "=true");
if (isSSL) {
hostDetails.put(SCHEME, "amqps");
} else {
hostDetails.put(SCHEME, "amqp");
}
hostDetails.put(HOSTNAME, tc.getHost());
hostDetails.put(PORT, tc.getPort());
connectionProperties.put(FAILOVER_SERVER_LIST, Arrays.asList(hostDetails));
}
return ExtCapability.getCapabilities(); return ExtCapability.getCapabilities();
} }

View File

@ -56,6 +56,12 @@ public class AmqpSupport {
public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted"); public static final Symbol RESOURCE_DELETED = Symbol.valueOf("amqp:resource-deleted");
public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced"); public static final Symbol CONNECTION_FORCED = Symbol.valueOf("amqp:connection:forced");
public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS"); public static final Symbol SHARED_SUBS = Symbol.valueOf("SHARED-SUBS");
static final Symbol NETWORK_HOST = Symbol.valueOf("network-host");
static final Symbol PORT = Symbol.valueOf("port");
static final Symbol SCHEME = Symbol.valueOf("scheme");
static final Symbol HOSTNAME = Symbol.valueOf("hostname");
static final Symbol FAILOVER_SERVER_LIST = Symbol.valueOf("failover-server-list");
// Symbols used in configuration of newly opened links. // Symbols used in configuration of newly opened links.

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@RunWith(Parameterized.class)
public class AmqpNettyFailoverTest extends FailoverTest {
// this will ensure that all tests in this class are run twice,
// once with "true" passed to the class' constructor and once with "false"
@Parameterized.Parameters(name = "{0}")
public static Collection getParameters() {
// these 3 are for comparison
return Arrays.asList(new Object[][]{{"NON_SSL", 0}, {"SSL", 1}});
}
private final int protocol;
public AmqpNettyFailoverTest(String name, int protocol) {
this.protocol = protocol;
}
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return getNettyAcceptorTransportConfig(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return getNettyConnectorTransportConfig(live);
}
@Test
public void testFailoverListWithAMQP() throws Exception {
JmsConnectionFactory factory = getJmsConnectionFactory();
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(ADDRESS.toString());
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("hello before failover"));
liveServer.crash(true, true);
producer.send(session.createTextMessage("hello after failover"));
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
TextMessage receive = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(receive);
Assert.assertEquals("hello before failover", receive.getText());
receive = (TextMessage) consumer.receive(5000);
Assert.assertEquals("hello after failover", receive.getText());
Assert.assertNotNull(receive);
}
}
private JmsConnectionFactory getJmsConnectionFactory() {
if (protocol == 0) {
return new JmsConnectionFactory("failover:(amqp://localhost:61616)");
} else {
String keystore = this.getClass().getClassLoader().getResource("client-side-keystore.jks").getFile();
String truststore = this.getClass().getClassLoader().getResource("client-side-truststore.jks").getFile();
// return new JmsConnectionFactory("amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false");
return new JmsConnectionFactory("failover:(amqps://localhost:61616?transport.keyStoreLocation=" + keystore + "&transport.keyStorePassword=secureexample&transport.trustStoreLocation=" + truststore + "&transport.trustStorePassword=secureexample&transport.verifyHost=false)");
}
}
private TransportConfiguration getNettyAcceptorTransportConfig(final boolean live) {
Map<String, Object> server1Params = new HashMap<>();
if (protocol == 1) {
server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true");
server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "server-side-keystore.jks");
server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "server-side-truststore.jks");
server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
//server1Params.put(TransportConstants.NEED_CLIENT_AUTH_PROP_NAME, true);
}
if (live) {
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
}
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, server1Params);
}
private TransportConfiguration getNettyConnectorTransportConfig(final boolean live) {
Map<String, Object> server1Params = new HashMap<>();
if (protocol == 1) {
server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, "true");
server1Params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
server1Params.put(TransportConstants.TRUSTSTORE_PATH_PROP_NAME, "client-side-truststore.jks");
server1Params.put(TransportConstants.TRUSTSTORE_PASSWORD_PROP_NAME, "secureexample");
server1Params.put(TransportConstants.KEYSTORE_PATH_PROP_NAME, "client-side-keystore.jks");
server1Params.put(TransportConstants.KEYSTORE_PASSWORD_PROP_NAME, "secureexample");
}
if (live) {
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
server1Params.put(org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.PORT_PROP_NAME, org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
return new TransportConfiguration(NETTY_CONNECTOR_FACTORY, server1Params);
}
}