mirror of
https://github.com/apache/activemq.git
synced 2025-02-18 07:56:20 +00:00
https://issues.apache.org/jira/browse/AMQ-4598 - negative queue size after purge
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1496430 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
06dae40d94
commit
e3fb8be2ba
@ -563,6 +563,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (MessageReference ref : unAckedMessages) {
|
for (MessageReference ref : unAckedMessages) {
|
||||||
QueueMessageReference qmr = (QueueMessageReference) ref;
|
QueueMessageReference qmr = (QueueMessageReference) ref;
|
||||||
if (qmr.getLockOwner() == sub) {
|
if (qmr.getLockOwner() == sub) {
|
||||||
@ -581,8 +582,10 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!qmr.isDropped()) {
|
||||||
redeliveredWaitingDispatch.addMessageLast(qmr);
|
redeliveredWaitingDispatch.addMessageLast(qmr);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
if (sub instanceof QueueBrowserSubscription) {
|
if (sub instanceof QueueBrowserSubscription) {
|
||||||
((QueueBrowserSubscription)sub).decrementQueueRef();
|
((QueueBrowserSubscription)sub).decrementQueueRef();
|
||||||
browserDispatches.remove(sub);
|
browserDispatches.remove(sub);
|
||||||
@ -1800,6 +1803,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void dropMessage(QueueMessageReference reference) {
|
private void dropMessage(QueueMessageReference reference) {
|
||||||
|
if (!reference.isDropped()) {
|
||||||
reference.drop();
|
reference.drop();
|
||||||
destinationStatistics.getMessages().decrement();
|
destinationStatistics.getMessages().decrement();
|
||||||
pagedInMessagesLock.writeLock().lock();
|
pagedInMessagesLock.writeLock().lock();
|
||||||
@ -1809,6 +1813,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||||||
pagedInMessagesLock.writeLock().unlock();
|
pagedInMessagesLock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void messageExpired(ConnectionContext context, MessageReference reference) {
|
public void messageExpired(ConnectionContext context, MessageReference reference) {
|
||||||
messageExpired(context, null, reference);
|
messageExpired(context, null, reference);
|
||||||
|
@ -134,6 +134,14 @@ public class QueuePurgeTest extends CombinationTestSupport {
|
|||||||
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
|
assertEquals("Queue size is not zero, it's " + proxy.getQueueSize(), 0,
|
||||||
proxy.getQueueSize());
|
proxy.getQueueSize());
|
||||||
assertEquals("usage goes to duck", 0, proxy.getMemoryPercentUsage());
|
assertEquals("usage goes to duck", 0, proxy.getMemoryPercentUsage());
|
||||||
|
Message msg;
|
||||||
|
do {
|
||||||
|
msg = consumer.receive(1000);
|
||||||
|
if (msg != null) {
|
||||||
|
msg.acknowledge();
|
||||||
|
}
|
||||||
|
} while (msg != null);
|
||||||
|
assertEquals("Queue size not valid", 0, proxy.getQueueSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
private QueueViewMBean getProxyToQueueViewMBean()
|
private QueueViewMBean getProxyToQueueViewMBean()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user