mirror of https://github.com/apache/activemq.git
Adding some network reconnect tests. These are used to validate the our network connections get re-established after a broker restart.
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@423783 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c13a547e1e
commit
2fc3e8ee7f
|
@ -0,0 +1,314 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.advisory.ConsumerEvent;
|
||||
import org.apache.activemq.advisory.ConsumerEventSource;
|
||||
import org.apache.activemq.advisory.ConsumerListener;
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
|
||||
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* These test cases are used to verifiy that network connections get re established in all broker
|
||||
* restart scenarios.
|
||||
*
|
||||
* @author chirino
|
||||
*/
|
||||
public class NetworkReconnectTest extends TestCase {
|
||||
|
||||
private BrokerService producerBroker;
|
||||
private BrokerService consumerBroker;
|
||||
private ActiveMQConnectionFactory producerConnectionFactory;
|
||||
private ActiveMQConnectionFactory consumerConnectionFactory;
|
||||
private Destination destination;
|
||||
private ArrayList connections = new ArrayList();
|
||||
|
||||
public void testMultipleProducerBrokerRestarts() throws Exception {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
testWithProducerBrokerRestart();
|
||||
disposeConsumerConnections();
|
||||
}
|
||||
}
|
||||
|
||||
public void testWithoutRestarts() throws Exception {
|
||||
startProducerBroker();
|
||||
startConsumerBroker();
|
||||
|
||||
MessageConsumer consumer = createConsumer();
|
||||
AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
String messageId = sendMessage();
|
||||
Message message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
}
|
||||
|
||||
public void testWithProducerBrokerRestart() throws Exception {
|
||||
startProducerBroker();
|
||||
startConsumerBroker();
|
||||
|
||||
MessageConsumer consumer = createConsumer();
|
||||
AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
String messageId = sendMessage();
|
||||
Message message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
// Restart the first broker...
|
||||
stopProducerBroker();
|
||||
startProducerBroker();
|
||||
|
||||
counter = createConsumerCounter(producerConnectionFactory);
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
messageId = sendMessage();
|
||||
message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
}
|
||||
|
||||
public void testWithConsumerBrokerRestart() throws Exception {
|
||||
|
||||
startProducerBroker();
|
||||
startConsumerBroker();
|
||||
|
||||
MessageConsumer consumer = createConsumer();
|
||||
AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
String messageId = sendMessage();
|
||||
Message message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
// Restart the first broker...
|
||||
stopConsumerBroker();
|
||||
waitForConsumerToLeave(counter);
|
||||
startConsumerBroker();
|
||||
|
||||
consumer = createConsumer();
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
messageId = sendMessage();
|
||||
message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
}
|
||||
|
||||
public void testWithConsumerBrokerStartDelay() throws Exception {
|
||||
|
||||
startConsumerBroker();
|
||||
MessageConsumer consumer = createConsumer();
|
||||
|
||||
Thread.sleep(1000*5);
|
||||
|
||||
startProducerBroker();
|
||||
AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
String messageId = sendMessage();
|
||||
Message message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void testWithProducerBrokerStartDelay() throws Exception {
|
||||
|
||||
startProducerBroker();
|
||||
AtomicInteger counter = createConsumerCounter(producerConnectionFactory);
|
||||
|
||||
Thread.sleep(1000*5);
|
||||
|
||||
startConsumerBroker();
|
||||
MessageConsumer consumer = createConsumer();
|
||||
|
||||
waitForConsumerToArrive(counter);
|
||||
|
||||
String messageId = sendMessage();
|
||||
Message message = consumer.receive(1000);
|
||||
|
||||
assertEquals(messageId, message.getJMSMessageID());
|
||||
|
||||
assertNull( consumer.receiveNoWait() );
|
||||
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
producerConnectionFactory = createProducerConnectionFactory();
|
||||
consumerConnectionFactory = createConsumerConnectionFactory();
|
||||
destination = new ActiveMQQueue("RECONNECT.TEST.QUEUE");
|
||||
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
disposeConsumerConnections();
|
||||
try {
|
||||
stopProducerBroker();
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
try {
|
||||
stopConsumerBroker();
|
||||
} catch (Throwable e) {
|
||||
}
|
||||
}
|
||||
|
||||
protected void disposeConsumerConnections() {
|
||||
for (Iterator iter = connections.iterator(); iter.hasNext();) {
|
||||
Connection connection = (Connection) iter.next();
|
||||
try { connection.close(); } catch (Throwable ignore) {}
|
||||
}
|
||||
}
|
||||
|
||||
protected void startProducerBroker() throws Exception {
|
||||
if( producerBroker==null ) {
|
||||
producerBroker = createFirstBroker();
|
||||
producerBroker.start();
|
||||
}
|
||||
}
|
||||
|
||||
protected void stopProducerBroker() throws Exception {
|
||||
if( producerBroker!=null ) {
|
||||
producerBroker.stop();
|
||||
producerBroker=null;
|
||||
}
|
||||
}
|
||||
|
||||
protected void startConsumerBroker() throws Exception {
|
||||
if( consumerBroker==null ) {
|
||||
consumerBroker = createSecondBroker();
|
||||
consumerBroker.start();
|
||||
}
|
||||
}
|
||||
|
||||
protected void stopConsumerBroker() throws Exception {
|
||||
if( consumerBroker!=null ) {
|
||||
consumerBroker.stop();
|
||||
consumerBroker=null;
|
||||
}
|
||||
}
|
||||
|
||||
protected BrokerService createFirstBroker() throws Exception {
|
||||
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker1.xml"));
|
||||
}
|
||||
|
||||
protected BrokerService createSecondBroker() throws Exception {
|
||||
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/reconnect-broker2.xml"));
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createProducerConnectionFactory() {
|
||||
return new ActiveMQConnectionFactory("vm://broker1");
|
||||
}
|
||||
|
||||
protected ActiveMQConnectionFactory createConsumerConnectionFactory() {
|
||||
return new ActiveMQConnectionFactory("vm://broker2");
|
||||
}
|
||||
|
||||
protected String sendMessage() throws JMSException {
|
||||
Connection connection = null;
|
||||
try {
|
||||
connection = producerConnectionFactory.createConnection();
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
Message message = session.createMessage();
|
||||
producer.send(message);
|
||||
return message.getJMSMessageID();
|
||||
} finally {
|
||||
try { connection.close(); } catch (Throwable ignore) {}
|
||||
}
|
||||
}
|
||||
|
||||
protected MessageConsumer createConsumer() throws JMSException {
|
||||
Connection connection = consumerConnectionFactory.createConnection();
|
||||
connections.add(connection);
|
||||
connection.start();
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
return session.createConsumer(destination);
|
||||
}
|
||||
|
||||
protected AtomicInteger createConsumerCounter(ActiveMQConnectionFactory cf) throws Exception {
|
||||
final AtomicInteger rc = new AtomicInteger(0);
|
||||
Connection connection = cf.createConnection();
|
||||
connections.add(connection);
|
||||
connection.start();
|
||||
|
||||
ConsumerEventSource source = new ConsumerEventSource(connection, destination);
|
||||
source.setConsumerListener(new ConsumerListener(){
|
||||
public void onConsumerEvent(ConsumerEvent event) {
|
||||
rc.set(event.getConsumerCount());
|
||||
}
|
||||
});
|
||||
source.start();
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
protected void waitForConsumerToArrive(AtomicInteger consumerCounter) throws InterruptedException {
|
||||
for( int i=0; i < 100; i++ ) {
|
||||
if( consumerCounter.get() > 0 ) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
fail("The consumer did not arrive.");
|
||||
}
|
||||
|
||||
protected void waitForConsumerToLeave(AtomicInteger consumerCounter) throws InterruptedException {
|
||||
for( int i=0; i < 100; i++ ) {
|
||||
if( consumerCounter.get() == 0 ) {
|
||||
return;
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
fail("The consumer did not leave.");
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/**
|
||||
*
|
||||
* Copyright 2005-2006 The Apache Software Foundation
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
|
||||
import org.apache.activemq.broker.BrokerFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
|
||||
|
||||
/**
|
||||
* Test network reconnects over SSH tunnels. This case can be especially tricky since the SSH tunnels
|
||||
* fool the TCP transport into thinking that they are initially connected.
|
||||
*
|
||||
* @author chirino
|
||||
*/
|
||||
public class SSHTunnelNetworkReconnectTest extends NetworkReconnectTest {
|
||||
|
||||
ArrayList processes = new ArrayList();
|
||||
|
||||
|
||||
protected BrokerService createFirstBroker() throws Exception {
|
||||
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker1.xml"));
|
||||
}
|
||||
|
||||
protected BrokerService createSecondBroker() throws Exception {
|
||||
return BrokerFactory.createBroker(new URI("xbean:org/apache/activemq/network/ssh-reconnect-broker2.xml"));
|
||||
}
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
startProcess("ssh -Nn -L60006:localhost:61616 localhost");
|
||||
startProcess("ssh -Nn -L60007:localhost:61617 localhost");
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
for (Iterator iter = processes.iterator(); iter.hasNext();) {
|
||||
Process p = (Process) iter.next();
|
||||
p.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
private void startProcess(String command) throws IOException {
|
||||
final Process process = Runtime.getRuntime().exec(command);
|
||||
processes.add(process);
|
||||
new Thread("stdout: "+command){
|
||||
public void run() {
|
||||
try {
|
||||
InputStream is = process.getInputStream();
|
||||
int c;
|
||||
while((c=is.read())>=0) {
|
||||
System.out.write(c);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
new Thread("stderr: "+command){
|
||||
public void run() {
|
||||
try {
|
||||
InputStream is = process.getErrorStream();
|
||||
int c;
|
||||
while((c=is.read())>=0) {
|
||||
System.err.write(c);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
}
|
||||
}
|
||||
}.start();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright 2005-2006 The Apache Software Foundation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans xmlns="http://activemq.org/config/1.0">
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:61616"/>
|
||||
<transportConnector uri="vm://broker1"/>
|
||||
</transportConnectors>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:61617)"/>
|
||||
</networkConnectors>
|
||||
|
||||
</broker>
|
||||
|
||||
</beans>
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright 2005-2006 The Apache Software Foundation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans xmlns="http://activemq.org/config/1.0">
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:61617"/>
|
||||
<transportConnector uri="vm://broker2"/>
|
||||
</transportConnectors>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:61616)"/>
|
||||
</networkConnectors>
|
||||
|
||||
</broker>
|
||||
|
||||
|
||||
</beans>
|
||||
|
|
@ -0,0 +1,34 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright 2005-2006 The Apache Software Foundation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans xmlns="http://activemq.org/config/1.0">
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<broker brokerName="broker1" persistent="false" useShutdownHook="false" useJmx="false">
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:61616"/>
|
||||
<transportConnector uri="vm://broker1"/>
|
||||
</transportConnectors>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:60007)"/>
|
||||
</networkConnectors>
|
||||
|
||||
</broker>
|
||||
|
||||
</beans>
|
||||
|
|
@ -0,0 +1,35 @@
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<!--
|
||||
Copyright 2005-2006 The Apache Software Foundation
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
-->
|
||||
<beans xmlns="http://activemq.org/config/1.0">
|
||||
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
|
||||
|
||||
<broker brokerName="broker2" persistent="false" useShutdownHook="false" useJmx="false">
|
||||
|
||||
<transportConnectors>
|
||||
<transportConnector uri="tcp://localhost:61617"/>
|
||||
<transportConnector uri="vm://broker2"/>
|
||||
</transportConnectors>
|
||||
|
||||
<networkConnectors>
|
||||
<networkConnector uri="static:(tcp://localhost:60006)"/>
|
||||
</networkConnectors>
|
||||
|
||||
</broker>
|
||||
|
||||
|
||||
</beans>
|
||||
|
Loading…
Reference in New Issue