more transactional and concurrent tests for master slave to try and reproduce AMQ-1983

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@707644 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-10-24 14:31:01 +00:00
parent f3e603959a
commit 1651992d10
2 changed files with 101 additions and 25 deletions

View File

@ -19,6 +19,7 @@ package org.apache.activemq.advisory;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@ -157,4 +158,28 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest {
testLoadRequestReply();
}
public void testLoadRequestReplyWithTransactions() throws Exception {
serverTransactional = clientTransactional = true;
messagesToSend = 100;
reInitialiseSessions();
testLoadRequestReply();
}
public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception {
serverTransactional = true;
numConsumers = numProducers = 10;
messagesToSend = 100;
reInitialiseSessions();
testLoadRequestReply();
}
protected void reInitialiseSessions() throws Exception {
// reinitialize so they can respect the transactional flags
serverSession.close();
clientSession.close();
serverSession = serverConnection.createSession(serverTransactional,
serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
clientSession = clientConnection.createSession(clientTransactional,
clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
}
}

View File

@ -16,8 +16,11 @@
*/
package org.apache.activemq.advisory;
import java.util.Vector;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@ -39,15 +42,23 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
protected Destination serverDestination;
protected int messagesToSend = 2000;
protected boolean deleteTempQueue = true;
protected boolean serverTransactional = false;
protected boolean clientTransactional = false;
protected int numConsumers = 1;
protected int numProducers = 1;
public void testLoadRequestReply() throws Exception {
MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination);
serverConsumer.setMessageListener(new MessageListener() {
for (int i=0; i< numConsumers; i++) {
serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
try {
Destination replyTo = msg.getJMSReplyTo();
MessageProducer producer = serverSession.createProducer(replyTo);
producer.send(replyTo, msg);
if (serverTransactional) {
serverSession.commit();
}
producer.close();
} catch (Exception e) {
// TODO Auto-generated catch block
@ -55,15 +66,31 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
}
}
});
}
MessageProducer producer = clientSession.createProducer(serverDestination);
for (int i =0; i< messagesToSend; i++) {
class Producer extends Thread {
private int numToSend;
public Producer(int numToSend) {
this.numToSend = numToSend;
}
public void run() {
MessageProducer producer;
try {
producer = clientSession.createProducer(serverDestination);
for (int i =0; i< numToSend; i++) {
TemporaryQueue replyTo = clientSession.createTemporaryQueue();
MessageConsumer consumer = clientSession.createConsumer(replyTo);
Message msg = clientSession.createMessage();
msg.setJMSReplyTo(replyTo);
producer.send(msg);
if (clientTransactional) {
clientSession.commit();
}
Message reply = consumer.receive();
if (clientTransactional) {
clientSession.commit();
}
consumer.close();
if (deleteTempQueue) {
replyTo.delete();
@ -71,6 +98,17 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
// temp queue will be cleaned up on clientConnection.close
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Vector<Thread> threads = new Vector<Thread>(numProducers);
for (int i=0; i<numProducers ; i++) {
threads.add(new Producer(messagesToSend/numProducers));
}
startAndJoinThreads(threads);
clientSession.close();
serverSession.close();
@ -91,7 +129,16 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
//serverDestination +
assertTrue(rb.getDestinationMap().size()==6);
assertEquals(6, rb.getDestinationMap().size());
}
private void startAndJoinThreads(Vector<Thread> threads) throws Exception {
for (Thread thread: threads) {
thread.start();
}
for (Thread thread: threads) {
thread.join();
}
}
protected void setUp() throws Exception {
@ -108,9 +155,13 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
protected void tearDown() throws Exception {
super.tearDown();
serverTransactional = clientTransactional = false;
numConsumers = numProducers = 1;
messagesToSend = 2000;
}
protected Destination createDestination() {
return new ActiveMQQueue(getClass().getName());
}
}