https://issues.apache.org/jira/browse/AMQ-3564 - new masterslave: discovery scheme for network connectors to master slave pairs, more intuitive configuration. Layers over static:failover:(a,b)?maxReconnectAttempts=0. Impl and test

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1188641 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2011-10-25 13:03:18 +00:00
parent f4df288872
commit 9ba8d2681f
5 changed files with 511 additions and 0 deletions

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery.masterslave;
import java.net.URI;
import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A static DiscoveryAgent that supports connecting to a Master / Slave tuple
* of brokers.
*/
public class MasterSlaveDiscoveryAgent extends SimpleDiscoveryAgent {
private final static Logger LOG = LoggerFactory.getLogger(MasterSlaveDiscoveryAgent.class);
private String[] msServices = new String[]{};
@Override
public String[] getServices() {
return msServices;
}
@Override
public void setServices(String services) {
this.msServices = services.split(",");
configureServices();
}
@Override
public void setServices(String services[]) {
this.msServices = services;
configureServices();
}
@Override
public void setServices(URI services[]) {
this.msServices = new String[services.length];
for (int i = 0; i < services.length; i++) {
this.msServices[i] = services[i].toString();
}
configureServices();
}
protected void configureServices() {
if ((msServices == null) || (msServices.length < 2)) {
LOG.error("masterSlave requires at least 2 URIs");
msServices = new String[]{};
throw new IllegalArgumentException("Expecting at least 2 arguments");
}
StringBuffer buf = new StringBuffer();
buf.append("failover:(");
for (int i = 0; i < (msServices.length - 1); i++) {
buf.append(msServices[i]);
buf.append(',');
}
buf.append(msServices[msServices.length - 1]);
buf.append(")?randomize=false&maxReconnectAttempts=0");
super.setServices(new String[]{buf.toString()});
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery.masterslave;
import org.apache.activemq.transport.discovery.DiscoveryAgent;
import org.apache.activemq.transport.discovery.DiscoveryAgentFactory;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.URISupport.CompositeData;
import java.io.IOException;
import java.net.URI;
import java.util.Map;
public class MasterSlaveDiscoveryAgentFactory extends DiscoveryAgentFactory {
@Override
protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException {
try {
CompositeData data = URISupport.parseComposite(uri);
Map options = data.getParameters();
MasterSlaveDiscoveryAgent rc = new MasterSlaveDiscoveryAgent();
IntrospectionSupport.setProperties(rc, options);
rc.setServices(data.getComponents());
return rc;
} catch (Throwable e) {
throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e);
}
}
}

View File

@ -0,0 +1,25 @@
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<html>
<head>
</head>
<body>
Static discovery implementation for a Master / Slave tuple
</body>
</html>

View File

@ -0,0 +1,17 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.transport.discovery.masterslave.MasterSlaveDiscoveryAgentFactory

View File

@ -0,0 +1,338 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.discovery;
import java.io.File;
import java.net.URI;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class MasterSlaveDiscoveryTest extends TestCase {
private static final Log LOG = LogFactory.getLog(MasterSlaveDiscoveryTest.class);
private static final int NUMBER = 10;
private static final String BROKER_A_DIRECTORY = "target/activemq-data/kahadbA";
private static final String BROKER_A1_NAME = "BROKERA1";
private static final String BROKER_A1_BIND_ADDRESS = "tcp://127.0.0.1:61616";
private static final String BROKER_A2_NAME = "BROKERA2";
private static final String BROKER_A2_BIND_ADDRESS = "tcp://127.0.0.1:61617";
private static final String BROKER_B_DIRECTORY = "target/activemq-data/kahadbB";
private static final String BROKER_B1_NAME = "BROKERB1";
private static final String BROKER_B1_BIND_ADDRESS = "tcp://127.0.0.1:61626";
private static final String BROKER_B2_NAME = "BROKERB2";
private static final String BROKER_B2_BIND_ADDRESS = "tcp://127.0.0.1:61627";
private BrokerService brokerA1;
private BrokerService brokerA2;
private BrokerService brokerB1;
private BrokerService brokerB2;
private String clientUrlA;
private String clientUrlB;
public void testNetworkFailback() throws Exception {
final long timeout = 5000; // 5 seconds
final String queueName = getClass().getName();
ActiveMQConnectionFactory factoryA = new ActiveMQConnectionFactory(clientUrlA);
ActiveMQConnection connectionA = (ActiveMQConnection) factoryA.createConnection();
connectionA.start();
Session sessionA = connectionA.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueA = sessionA.createQueue(queueName);
MessageProducer producerA = sessionA.createProducer(queueA);
ActiveMQConnectionFactory factoryB = new ActiveMQConnectionFactory(clientUrlB);
ActiveMQConnection connectionB = (ActiveMQConnection) factoryB.createConnection();
connectionB.start();
Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queueB = sessionB.createQueue(queueName);
MessageConsumer consumerB = sessionA.createConsumer(queueB);
// Test initial configuration is working
String msgStr = queueName + "-" + System.currentTimeMillis();
Message msgSent = sessionA.createTextMessage(msgStr);
producerA.send(msgSent);
Message msgReceived = null;
try {
msgReceived = consumerB.receive(timeout);
} catch (JMSException e) {
fail("Message Timeout");
}
assertTrue(msgReceived instanceof TextMessage);
assertEquals(((TextMessage) msgReceived).getText(), msgStr);
// Test Failover
assertTrue(brokerB2.isSlave());
brokerB1.stop();
brokerB2.waitUntilStarted();
assertFalse(brokerB2.isSlave());
msgStr = queueName + "-" + System.currentTimeMillis();
msgSent = sessionA.createTextMessage(msgStr);
producerA.send(msgSent);
try {
msgReceived = consumerB.receive(timeout);
} catch (JMSException e) {
fail("Message Timeout");
}
assertTrue(msgReceived instanceof TextMessage);
assertEquals(((TextMessage)msgReceived).getText(), msgStr);
// Test Failback
new Thread(new Runnable() {
@Override
public void run() {
try {
brokerB1.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to start broker");
}
}
}, "BrokerB1 Restarting").start();
brokerB1.waitUntilStarted();
assertTrue(brokerB1.isSlave());
new Thread(new Runnable() {
@Override
public void run() {
try {
brokerB2.stop();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to stop broker");
}
}
}, "BrokerB2 Stopping").start();
brokerB2.waitUntilStopped();
brokerB1.waitUntilStarted();
msgStr = queueName + "-" + System.currentTimeMillis();
msgSent = sessionA.createTextMessage(msgStr);
producerA.send(msgSent);
try {
msgReceived = consumerB.receive(timeout);
} catch (JMSException e) {
fail("Message Timeout");
}
assertTrue(msgReceived instanceof TextMessage);
assertEquals(((TextMessage)msgReceived).getText(), msgStr);
connectionA.close();
connectionB.close();
}
@Override
protected void setUp() throws Exception {
brokerA1 = createBrokerA1();
brokerA1.waitUntilStarted(); // wait to ensure A1 is master
brokerA2 = createBrokerA2();
String connectStringA1 = brokerA1.getTransportConnectors().get(0).getPublishableConnectString();
String connectStringA2 = brokerA2.getTransportConnectors().get(0).getPublishableConnectString();
clientUrlA = "failover:(" + connectStringA1 + "," + connectStringA2 + ")?randomize=false&updateURIsSupported=false";
brokerB1 = createBrokerB1();
brokerB1.waitUntilStarted(); // wait to ensure B1 is master
brokerB2 = createBrokerB2();
String connectStringB1 = brokerB1.getTransportConnectors().get(0).getPublishableConnectString();
String connectStringB2 = brokerB2.getTransportConnectors().get(0).getPublishableConnectString();
clientUrlB = "failover:(" + connectStringB1 + "," + connectStringB2 + ")?randomize=false&updateURIsSupported=false";
}
@Override
protected void tearDown() throws Exception {
if (brokerB2 != null) {
brokerB2.stop();
brokerB2 = null;
}
if (brokerB1 != null) {
brokerB1.stop();
brokerB1 = null;
}
if (brokerA1 != null) {
brokerA1.stop();
brokerA1 = null;
}
if (brokerA2 != null) {
brokerA2.stop();
brokerA2 = null;
}
}
protected BrokerService createBrokerA1() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_A1_NAME);
File directory = new File(BROKER_A_DIRECTORY);
IOHelper.deleteChildren(directory);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_B1_BIND_ADDRESS + "," + BROKER_B2_BIND_ADDRESS + ")?useExponentialBackOff=false&discovered.randomize=true&discovered.maxReconnectAttempts=0");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_A1_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
answer.start();
return answer;
}
protected BrokerService createBrokerA2() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_A2_NAME);
File directory = new File(BROKER_A_DIRECTORY);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
// it is possible to *replace* the default implied failover options via..
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_B1_BIND_ADDRESS + "," + BROKER_B2_BIND_ADDRESS + ")");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_A2_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
new Thread(new Runnable() {
@Override
public void run() {
try {
answer.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to start broker");
}
}
}, "BrokerA2 Starting").start();
return answer;
}
protected BrokerService createBrokerB1() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_B1_NAME);
File directory = new File(BROKER_B_DIRECTORY);
IOHelper.deleteChildren(directory);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_A1_BIND_ADDRESS + "," + BROKER_A2_BIND_ADDRESS + ")");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_B1_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
answer.start();
return answer;
}
protected BrokerService createBrokerB2() throws Exception {
final BrokerService answer = new BrokerService();
answer.setUseJmx(false);
answer.setBrokerName(BROKER_B2_NAME);
File directory = new File(BROKER_B_DIRECTORY);
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(directory);
answer.setPersistent(true);
answer.setPersistenceAdapter(kaha);
NetworkConnector network = answer.addNetworkConnector("masterslave:(" + BROKER_A1_BIND_ADDRESS + "," + BROKER_A2_BIND_ADDRESS + ")");
network.setDuplex(false);
// lazy create
TransportConnector transportConnector = new TransportConnector();
transportConnector.setUri(new URI(BROKER_B2_BIND_ADDRESS));
answer.addConnector(transportConnector);
answer.setUseShutdownHook(false);
new Thread(new Runnable() {
@Override
public void run() {
try {
answer.start();
} catch (Exception e) {
e.printStackTrace();
fail("Failed to start broker");
}
}
}, "BrokerB2 Starting").start();
return answer;
}
}