Wait the perfect amount of time by using count down latches.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@491455 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-12-31 17:33:56 +00:00
parent a810f4300a
commit 3070d33275
3 changed files with 54 additions and 19 deletions

View File

@ -43,6 +43,7 @@ import java.util.Iterator;
import java.util.Collections;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.net.URI;
/**
@ -178,10 +179,14 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
return null;
}
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
protected MessageConsumer createConsumer(String brokerName, Destination dest) throws Exception {
return createConsumer(brokerName, dest, null);
}
protected MessageConsumer createConsumer(String brokerName, Destination dest, CountDownLatch latch) throws Exception {
BrokerItem brokerItem = (BrokerItem)brokers.get(brokerName);
if (brokerItem != null) {
return brokerItem.createConsumer(dest);
return brokerItem.createConsumer(dest, latch);
}
return null;
}
@ -321,19 +326,26 @@ public class JmsMultipleBrokersTestSupport extends CombinationTestSupport {
}
public MessageConsumer createConsumer(Destination dest) throws Exception {
return createConsumer(dest, null);
}
public MessageConsumer createConsumer(Destination dest, CountDownLatch latch) throws Exception {
Connection c = createConnection();
c.start();
Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
return createConsumer(dest, s);
return createConsumerWithSession(dest, s, latch);
}
public MessageConsumer createConsumer(Destination dest, Session sess) throws Exception {
public MessageConsumer createConsumerWithSession(Destination dest, Session sess) throws Exception {
return createConsumerWithSession(dest, sess, null);
}
public MessageConsumer createConsumerWithSession(Destination dest, Session sess, CountDownLatch latch) throws Exception {
MessageConsumer client = sess.createConsumer(dest);
MessageIdList messageIdList = new MessageIdList();
messageIdList.setCountDownLatch(latch);
messageIdList.setParent(allMessages);
client.setMessageListener(messageIdList);
consumers.put(client, messageIdList);
return client;
}

View File

@ -17,14 +17,17 @@
*/
package org.apache.activemq.usecases;
import org.apache.activemq.util.MessageIdList;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import java.util.Map;
import java.util.HashMap;
import java.net.URI;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList;
/**
* @version $Revision: 1.1.1.1 $
@ -45,14 +48,20 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
// Setup topic destination
Destination dest = createDestination("TEST.FOO", true);
CountDownLatch latch = new CountDownLatch(
BROKER_COUNT * PRODUCER_COUNT *
BROKER_COUNT * CONSUMER_COUNT *
MESSAGE_COUNT);
// Setup consumers
for (int i=1; i<=BROKER_COUNT; i++) {
for (int j=0; j<CONSUMER_COUNT; j++) {
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest, latch));
}
}
//wait for consumers to get propagated
Thread.sleep(2000);
Thread.sleep(5000);
// Send messages
for (int i=1; i<=BROKER_COUNT; i++) {
@ -60,32 +69,37 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
sendMessages("Broker" + i, dest, MESSAGE_COUNT);
}
}
assertTrue("Missing "+latch.getCount()+ " messages", latch.await(30, TimeUnit.SECONDS));
// Get message count
for (int i=1; i<=BROKER_COUNT; i++) {
for (int j=0; j<CONSUMER_COUNT; j++) {
MessageIdList msgs = getConsumerMessages("Broker" + i, (MessageConsumer)consumerMap.get("Consumer:" + i + ":" + j));
msgs.waitForMessagesToArrive(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, msgs.getMessageCount());
}
}
}
public void testQueueAllConnected() throws Exception {
bridgeAllBrokers();
startAllBrokers();
// Setup topic destination
Destination dest = createDestination("TEST.FOO", false);
CountDownLatch latch = new CountDownLatch(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT);
// Setup consumers
for (int i=1; i<=BROKER_COUNT; i++) {
for (int j=0; j<CONSUMER_COUNT; j++) {
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest));
consumerMap.put("Consumer:" + i + ":" + j, createConsumer("Broker" + i, dest, latch));
}
}
Thread.sleep(2000);
//wait for consumers to get propagated
Thread.sleep(5000);
// Send messages
for (int i=1; i<=BROKER_COUNT; i++) {
@ -95,8 +109,8 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
}
// Wait for messages to be delivered
Thread.sleep(2000);
assertTrue("Missing "+latch.getCount()+ " messages", latch.await(30, TimeUnit.SECONDS));
// Get message count
int totalMsg = 0;
for (int i=1; i<=BROKER_COUNT; i++) {
@ -105,7 +119,6 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport
totalMsg += msgs.getMessageCount();
}
}
assertEquals(BROKER_COUNT * PRODUCER_COUNT * MESSAGE_COUNT, totalMsg);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.util;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import javax.jms.JMSException;
import javax.jms.Message;
@ -51,6 +52,8 @@ public class MessageIdList extends Assert implements MessageListener {
private MessageListener parent;
private long maximumDuration = 15000L;
private CountDownLatch countDownLatch;
public MessageIdList() {
this(new Object());
}
@ -99,6 +102,9 @@ public class MessageIdList extends Assert implements MessageListener {
public void onMessage(Message message) {
String id=null;
try {
if( countDownLatch != null )
countDownLatch.countDown();
id = message.getJMSMessageID();
synchronized (semaphore) {
messageIds.add(id);
@ -231,4 +237,8 @@ public class MessageIdList extends Assert implements MessageListener {
this.maximumDuration=maximumDuration;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
}