test case for https://issues.apache.org/activemq/browse/AMQ-2303 - durable consumers recovery

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@788068 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Bosanac Dejan 2009-06-24 16:09:17 +00:00
parent e2778fbf7d
commit 29e3702cfd
1 changed files with 132 additions and 2 deletions

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.bugs;
import java.io.File;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@ -26,18 +27,30 @@ import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.activemq.store.amq.AMQPersistenceAdapterFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -56,7 +69,120 @@ public class DurableConsumerTest extends TestCase {
protected byte[] payload = new byte[1024*32];
protected ConnectionFactory factory;
protected Vector<Exception> exceptions = new Vector<Exception>();
private static final String TOPIC_NAME = "failoverTopic";
private static final String CONNECTION_URL = "failover:(tcp://localhost:61616,tcp://localhost:61617)";
private class SimpleTopicSubscriber implements MessageListener, ExceptionListener {
private TopicConnection topicConnection = null;
private String clientId;
public SimpleTopicSubscriber(String connectionURL, String clientId, String topicName) {
ActiveMQConnectionFactory topicConnectionFactory = null;
TopicSession topicSession = null;
Topic topic = null;
TopicSubscriber topicSubscriber = null;
topicConnectionFactory = new ActiveMQConnectionFactory(connectionURL);
try {
topic = new ActiveMQTopic(topicName);
topicConnection = topicConnectionFactory.createTopicConnection();
topicConnection.setClientID((clientId));
topicConnection.start();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicSubscriber = topicSession.createDurableSubscriber(topic, (clientId));
this.clientId = clientId;
topicSubscriber.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
}
}
public void onMessage(Message arg0) {
}
public void closeConnection() {
if (topicConnection != null) {
try {
topicConnection.close();
} catch (JMSException e) {
}
}
}
public void onException(JMSException exception) {
exceptions.add(exception);
}
}
private class MessagePublisher implements Runnable {
private boolean shouldPublish = true;
public void run() {
TopicConnectionFactory topicConnectionFactory = null;
TopicConnection topicConnection = null;
TopicSession topicSession = null;
Topic topic = null;
TopicPublisher topicPublisher = null;
Message message = null;
topicConnectionFactory = new ActiveMQConnectionFactory(CONNECTION_URL);
try {
topic = new ActiveMQTopic(TOPIC_NAME);
topicConnection = topicConnectionFactory.createTopicConnection();
topicSession = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
topicPublisher = topicSession.createPublisher(topic);
message = topicSession.createMessage();
} catch( Exception ex ) {
exceptions.add(ex);
}
while (shouldPublish) {
try {
topicPublisher.publish(message, DeliveryMode.PERSISTENT, 1, 2 * 60 * 60 * 1000);
} catch (JMSException ex) {
exceptions.add(ex);
}
try {
Thread.sleep(1);
} catch (Exception ex) {
}
}
}
}
public void testFailover() throws Exception {
Thread publisherThread = new Thread( new MessagePublisher() );
publisherThread.start();
for( int i = 0; i < 100; i++ ) {
final int id = i;
Thread thread = new Thread( new Runnable() {
public void run() {
SimpleTopicSubscriber sub = new SimpleTopicSubscriber(CONNECTION_URL, System.currentTimeMillis()+"-"+id, TOPIC_NAME);
}
} );
thread.start();
LOG.info( "subscribed " + i + " of 100" );
}
Thread.sleep(5000);
broker.stop();
broker = createBroker(false);
Thread.sleep(5000);
assertEquals(0, exceptions.size());
}
public void testConcurrentDurableConsumer() throws Exception {
factory = createConnectionFactory();
@ -131,7 +257,7 @@ public class DurableConsumerTest extends TestCase {
LOG.info("Sent msg " + i);
}
}
Thread.sleep(2000);
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
@ -220,10 +346,14 @@ public class DurableConsumerTest extends TestCase {
answer.start();
return answer;
}
protected void configureBroker(BrokerService answer,boolean deleteStore) throws Exception {
File dataDirFile = new File("target/"+ getName());
AMQPersistenceAdapterFactory fact = new AMQPersistenceAdapterFactory();
fact.setDataDirectory(dataDirFile);
fact.setForceRecoverReferenceStore(true);
answer.setPersistenceAdapter(fact.createPersistenceAdapter());
answer.setDeleteAllMessagesOnStartup(deleteStore);
answer.addConnector(bindAddress);
answer.setUseShutdownHook(false);