mirror of https://github.com/apache/activemq.git
AMQ-6947 - Update Queue metrics on expiration
The updated dropMessage method only decrements the destination metrics if a message is removed from the pagedInMessages list to prevent duplicate updates. There is also a case where we still need to update metrics if the message never makes it into the pagedInMessages list in the first place and that is on expiration so this patch fixes that. A couple existing tests found this issue.
This commit is contained in:
parent
021c82859c
commit
6e468b4540
|
@ -1872,8 +1872,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
pagedInMessagesLock.writeLock().lock();
|
pagedInMessagesLock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
if (pagedInMessages.remove(reference) != null) {
|
if (pagedInMessages.remove(reference) != null) {
|
||||||
getDestinationStatistics().getDequeues().increment();
|
updateMetricsOnMessageDrop();
|
||||||
getDestinationStatistics().getMessages().decrement();
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
pagedInMessagesLock.writeLock().unlock();
|
pagedInMessagesLock.writeLock().unlock();
|
||||||
|
@ -1881,6 +1880,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void updateMetricsOnMessageDrop() {
|
||||||
|
getDestinationStatistics().getDequeues().increment();
|
||||||
|
getDestinationStatistics().getMessages().decrement();
|
||||||
|
}
|
||||||
|
|
||||||
public void messageExpired(ConnectionContext context, MessageReference reference) {
|
public void messageExpired(ConnectionContext context, MessageReference reference) {
|
||||||
messageExpired(context, null, reference);
|
messageExpired(context, null, reference);
|
||||||
}
|
}
|
||||||
|
@ -2037,6 +2041,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
||||||
if (processExpired && ref.isExpired()) {
|
if (processExpired && ref.isExpired()) {
|
||||||
if (broker.isExpired(ref)) {
|
if (broker.isExpired(ref)) {
|
||||||
messageExpired(createConnectionContext(), ref);
|
messageExpired(createConnectionContext(), ref);
|
||||||
|
|
||||||
|
//We need to update the metrics here because the drop message
|
||||||
|
//method will only update if the message was removed from the
|
||||||
|
//pagedInMessages list which won't happen in this case
|
||||||
|
updateMetricsOnMessageDrop();
|
||||||
} else {
|
} else {
|
||||||
ref.decrementReferenceCount();
|
ref.decrementReferenceCount();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue