resolve AMQ-1995

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@711617 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-11-05 17:01:05 +00:00
parent f8eed489fb
commit 7238f80d9b
1 changed files with 15 additions and 10 deletions

View File

@ -48,6 +48,11 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
protected int numProducers = 1; protected int numProducers = 1;
public void testConcurrentProducerRequestReply() throws Exception {
numProducers = 10;
testLoadRequestReply();
}
public void testLoadRequestReply() throws Exception { public void testLoadRequestReply() throws Exception {
for (int i=0; i< numConsumers; i++) { for (int i=0; i< numConsumers; i++) {
serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() { serverSession.createConsumer(serverDestination).setMessageListener(new MessageListener() {
@ -73,23 +78,24 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
public Producer(int numToSend) { public Producer(int numToSend) {
this.numToSend = numToSend; this.numToSend = numToSend;
} }
public void run() { public void run() {
MessageProducer producer;
try { try {
producer = clientSession.createProducer(serverDestination); Session session = clientConnection.createSession(clientTransactional,
clientTransactional ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(serverDestination);
for (int i =0; i< numToSend; i++) { for (int i =0; i< numToSend; i++) {
TemporaryQueue replyTo = clientSession.createTemporaryQueue(); TemporaryQueue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = clientSession.createConsumer(replyTo); MessageConsumer consumer = session.createConsumer(replyTo);
Message msg = clientSession.createMessage(); Message msg = session.createMessage();
msg.setJMSReplyTo(replyTo); msg.setJMSReplyTo(replyTo);
producer.send(msg); producer.send(msg);
if (clientTransactional) { if (clientTransactional) {
clientSession.commit(); session.commit();
} }
Message reply = consumer.receive(); consumer.receive();
if (clientTransactional) { if (clientTransactional) {
clientSession.commit(); session.commit();
} }
consumer.close(); consumer.close();
if (deleteTempQueue) { if (deleteTempQueue) {
@ -98,7 +104,6 @@ public class TempQueueMemoryTest extends EmbeddedBrokerTestSupport {
// temp queue will be cleaned up on clientConnection.close // temp queue will be cleaned up on clientConnection.close
} }
} }
} catch (IllegalStateException IgnoredAsCanOcurrDuringShutdown) {
} catch (JMSException e) { } catch (JMSException e) {
// TODO Auto-generated catch block // TODO Auto-generated catch block
e.printStackTrace(); e.printStackTrace();