From 51f68fb11017845ca1e01c657f7b927a2ca359a9 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Fri, 30 Jan 2009 14:31:08 +0000 Subject: [PATCH] additional fix for http://issues.apache.org/activemq/browse/AMQ-2016 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@739292 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/activemq/broker/region/Queue.java | 12 ++++++------ .../activemq/usecases/MessageGroupDelayedTest.java | 9 ++++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index aa7a3d59b3..5da454a257 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -233,12 +233,13 @@ public class Queue extends BaseDestination implements Task { // set a flag if this is a first consumer if (consumers.size() == 0) { firstConsumer = true; + if (consumersBeforeDispatchStarts != 0) { + consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1); + } } else { - firstConsumer = false; - } - - if (consumersBeforeStartsLatch != null) { - consumersBeforeStartsLatch.countDown(); + if (consumersBeforeStartsLatch != null) { + consumersBeforeStartsLatch.countDown(); + } } addToConsumerList(sub); @@ -647,7 +648,6 @@ public class Queue extends BaseDestination implements Task { public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) { this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts; - consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts); } // Implementation methods diff --git a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java index 8dfcf69c0f..1ae3c3f7c6 100644 --- a/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/usecases/MessageGroupDelayedTest.java @@ -85,6 +85,7 @@ public class MessageGroupDelayedTest extends JmsTestSupport { // Setup a destination policy where it takes only 1 message at a time. PolicyMap policyMap = new PolicyMap(); PolicyEntry policy = new PolicyEntry(); + log.info("testing with consumersBeforeDispatchStarts=" + consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + timeBeforeDispatchStarts); policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts); policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts); policyMap.setDefaultEntry(policy); @@ -109,7 +110,7 @@ public class MessageGroupDelayedTest extends JmsTestSupport { public void testDelayedDirectConnectionListener() throws Exception { - for(int i = 0; i < 10; i++) { + for(int i = 0; i < 10; i++) { Message msga = session.createTextMessage("hello a"); msga.setStringProperty("JMSXGroupID", "A"); producer.send(msga); @@ -153,8 +154,10 @@ public class MessageGroupDelayedTest extends JmsTestSupport { for (String worker: messageCount.keySet()) { log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)); - assertEquals(10, messageCount.get(worker).intValue()); - assertEquals(1, messageGroups.get(worker).size()); + assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) + , 10, messageCount.get(worker).intValue()); + assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker) + , 1, messageGroups.get(worker).size()); } }