mirror of https://github.com/apache/activemq.git
fix issue with size of SimplePriorityMessageDispatchChannel, cause intermittent failure of FailoverConsumerUnconsumedTest
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@967052 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2677c6bf0e
commit
21e641d496
|
@ -216,6 +216,7 @@ public class SimplePriorityMessageDispatchChannel implements MessageDispatchChan
|
||||||
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
|
for (int i = MAX_PRIORITY - 1; i >= 0; i--) {
|
||||||
List<MessageDispatch> list = lists[i];
|
List<MessageDispatch> list = lists[i];
|
||||||
result.addAll(list);
|
result.addAll(list);
|
||||||
|
size -= list.size();
|
||||||
list.clear();
|
list.clear();
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -168,6 +168,20 @@ public class FailoverConsumerUnconsumedTest {
|
||||||
|
|
||||||
// will be stopped by the plugin
|
// will be stopped by the plugin
|
||||||
broker.waitUntilStopped();
|
broker.waitUntilStopped();
|
||||||
|
|
||||||
|
// verify interrupt
|
||||||
|
assertTrue("add messages dispatched and unconsumed are cleaned up", Wait.waitFor(new Wait.Condition() {
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
int totalUnconsumed = 0;
|
||||||
|
for (TestConsumer testConsumer : testConsumers) {
|
||||||
|
long unconsumed = testConsumer.unconsumedSize();
|
||||||
|
LOG.info(testConsumer.getConsumerId() + " unconsumed: " + unconsumed);
|
||||||
|
totalUnconsumed += unconsumed;
|
||||||
|
}
|
||||||
|
return totalUnconsumed == 0;
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
broker = createBroker(false);
|
broker = createBroker(false);
|
||||||
broker.start();
|
broker.start();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue