From ae6a2b87eaa9325d2437db2ccd5152f6db0d2ad3 Mon Sep 17 00:00:00 2001 From: Thiago Kronig Date: Wed, 10 Jun 2015 20:37:58 -0300 Subject: [PATCH] ARTEMIS-127 Fix some concurrency idioms for ActimeMQ Tests --- .../JmsCreateConsumerInOnMessageTest.java | 6 ++-- .../JmsMultipleClientsTestSupport.java | 4 +-- .../activemq/LargeMessageTestSupport.java | 8 ++--- .../OnePrefetchAsyncConsumerTest.java | 2 +- .../region/QueueResendDuringShutdownTest.java | 4 +-- .../org/apache/activemq/bugs/AMQ2149Test.java | 2 +- .../org/apache/activemq/bugs/AMQ4607Test.java | 2 +- .../apache/activemq/bugs/CraigsBugTest.java | 7 ++-- .../activemq/bugs/amq1974/TryJmsClient.java | 5 ++- .../activemq/bugs/amq1974/TryJmsManager.java | 5 ++- .../apache/activemq/spring/ConsumerBean.java | 33 +++++++++++-------- .../activemq/spring/SpringConsumer.java | 8 ++--- .../store/kahadb/plist/PListTest.java | 2 +- .../activemq/streams/JMSInputStreamTest.java | 2 +- .../activemq/transport/TopicClusterTest.java | 4 +-- .../transport/failover/AMQ1925Test.java | 27 ++++----------- .../transport/udp/UdpTestSupport.java | 6 ++-- ...ConcurrentProducerDurableConsumerTest.java | 14 +++----- .../ConcurrentProducerQueueConsumerTest.java | 14 +++----- .../MultiBrokersMultiClientsTest.java | 2 +- .../NoDuplicateOnTopicNetworkTest.java | 2 +- .../usecases/ReliableReconnectTest.java | 8 ++--- .../VerifyNetworkConsumersDisconnectTest.java | 2 +- .../apache/activemq/util/MessageIdList.java | 32 +++++++++++------- 24 files changed, 95 insertions(+), 106 deletions(-) diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java index 7a219e2cc3..c0a4f5fd68 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsCreateConsumerInOnMessageTest.java @@ -36,7 +36,7 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes private MessageConsumer testConsumer; private MessageProducer producer; private Topic topic; - private Object lock = new Object(); + private final Object lock = new Object(); /* * @see junit.framework.TestCase#setUp() @@ -71,8 +71,8 @@ public class JmsCreateConsumerInOnMessageTest extends TestSupport implements Mes public void testCreateConsumer() throws Exception { Message msg = super.createMessage(); producer.send(msg); - if (testConsumer == null) { - synchronized (lock) { + synchronized (lock) { + while(testConsumer == null) { lock.wait(3000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java index 5eaab8dda7..5c73a6e4e0 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsMultipleClientsTestSupport.java @@ -83,8 +83,6 @@ public class JmsMultipleClientsTestSupport { protected List connections = Collections.synchronizedList(new ArrayList()); protected MessageIdList allMessagesList = new MessageIdList(); - private AtomicInteger producerLock; - protected void startProducers(Destination dest, int msgCount) throws Exception { startProducers(createConnectionFactory(), dest, msgCount); } @@ -92,7 +90,7 @@ public class JmsMultipleClientsTestSupport { protected void startProducers(final ConnectionFactory factory, final Destination dest, final int msgCount) throws Exception { // Use concurrent send if (useConcurrentSend) { - producerLock = new AtomicInteger(producerCount); + final AtomicInteger producerLock = new AtomicInteger(producerCount); for (int i = 0; i < producerCount; i++) { Thread t = new Thread(new Runnable() { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java index fc772185cf..d1ab8a5ba7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/LargeMessageTestSupport.java @@ -61,7 +61,7 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag protected int deliveryMode = DeliveryMode.PERSISTENT; protected IdGenerator idGen = new IdGenerator(); protected boolean validMessageConsumption = true; - protected AtomicInteger messageCount = new AtomicInteger(0); + protected final AtomicInteger messageCount = new AtomicInteger(0); protected int prefetchValue = 10000000; @@ -182,9 +182,9 @@ public class LargeMessageTestSupport extends ClientTestSupport implements Messag producer.send(msg); } long now = System.currentTimeMillis(); - while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) { - LOG.info("message count = " + messageCount); - synchronized (messageCount) { + synchronized (messageCount) { + while (now + 60000 > System.currentTimeMillis() && messageCount.get() < MESSAGE_COUNT) { + LOG.info("message count = " + messageCount); messageCount.wait(1000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java index 085119847f..26c6bf102a 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/OnePrefetchAsyncConsumerTest.java @@ -154,7 +154,7 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { } private class TestServerSession implements ServerSession { - TestServerSessionPool pool; + final TestServerSessionPool pool; Session session; public TestServerSession(TestServerSessionPool pool) throws JMSException { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java index 0439fa847c..c7154a9768 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/region/QueueResendDuringShutdownTest.java @@ -54,7 +54,7 @@ public class QueueResendDuringShutdownTest { private Connection producerConnection; private Queue queue; - private Object messageReceiveSync = new Object(); + private final Object messageReceiveSync = new Object(); private int receiveCount; @Before @@ -239,7 +239,7 @@ public class QueueResendDuringShutdownTest { protected void waitForMessage (long delayMs) { try { synchronized ( this.messageReceiveSync ) { - if ( this.receiveCount == 0 ) { + while ( this.receiveCount == 0 ) { this.messageReceiveSync.wait(delayMs); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java index b2eba61366..c28d3ad295 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2149Test.java @@ -563,7 +563,7 @@ public class AMQ2149Test { } class TeardownTask implements Callable { - private Object brokerLock; + private final Object brokerLock; private BrokerService broker; public TeardownTask(Object brokerLock, BrokerService broker) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java index 265b692e6f..b567c93c05 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4607Test.java @@ -49,7 +49,7 @@ public class AMQ4607Test extends JmsMultipleBrokersTestSupport implements Uncaug public boolean duplex = true; protected Map consumerMap; - Map unhandeledExceptions = new HashMap(); + final Map unhandeledExceptions = new HashMap(); private void assertNoUnhandeledExceptions() { for( Entry e: unhandeledExceptions.entrySet()) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java index f956da6d75..d71a9e42a6 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/CraigsBugTest.java @@ -25,6 +25,9 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.EmbeddedBrokerTestSupport; import org.apache.activemq.command.ActiveMQQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + public class CraigsBugTest extends EmbeddedBrokerTestSupport { private String connectionUri; @@ -49,9 +52,7 @@ public class CraigsBugTest extends EmbeddedBrokerTestSupport { conn.start(); try { - synchronized (this) { - wait(3000); - } + new CountDownLatch(1).await(3, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java index c8b4503c4d..1f25109d5c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsClient.java @@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; import javax.jms.*; import java.io.File; import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; public class TryJmsClient { @@ -59,9 +60,7 @@ public class TryJmsClient startMessageSend(); - synchronized(this) { - this.wait(); - } + new CountDownLatch(1).await(); } private void startUsageMonitor(final BrokerService brokerService) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java index 3f5898719d..c8eb7b34d8 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/amq1974/TryJmsManager.java @@ -25,6 +25,7 @@ import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; import javax.jms.*; import java.io.File; import java.net.URISyntaxException; +import java.util.concurrent.CountDownLatch; public class TryJmsManager { @@ -59,9 +60,7 @@ public class TryJmsManager { startMessageConsumer(); - synchronized(this) { - this.wait(); - } + new CountDownLatch(1).await(); } private void startUsageMonitor(final BrokerService brokerService) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java index 8f22c33d40..4e1ab59b32 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/ConsumerBean.java @@ -73,14 +73,19 @@ public class ConsumerBean extends Assert implements MessageListener { long start = System.currentTimeMillis(); - try { - if (hasReceivedMessage()) { - synchronized (messages) { + synchronized(messages) + { + try + { + while (hasReceivedMessage()) + { messages.wait(4000); } } - } catch (InterruptedException e) { - LOG.info("Caught: " + e); + catch (InterruptedException e) + { + LOG.info("Caught: " + e); + } } long end = System.currentTimeMillis() - start; @@ -101,18 +106,18 @@ public class ConsumerBean extends Assert implements MessageListener { LOG.info("Waiting for (" + maxRemainingMessageCount + ") message(s) to arrive"); long start = System.currentTimeMillis(); long endTime = start + maxWaitTime; - while (maxRemainingMessageCount > 0) { - try { - synchronized (messages) { + synchronized (messages) { + while (maxRemainingMessageCount > 0) { + try { messages.wait(1000); + if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) { + break; + } + } catch (InterruptedException e) { + LOG.info("Caught: " + e); } - if (hasReceivedMessages(messageCount) || System.currentTimeMillis() > endTime) { - break; - } - } catch (InterruptedException e) { - LOG.info("Caught: " + e); + maxRemainingMessageCount = Math.max(0, messageCount - messages.size()); } - maxRemainingMessageCount = Math.max(0, messageCount - messages.size()); } long end = System.currentTimeMillis() - start; LOG.info("End of wait for " + end + " millis"); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java index 118e0361c8..ed0a48af8b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/spring/SpringConsumer.java @@ -43,13 +43,13 @@ public class SpringConsumer extends ConsumerBean implements MessageListener { try { ConnectionFactory factory = template.getConnectionFactory(); - connection = factory.createConnection(); + final Connection c = connection = factory.createConnection(); // we might be a reusable connection in spring // so lets only set the client ID once if its not set - synchronized (connection) { - if (connection.getClientID() == null) { - connection.setClientID(myId); + synchronized (c) { + if (c.getClientID() == null) { + c.setClientID(myId); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java index 555503e8e7..71e4618691 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/store/kahadb/plist/PListTest.java @@ -617,7 +617,7 @@ public class PListTest { } } - Map locks = new HashMap(); + final Map locks = new HashMap(); private Object plistLocks(PList plist) { Object lock = null; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java index f3926628f1..b07c8cce6b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/streams/JMSInputStreamTest.java @@ -250,7 +250,7 @@ public class JMSInputStreamTest extends JmsTestSupport { } out.flush(); synchronized (complete) { - if (!complete.get()) { + while (!complete.get()) { complete.wait(30000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java index 4db7c239c8..26c215a47b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/TopicClusterTest.java @@ -55,7 +55,7 @@ public class TopicClusterTest extends TestCase implements MessageListener { protected Destination destination; protected boolean topic = true; - protected AtomicInteger receivedMessageCount = new AtomicInteger(0); + protected final AtomicInteger receivedMessageCount = new AtomicInteger(0); protected int deliveryMode = DeliveryMode.NON_PERSISTENT; protected MessageProducer[] producers; protected Connection[] connections; @@ -166,7 +166,7 @@ public class TopicClusterTest extends TestCase implements MessageListener { } } synchronized (receivedMessageCount) { - if (receivedMessageCount.get() < expectedReceiveCount()) { + while (receivedMessageCount.get() < expectedReceiveCount()) { receivedMessageCount.wait(20000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java index dfb5dfde47..d03dbcd41c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/AMQ1925Test.java @@ -21,6 +21,7 @@ import java.net.URI; import java.util.ArrayList; import java.util.Collection; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.Connection; @@ -73,14 +74,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { // The runnable is likely to interrupt during the session#commit, since // this takes the longest - final Object starter = new Object(); + final CountDownLatch starter = new CountDownLatch(1); final AtomicBoolean restarted = new AtomicBoolean(); new Thread(new Runnable() { public void run() { try { - synchronized (starter) { - starter.wait(); - } + starter.await(); // Simulate broker failure & restart bs.stop(); @@ -97,9 +96,6 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { } }).start(); - synchronized (starter) { - starter.notifyAll(); - } for (int i = 0; i < MESSAGE_COUNT; i++) { Message message = consumer.receive(500); assertNotNull("No Message " + i + " found", message); @@ -108,9 +104,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { assertFalse("Timing problem, restarted too soon", restarted .get()); if (i == 10) { - synchronized (starter) { - starter.notifyAll(); - } + starter.countDown(); } if (i > MESSAGE_COUNT - 100) { assertTrue("Timing problem, restarted too late", restarted @@ -143,14 +137,12 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { // The runnable is likely to interrupt during the session#commit, since // this takes the longest - final Object starter = new Object(); + final CountDownLatch starter = new CountDownLatch(1); final AtomicBoolean restarted = new AtomicBoolean(); new Thread(new Runnable() { public void run() { try { - synchronized (starter) { - starter.wait(); - } + starter.await(); // Simulate broker failure & restart bs.stop(); @@ -167,9 +159,6 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { } }).start(); - synchronized (starter) { - starter.notifyAll(); - } Collection results = new ArrayList(MESSAGE_COUNT); for (int i = 0; i < MESSAGE_COUNT; i++) { Message message1 = consumer1.receive(20); @@ -191,9 +180,7 @@ public class AMQ1925Test extends TestCase implements ExceptionListener { assertFalse("Timing problem, restarted too soon", restarted .get()); if (i == 10) { - synchronized (starter) { - starter.notifyAll(); - } + starter.countDown(); } if (i > MESSAGE_COUNT - 50) { assertTrue("Timing problem, restarted too late", restarted diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java index 1d770de24e..7defe954ce 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java @@ -46,7 +46,7 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen protected Transport producer; protected Transport consumer; - protected Object lock = new Object(); + protected final Object lock = new Object(); protected Command receivedCommand; protected TransportServer server; protected boolean large; @@ -251,10 +251,10 @@ public abstract class UdpTestSupport extends TestCase implements TransportListen Command answer = null; synchronized (lock) { answer = receivedCommand; - if (answer == null) { + while (answer == null) { lock.wait(waitForCommandTimeout); + answer = receivedCommand; } - answer = receivedCommand; } assertNotNull("Should have received a Command by now!", answer); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java index e1035a6350..0e71dfea3a 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerDurableConsumerTest.java @@ -107,7 +107,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { // periodically start a durable sub that has a backlog final int consumersToActivate = 5; - final Object addConsumerSignal = new Object(); + final CountDownLatch addConsumerSignal = new CountDownLatch(1); Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -120,9 +120,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { MessageConsumer consumer = null; for (int i = 0; i < consumersToActivate; i++) { LOG.info("Waiting for add signal from producer..."); - synchronized (addConsumerSignal) { - addConsumerSignal.wait(30 * 60 * 1000); - } + addConsumerSignal.await(30, TimeUnit.MINUTES); TimedMessageListener listener = new TimedMessageListener(); consumer = createDurableSubscriber(factory.createConnection(), destination, "consumer" + (i + 1)); LOG.info("Created consumer " + consumer); @@ -254,7 +252,7 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { final int numIterations, Session session, MessageProducer producer, - Object addConsumerSignal) throws Exception { + CountDownLatch addConsumerSignal) throws Exception { long start; long count = 0; double batchMax = 0, max = 0, sum = 0; @@ -269,10 +267,8 @@ public class ConcurrentProducerDurableConsumerTest extends TestSupport { max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); if (++count % 500 == 0) { if (addConsumerSignal != null) { - synchronized (addConsumerSignal) { - addConsumerSignal.notifyAll(); - LOG.info("Signalled add consumer"); - } + addConsumerSignal.countDown(); + LOG.info("Signalled add consumer"); } }; if (count % 5000 == 0) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java index 34807c64f2..931fb55f08 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ConcurrentProducerQueueConsumerTest.java @@ -95,7 +95,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport // periodically start a queue consumer final int consumersToActivate = 5; - final Object addConsumerSignal = new Object(); + final CountDownLatch addConsumerSignal = new CountDownLatch(1); Executors.newCachedThreadPool(new ThreadFactory() { @Override public Thread newThread(Runnable r) { @@ -108,9 +108,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport MessageConsumer consumer = null; for (int i = 0; i < consumersToActivate; i++) { LOG.info("Waiting for add signal from producer..."); - synchronized (addConsumerSignal) { - addConsumerSignal.wait(30 * 60 * 1000); - } + addConsumerSignal.await(30, TimeUnit.MINUTES); TimedMessageListener listener = new TimedMessageListener(); consumer = createConsumer(factory.createConnection(), destination); LOG.info("Created consumer " + consumer); @@ -241,7 +239,7 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport final int numIterations, Session session, MessageProducer producer, - Object addConsumerSignal) throws Exception { + CountDownLatch addConsumerSignal) throws Exception { long start; long count = 0; double batchMax = 0, max = 0, sum = 0; @@ -257,10 +255,8 @@ public class ConcurrentProducerQueueConsumerTest extends TestSupport max = Math.max(max, (System.currentTimeMillis() - singleSendstart)); if (++count % 500 == 0) { if (addConsumerSignal != null) { - synchronized (addConsumerSignal) { - addConsumerSignal.notifyAll(); - LOG.info("Signalled add consumer"); - } + addConsumerSignal.countDown(); + LOG.info("Signalled add consumer"); } } ; diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java index bd5c4c89ce..df02d9e7ee 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/MultiBrokersMultiClientsTest.java @@ -44,7 +44,7 @@ public class MultiBrokersMultiClientsTest extends JmsMultipleBrokersTestSupport private static final Logger LOG = LoggerFactory.getLogger(MultiBrokersMultiClientsTest.class); protected Map consumerMap; - Map unhandeledExceptions = new HashMap(); + final Map unhandeledExceptions = new HashMap(); public void testTopicAllConnected() throws Exception { bridgeAllBrokers(); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java index 2aa614d9c1..6d28f3483b 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/NoDuplicateOnTopicNetworkTest.java @@ -265,7 +265,7 @@ public class NoDuplicateOnTopicNetworkTest extends CombinationTestSupport { private MessageConsumer consumer; private final String durableID = "DURABLE_ID"; - private List receivedStrings = Collections.synchronizedList(new ArrayList()); + private final List receivedStrings = Collections.synchronizedList(new ArrayList()); private int numMessages = 10; private CountDownLatch recievedLatch = new CountDownLatch(numMessages); diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java index 05fd5f847a..9dc0032816 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/ReliableReconnectTest.java @@ -47,8 +47,8 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport { protected int deliveryMode = DeliveryMode.PERSISTENT; protected String consumerClientId; protected Destination destination; - protected AtomicBoolean closeBroker = new AtomicBoolean(false); - protected AtomicInteger messagesReceived = new AtomicInteger(0); + protected final AtomicBoolean closeBroker = new AtomicBoolean(false); + protected final AtomicInteger messagesReceived = new AtomicInteger(0); protected BrokerService broker; protected int firstBatch = MESSAGE_COUNT / 10; private IdGenerator idGen = new IdGenerator(); @@ -159,7 +159,7 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport { connection.close(); spawnConsumer(); synchronized (closeBroker) { - if (!closeBroker.get()) { + while (!closeBroker.get()) { closeBroker.wait(); } } @@ -168,7 +168,7 @@ public class ReliableReconnectTest extends org.apache.activemq.TestSupport { startBroker(false); // System.err.println("Started Broker again"); synchronized (messagesReceived) { - if (messagesReceived.get() < MESSAGE_COUNT) { + while (messagesReceived.get() < MESSAGE_COUNT) { messagesReceived.wait(60000); } } diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java index 9eeb28c527..eb5a3e28c6 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java @@ -51,7 +51,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest public static final int TIMEOUT = 30000; protected Map consumerMap; - Map unhandledExceptions = new HashMap(); + final Map unhandledExceptions = new HashMap(); private void assertNoUnhandledExceptions() { for( Entry e: unhandledExceptions.entrySet()) { diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java index 7140a8650c..c644c67dd7 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/util/MessageIdList.java @@ -140,20 +140,28 @@ public class MessageIdList extends Assert implements MessageListener { long start = System.currentTimeMillis(); - for (int i = 0; i < messageCount; i++) { - try { - if (hasReceivedMessages(messageCount)) { - break; - } - long duration = System.currentTimeMillis() - start; - if (duration >= maximumDuration) { - break; - } - synchronized (semaphore) { + synchronized (semaphore) + { + for (int i = 0; i < messageCount; i++) + { + try + { + if (hasReceivedMessages(messageCount)) + { + break; + } + long duration = System.currentTimeMillis() - start; + if (duration >= maximumDuration) + { + break; + } + semaphore.wait(maximumDuration - duration); } - } catch (InterruptedException e) { - LOG.info("Caught: " + e); + catch (InterruptedException e) + { + LOG.info("Caught: " + e); + } } } long end = System.currentTimeMillis() - start;