git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1458514 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-03-19 21:02:24 +00:00
parent 84eab9b5bb
commit 6cdf756e91
1 changed files with 18 additions and 7 deletions

View File

@ -130,9 +130,14 @@ public class AMQ4351Test extends BrokerTestSupport {
final AtomicLong size = new AtomicLong(); final AtomicLong size = new AtomicLong();
final AtomicBoolean done = new AtomicBoolean(); final AtomicBoolean done = new AtomicBoolean();
CountDownLatch doneLatch = new CountDownLatch(1); CountDownLatch doneLatch = new CountDownLatch(1);
CountDownLatch started;
CountDownLatch finished;
public ConsumingClient(String name) {
public ConsumingClient(String name, CountDownLatch started, CountDownLatch finished) {
this.name = name; this.name = name;
this.started = started;
this.finished = finished;
} }
public void start() { public void start() {
@ -141,6 +146,7 @@ public class AMQ4351Test extends BrokerTestSupport {
} }
public void stopAsync() { public void stopAsync() {
finished.countDown();
done.set(true); done.set(true);
} }
@ -158,6 +164,7 @@ public class AMQ4351Test extends BrokerTestSupport {
try { try {
Session session = connection.createSession(true, Session.SESSION_TRANSACTED); Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false); MessageConsumer consumer = session.createDurableSubscriber(destination, name, null, false);
started.countDown();
while( !done.get() ) { while( !done.get() ) {
Message msg = consumer.receive(100); Message msg = consumer.receive(100);
if(msg!=null ) { if(msg!=null ) {
@ -181,24 +188,28 @@ public class AMQ4351Test extends BrokerTestSupport {
public void testAMQ4351() throws InterruptedException, JMSException { public void testAMQ4351() throws InterruptedException, JMSException {
LOG.info("Start test."); LOG.info("Start test.");
int subs = 100;
CountDownLatch startedLatch = new CountDownLatch(subs - 1);
CountDownLatch shutdownLatch = new CountDownLatch(subs - 4);
ProducingClient producer = new ProducingClient(); ProducingClient producer = new ProducingClient();
ConsumingClient listener1 = new ConsumingClient("subscriber-1"); ConsumingClient listener1 = new ConsumingClient("subscriber-1", startedLatch, shutdownLatch);
ConsumingClient listener2 = new ConsumingClient("subscriber-2"); ConsumingClient listener2 = new ConsumingClient("subscriber-2", startedLatch, shutdownLatch);
ConsumingClient listener3 = new ConsumingClient("subscriber-3"); ConsumingClient listener3 = new ConsumingClient("subscriber-3", startedLatch, shutdownLatch);
try { try {
listener1.start(); listener1.start();
listener2.start(); listener2.start();
listener3.start(); listener3.start();
int subs = 100;
List<ConsumingClient> subscribers = new ArrayList<ConsumingClient>(subs); List<ConsumingClient> subscribers = new ArrayList<ConsumingClient>(subs);
for (int i = 4; i < subs; i++) { for (int i = 4; i < subs; i++) {
ConsumingClient client = new ConsumingClient("subscriber-" + i); ConsumingClient client = new ConsumingClient("subscriber-" + i, startedLatch, shutdownLatch);
subscribers.add(client); subscribers.add(client);
client.start(); client.start();
} }
startedLatch.await(10, TimeUnit.SECONDS);
LOG.info("All subscribers started."); LOG.info("All subscribers started.");
producer.sendMessage(); producer.sendMessage();
@ -207,12 +218,12 @@ public class AMQ4351Test extends BrokerTestSupport {
for (ConsumingClient client : subscribers) { for (ConsumingClient client : subscribers) {
client.stopAsync(); client.stopAsync();
} }
shutdownLatch.await(10, TimeUnit.SECONDS);
// Start producing messages for 10 minutes, at high rate // Start producing messages for 10 minutes, at high rate
LOG.info("Starting mass message producer..."); LOG.info("Starting mass message producer...");
producer.start(); producer.start();
long lastSize = listener1.size.get(); long lastSize = listener1.size.get();
for( int i=0 ; i < 10; i++ ) { for( int i=0 ; i < 10; i++ ) {
Thread.sleep(1000); Thread.sleep(1000);