diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java b/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java index df20109d22..a481fb7f09 100644 --- a/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/advisory/MasterSlaveTempQueueMemoryTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.advisory; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -156,5 +157,29 @@ public class MasterSlaveTempQueueMemoryTest extends TempQueueMemoryTest { messagesToSend = 10; testLoadRequestReply(); } + + public void testLoadRequestReplyWithTransactions() throws Exception { + serverTransactional = clientTransactional = true; + messagesToSend = 100; + reInitialiseSessions(); + testLoadRequestReply(); + } + + public void testConcurrentConsumerLoadRequestReplyWithTransactions() throws Exception { + serverTransactional = true; + numConsumers = numProducers = 10; + messagesToSend = 100; + reInitialiseSessions(); + testLoadRequestReply(); + } + protected void reInitialiseSessions() throws Exception { + // reinitialize so they can respect the transactional flags + serverSession.close(); + clientSession.close(); + serverSession = serverConnection.createSession(serverTransactional, + serverTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + clientSession = clientConnection.createSession(clientTransactional, + clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java b/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java index e729075678..400f36cafb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/advisory/TempQueueMemoryTest.java @@ -16,8 +16,11 @@ */ package org.apache.activemq.advisory; +import java.util.Vector; + import javax.jms.Connection; import javax.jms.Destination; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; @@ -39,38 +42,73 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { protected Destination serverDestination; protected int messagesToSend = 2000; protected boolean deleteTempQueue = true; + protected boolean serverTransactional = false; + protected boolean clientTransactional = false; + protected int numConsumers = 1; + protected int numProducers = 1; + public void testLoadRequestReply() throws Exception { - MessageConsumer serverConsumer = serverSession.createConsumer(serverDestination); - serverConsumer.setMessageListener(new MessageListener() { - public void onMessage(Message msg) { + for (int i=0; i< numConsumers; i++) { + serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() { + public void onMessage(Message msg) { + try { + Destination replyTo = msg.getJMSReplyTo(); + MessageProducer producer = serverSession.createProducer(replyTo); + producer.send(replyTo, msg); + if (serverTransactional) { + serverSession.commit(); + } + producer.close(); + } catch (Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + }); + } + + class Producer extends Thread { + private int numToSend; + public Producer(int numToSend) { + this.numToSend = numToSend; + } + public void run() { + MessageProducer producer; try { - Destination replyTo = msg.getJMSReplyTo(); - MessageProducer producer = serverSession.createProducer(replyTo); - producer.send(replyTo, msg); - producer.close(); - } catch (Exception e) { + producer = clientSession.createProducer(serverDestination); + + for (int i =0; i< numToSend; i++) { + TemporaryQueue replyTo = clientSession.createTemporaryQueue(); + MessageConsumer consumer = clientSession.createConsumer(replyTo); + Message msg = clientSession.createMessage(); + msg.setJMSReplyTo(replyTo); + producer.send(msg); + if (clientTransactional) { + clientSession.commit(); + } + Message reply = consumer.receive(); + if (clientTransactional) { + clientSession.commit(); + } + consumer.close(); + if (deleteTempQueue) { + replyTo.delete(); + } else { + // temp queue will be cleaned up on clientConnection.close + } + } + } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } - }); - - MessageProducer producer = clientSession.createProducer(serverDestination); - for (int i =0; i< messagesToSend; i++) { - TemporaryQueue replyTo = clientSession.createTemporaryQueue(); - MessageConsumer consumer = clientSession.createConsumer(replyTo); - Message msg = clientSession.createMessage(); - msg.setJMSReplyTo(replyTo); - producer.send(msg); - Message reply = consumer.receive(); - consumer.close(); - if (deleteTempQueue) { - replyTo.delete(); - } else { - // temp queue will be cleaned up on clientConnection.close - } } + Vector threads = new Vector(numProducers); + for (int i=0; i threads) throws Exception { + for (Thread thread: threads) { + thread.start(); + } + for (Thread thread: threads) { + thread.join(); + } } protected void setUp() throws Exception { @@ -108,9 +155,13 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport { protected void tearDown() throws Exception { super.tearDown(); + serverTransactional = clientTransactional = false; + numConsumers = numProducers = 1; + messagesToSend = 2000; } protected Destination createDestination() { return new ActiveMQQueue(getClass().getName()); } + }