From 2fc3e8ee7f672cf75e8a028bd72780d923287ab4 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 20 Jul 2006 05:44:53 +0000 Subject: [PATCH] Adding some network reconnect tests. These are used to validate the our network connections get re-established after a broker restart. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@423783 13f79535-47bb-0310-9956-ffa450edef68 --- .../network/NetworkReconnectTest.java | 314 ++++++++++++++++++ .../SSHTunnelNetworkReconnectTest.java | 90 +++++ .../activemq/network/reconnect-broker1.xml | 34 ++ .../activemq/network/reconnect-broker2.xml | 35 ++ .../network/ssh-reconnect-broker1.xml | 34 ++ .../network/ssh-reconnect-broker2.xml | 35 ++ 6 files changed, 542 insertions(+) create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java create mode 100644 activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java create mode 100644 activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml create mode 100644 activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml create mode 100644 activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml create mode 100644 activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml diff --git a/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java new file mode 100644 index 0000000000..e5ea637478 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/NetworkReconnectTest.java @@ -0,0 +1,314 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.ConsumerEvent; +import org.apache.activemq.advisory.ConsumerEventSource; +import org.apache.activemq.advisory.ConsumerListener; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger; + +/** + * These test cases are used to verifiy that network connections get re established in all broker + * restart scenarios. + * + * @author chirino + */ +public class NetworkReconnectTest extends TestCase { + + private BrokerService producerBroker; + private BrokerService consumerBroker; + private ActiveMQConnectionFactory producerConnectionFactory; + private ActiveMQConnectionFactory consumerConnectionFactory; + private Destination destination; + private ArrayList connections = new ArrayList(); + + public void testMultipleProducerBrokerRestarts() throws Exception { + for (int i = 0; i < 10; i++) { + testWithProducerBrokerRestart(); + disposeConsumerConnections(); + } + } + + public void testWithoutRestarts() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithProducerBrokerRestart() throws Exception { + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + // Restart the first broker... + stopProducerBroker(); + startProducerBroker(); + + counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + messageId = sendMessage(); + message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithConsumerBrokerRestart() throws Exception { + + startProducerBroker(); + startConsumerBroker(); + + MessageConsumer consumer = createConsumer(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + // Restart the first broker... + stopConsumerBroker(); + waitForConsumerToLeave(counter); + startConsumerBroker(); + + consumer = createConsumer(); + waitForConsumerToArrive(counter); + + messageId = sendMessage(); + message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + assertNull( consumer.receiveNoWait() ); + + } + + public void testWithConsumerBrokerStartDelay() throws Exception { + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + Thread.sleep(1000*5); + + startProducerBroker(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + + public void testWithProducerBrokerStartDelay() throws Exception { + + startProducerBroker(); + AtomicInteger counter = createConsumerCounter(producerConnectionFactory); + + Thread.sleep(1000*5); + + startConsumerBroker(); + MessageConsumer consumer = createConsumer(); + + waitForConsumerToArrive(counter); + + String messageId = sendMessage(); + Message message = consumer.receive(1000); + + assertEquals(messageId, message.getJMSMessageID()); + + assertNull( consumer.receiveNoWait() ); + + } + + protected void setUp() throws Exception { + producerConnectionFactory = createProducerConnectionFactory(); + consumerConnectionFactory = createConsumerConnectionFactory(); + destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE"); + + } + + protected void tearDown() throws Exception { + disposeConsumerConnections(); + try { + stopProducerBroker(); + } catch (Throwable e) { + } + try { + stopConsumerBroker(); + } catch (Throwable e) { + } + } + + protected void disposeConsumerConnections() { + for (Iterator iter = connections.iterator(); iter.hasNext();) { + Connection connection = (Connection) iter.next(); + try { connection.close(); } catch (Throwable ignore) {} + } + } + + protected void startProducerBroker() throws Exception { + if( producerBroker==null ) { + producerBroker = createFirstBroker(); + producerBroker.start(); + } + } + + protected void stopProducerBroker() throws Exception { + if( producerBroker!=null ) { + producerBroker.stop(); + producerBroker=null; + } + } + + protected void startConsumerBroker() throws Exception { + if( consumerBroker==null ) { + consumerBroker = createSecondBroker(); + consumerBroker.start(); + } + } + + protected void stopConsumerBroker() throws Exception { + if( consumerBroker!=null ) { + consumerBroker.stop(); + consumerBroker=null; + } + } + + protected BrokerService createFirstBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml")); + } + + protected BrokerService createSecondBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml")); + } + + protected ActiveMQConnectionFactory createProducerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker1"); + } + + protected ActiveMQConnectionFactory createConsumerConnectionFactory() { + return new ActiveMQConnectionFactory("vm://broker2"); + } + + protected String sendMessage() throws JMSException { + Connection connection = null; + try { + connection = producerConnectionFactory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + Message message = session.createMessage(); + producer.send(message); + return message.getJMSMessageID(); + } finally { + try { connection.close(); } catch (Throwable ignore) {} + } + } + + protected MessageConsumer createConsumer() throws JMSException { + Connection connection = consumerConnectionFactory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + return session.createConsumer(destination); + } + + protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception { + final AtomicInteger rc = new AtomicInteger(0); + Connection connection = cf.createConnection(); + connections.add(connection); + connection.start(); + + ConsumerEventSource source = new ConsumerEventSource(connection, destination); + source.setConsumerListener(new ConsumerListener(){ + public void onConsumerEvent(ConsumerEvent event) { + rc.set(event.getConsumerCount()); + } + }); + source.start(); + + return rc; + } + + protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException { + for( int i=0; i < 100; i++ ) { + if( consumerCounter.get() > 0 ) { + return; + } + Thread.sleep(50); + } + fail("The consumer did not arrive."); + } + + protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException { + for( int i=0; i < 100; i++ ) { + if( consumerCounter.get() == 0 ) { + return; + } + Thread.sleep(50); + } + fail("The consumer did not leave."); + } + +} diff --git a/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java b/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java new file mode 100644 index 0000000000..70595b2fcc --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/network/SSHTunnelNetworkReconnectTest.java @@ -0,0 +1,90 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Iterator; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; + + +/** + * Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels + * fool the TCP transport into thinking that they are initially connected. + * + * @author chirino + */ +public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest { + + ArrayList processes = new ArrayList(); + + + protected BrokerService createFirstBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml")); + } + + protected BrokerService createSecondBroker() throws Exception { + return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml")); + } + + protected void setUp() throws Exception { + startProcess("ssh -Nn -L60006:localhost:61616 localhost"); + startProcess("ssh -Nn -L60007:localhost:61617 localhost"); + super.setUp(); + } + + protected void tearDown() throws Exception { + super.tearDown(); + for (Iterator iter = processes.iterator(); iter.hasNext();) { + Process p = (Process) iter.next(); + p.destroy(); + } + } + + private void startProcess(String command) throws IOException { + final Process process = Runtime.getRuntime().exec(command); + processes.add(process); + new Thread("stdout: "+command){ + public void run() { + try { + InputStream is = process.getInputStream(); + int c; + while((c=is.read())>=0) { + System.out.write(c); + } + } catch (IOException e) { + } + } + }.start(); + new Thread("stderr: "+command){ + public void run() { + try { + InputStream is = process.getErrorStream(); + int c; + while((c=is.read())>=0) { + System.err.write(c); + } + } catch (IOException e) { + } + } + }.start(); + } +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml new file mode 100644 index 0000000000..bbe45e50cb --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker1.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml new file mode 100644 index 0000000000..79a6e64ea6 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/reconnect-broker2.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml new file mode 100644 index 0000000000..69c621c037 --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker1.xml @@ -0,0 +1,34 @@ + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml new file mode 100644 index 0000000000..b9cc6017fe --- /dev/null +++ b/activemq-core/src/test/resources/org/apache/activemq/network/ssh-reconnect-broker2.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + +