From 0d9c588ed7df63812f0c5bf0a5c3dc6f119cd208 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 27 Jan 2010 17:46:08 +0000 Subject: [PATCH] resolve intermitent failure on loaded machines, consumer could complete before producer thread got to do any work at all git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@903758 13f79535-47bb-0310-9956-ffa450edef68 --- .../bugs/TempStorageBlockedBrokerTest.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java index ab137ab374..a25ef8e27e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/TempStorageBlockedBrokerTest.java @@ -17,7 +17,11 @@ package org.apache.activemq.bugs; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.DeliveryMode; @@ -80,6 +84,7 @@ public class TempStorageBlockedBrokerTest { final Connection producerConnection = factory.createConnection(); producerConnection.start(); + final CountDownLatch producerHasSentTenMessages = new CountDownLatch(10); Thread producingThread = new Thread("Producing thread") { @Override public void run() { @@ -92,10 +97,12 @@ public class TempStorageBlockedBrokerTest { producer.send(message); messagesSent.incrementAndGet(); + producerHasSentTenMessages.countDown(); Thread.sleep(10); - LOG.info("Sent Message " + idx); - LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); - + if (idx != 0 && idx%100 == 0) { + LOG.info("Sent Message " + idx); + LOG.info("Temp Store Usage " + broker.getSystemUsage().getTempUsage().getUsage()); + } } producer.close(); session.close(); @@ -106,12 +113,16 @@ public class TempStorageBlockedBrokerTest { }; producingThread.start(); + assertTrue("producer has sent 10 in a reasonable time", producerHasSentTenMessages.await(30, TimeUnit.SECONDS)); + int count = 0; Message m = null; while ((m = consumer.receive(messageReceiveTimeout)) != null) { count++; - LOG.info("Recieved Message (" + count + "):" + m); + if (count != 0 && count%10 == 0) { + LOG.info("Recieved Message (" + count + "):" + m); + } messagesConsumed.incrementAndGet(); try { Thread.sleep(100); @@ -120,19 +131,13 @@ public class TempStorageBlockedBrokerTest { } } - LOG.info("Connection Timeout: Retrying"); - - // session.close(); - // consumerConnection.close(); - // - // consumerConnection2.start(); - // session2 = consumerConnection2.createSession(false, - // Session.AUTO_ACKNOWLEDGE); - // consumer = session2.createConsumer(destination); + LOG.info("Connection Timeout: Retrying.. count: " + count); while ((m = consumer.receive(messageReceiveTimeout)) != null) { count++; - LOG.info("Recieved Message (" + count + "):" + m); + if (count != 0 && count%10 == 0) { + LOG.info("Recieved Message (" + count + "):" + m); + } messagesConsumed.incrementAndGet(); try { Thread.sleep(100);