mirror of https://github.com/apache/activemq.git
additional fix for http://issues.apache.org/activemq/browse/AMQ-2016
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@739292 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3804d66f6f
commit
51f68fb110
|
@ -233,12 +233,13 @@ public class Queue extends BaseDestination implements Task {
|
|||
// set a flag if this is a first consumer
|
||||
if (consumers.size() == 0) {
|
||||
firstConsumer = true;
|
||||
if (consumersBeforeDispatchStarts != 0) {
|
||||
consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts - 1);
|
||||
}
|
||||
} else {
|
||||
firstConsumer = false;
|
||||
}
|
||||
|
||||
if (consumersBeforeStartsLatch != null) {
|
||||
consumersBeforeStartsLatch.countDown();
|
||||
if (consumersBeforeStartsLatch != null) {
|
||||
consumersBeforeStartsLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
addToConsumerList(sub);
|
||||
|
@ -647,7 +648,6 @@ public class Queue extends BaseDestination implements Task {
|
|||
|
||||
public void setConsumersBeforeDispatchStarts(int consumersBeforeDispatchStarts) {
|
||||
this.consumersBeforeDispatchStarts = consumersBeforeDispatchStarts;
|
||||
consumersBeforeStartsLatch = new CountDownLatch(consumersBeforeDispatchStarts);
|
||||
}
|
||||
|
||||
// Implementation methods
|
||||
|
|
|
@ -85,6 +85,7 @@ public class MessageGroupDelayedTest extends JmsTestSupport {
|
|||
// Setup a destination policy where it takes only 1 message at a time.
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry policy = new PolicyEntry();
|
||||
log.info("testing with consumersBeforeDispatchStarts=" + consumersBeforeDispatchStarts + " and timeBeforeDispatchStarts=" + timeBeforeDispatchStarts);
|
||||
policy.setConsumersBeforeDispatchStarts(consumersBeforeDispatchStarts);
|
||||
policy.setTimeBeforeDispatchStarts(timeBeforeDispatchStarts);
|
||||
policyMap.setDefaultEntry(policy);
|
||||
|
@ -109,7 +110,7 @@ public class MessageGroupDelayedTest extends JmsTestSupport {
|
|||
|
||||
public void testDelayedDirectConnectionListener() throws Exception {
|
||||
|
||||
for(int i = 0; i < 10; i++) {
|
||||
for(int i = 0; i < 10; i++) {
|
||||
Message msga = session.createTextMessage("hello a");
|
||||
msga.setStringProperty("JMSXGroupID", "A");
|
||||
producer.send(msga);
|
||||
|
@ -153,8 +154,10 @@ public class MessageGroupDelayedTest extends JmsTestSupport {
|
|||
|
||||
for (String worker: messageCount.keySet()) {
|
||||
log.info("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker));
|
||||
assertEquals(10, messageCount.get(worker).intValue());
|
||||
assertEquals(1, messageGroups.get(worker).size());
|
||||
assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
|
||||
, 10, messageCount.get(worker).intValue());
|
||||
assertEquals("worker " + worker + " received " + messageCount.get(worker) + " messages from groups " + messageGroups.get(worker)
|
||||
, 1, messageGroups.get(worker).size());
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue