ARTEMIS-647 track 'killed' msg count on queue

A 'killed' message is one that has been sent to a dead-letter address
or otherwise removed from the queue due to exceeding the max delivery
attempts.
This commit is contained in:
jbertram 2016-07-27 10:04:37 -05:00 committed by Clebert Suconic
parent 53c56efca9
commit 3914f1aa8b
16 changed files with 244 additions and 24 deletions

View File

@ -104,6 +104,12 @@ public interface QueueControl {
@Attribute(desc = "number of messages expired from this queue since it was created")
long getMessagesExpired();
/**
* Returns the number of messages removed from this queue since it was created due to exceeding the max delivery attempts.
*/
@Attribute(desc = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts")
long getMessagesKilled();
/**
* Returns the first message on the queue as JSON
*/
@ -446,6 +452,12 @@ public interface QueueControl {
@Operation(desc = "Resets the MessagesExpired property", impact = MBeanOperationInfo.ACTION)
void resetMessagesExpired() throws Exception;
/**
* Resets the MessagesExpired property
*/
@Operation(desc = "Resets the MessagesKilled property", impact = MBeanOperationInfo.ACTION)
void resetMessagesKilled() throws Exception;
/**
* it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call
* any other measure.

View File

@ -61,6 +61,12 @@ public interface JMSQueueControl extends DestinationControl {
@Attribute(desc = "the number of messages expired from this queue since it was created")
long getMessagesExpired();
/**
* Returns the number of messages removed from this queue since it was created due to exceeding the max delivery attempts.
*/
@Attribute(desc = "number of messages removed from this queue since it was created due to exceeding the max delivery attempts")
long getMessagesKilled();
/**
* returns the selector for the queue
*/

View File

@ -129,6 +129,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
return coreQueueControl.getMessagesExpired();
}
@Override
public long getMessagesKilled() {
return coreQueueControl.getMessagesKilled();
}
@Override
public int getConsumerCount() {
return coreQueueControl.getConsumerCount();

View File

@ -269,6 +269,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
}
@Override
public long getMessagesKilled() {
checkStarted();
clearIO();
try {
return queue.getMessagesKilled();
}
finally {
blockOnIO();
}
}
@Override
public long getID() {
checkStarted();
@ -1038,6 +1051,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
}
@Override
public void resetMessagesKilled() throws Exception {
checkStarted();
clearIO();
try {
queue.resetMessagesKilled();
}
finally {
blockOnIO();
}
}
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.jboss.logging.Logger;
@ -215,11 +216,16 @@ public class PagedReferenceImpl implements PagedReference {
@Override
public void acknowledge(Transaction tx) throws Exception {
acknowledge(tx, AckReason.NORMAL);
}
@Override
public void acknowledge(Transaction tx, AckReason reason) throws Exception {
if (tx == null) {
getQueue().acknowledge(this);
getQueue().acknowledge(this, reason);
}
else {
getQueue().acknowledge(tx, this);
getQueue().acknowledge(tx, this, reason);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.server;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
/**
@ -73,6 +74,8 @@ public interface MessageReference {
void acknowledge(Transaction tx) throws Exception;
void acknowledge(Transaction tx, AckReason reason) throws Exception;
void setConsumerId(Long consumerID);
Long getConsumerId();

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.paging.cursor.PageSubscription;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -74,8 +75,12 @@ public interface Queue extends Bindable {
void acknowledge(MessageReference ref) throws Exception;
void acknowledge(final MessageReference ref, AckReason reason) throws Exception;
void acknowledge(Transaction tx, MessageReference ref) throws Exception;
void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception;
void reacknowledge(Transaction tx, MessageReference ref) throws Exception;
void cancel(Transaction tx, MessageReference ref);
@ -123,6 +128,8 @@ public interface Queue extends Bindable {
long getMessagesExpired();
long getMessagesKilled();
MessageReference removeReferenceWithID(long id) throws Exception;
MessageReference getReference(long id) throws ActiveMQException;
@ -238,6 +245,8 @@ public interface Queue extends Bindable {
void resetMessagesExpired();
void resetMessagesKilled();
void incrementMesssagesAdded();
/**

View File

@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
public enum AckReason {
KILLED, EXPIRED, NORMAL
}

View File

@ -262,6 +262,11 @@ public class LastValueQueue extends QueueImpl {
ref.acknowledge(tx);
}
@Override
public void acknowledge(Transaction tx, AckReason reason) throws Exception {
ref.acknowledge(tx, reason);
}
@Override
public void setPersistedCount(int count) {
ref.setPersistedCount(count);

View File

@ -192,11 +192,16 @@ public class MessageReferenceImpl implements MessageReference {
@Override
public void acknowledge(Transaction tx) throws Exception {
acknowledge(tx, AckReason.NORMAL);
}
@Override
public void acknowledge(Transaction tx, AckReason reason) throws Exception {
if (tx == null) {
getQueue().acknowledge(this);
getQueue().acknowledge(this, reason);
}
else {
getQueue().acknowledge(tx, this);
getQueue().acknowledge(tx, this, reason);
}
}

View File

@ -169,6 +169,8 @@ public class QueueImpl implements Queue {
private long messagesExpired;
private long messagesKilled;
protected final AtomicInteger deliveringCount = new AtomicInteger(0);
private boolean paused;
@ -964,10 +966,11 @@ public class QueueImpl implements Queue {
@Override
public void acknowledge(final MessageReference ref) throws Exception {
acknowledge(ref, OperationType.NORMAL);
acknowledge(ref, AckReason.NORMAL);
}
private void acknowledge(final MessageReference ref, OperationType type) throws Exception {
@Override
public void acknowledge(final MessageReference ref, AckReason reason) throws Exception {
if (ref.isPaged()) {
pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref);
@ -983,9 +986,12 @@ public class QueueImpl implements Queue {
postAcknowledge(ref);
}
if (type == OperationType.EXPIRED) {
if (reason == AckReason.EXPIRED) {
messagesExpired++;
}
else if (reason == AckReason.KILLED) {
messagesKilled++;
}
else {
messagesAcknowledged++;
}
@ -994,10 +1000,11 @@ public class QueueImpl implements Queue {
@Override
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
acknowledge(tx, ref, OperationType.NORMAL);
acknowledge(tx, ref, AckReason.NORMAL);
}
private void acknowledge(final Transaction tx, final MessageReference ref, OperationType type) throws Exception {
@Override
public void acknowledge(final Transaction tx, final MessageReference ref, AckReason reason) throws Exception {
if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref);
@ -1017,9 +1024,12 @@ public class QueueImpl implements Queue {
getRefsOperation(tx).addAck(ref);
}
if (type == OperationType.EXPIRED) {
if (reason == AckReason.EXPIRED) {
messagesExpired++;
}
else if (reason == AckReason.KILLED) {
messagesKilled++;
}
else {
messagesAcknowledged++;
}
@ -1095,13 +1105,13 @@ public class QueueImpl implements Queue {
if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
}
move(null, expiryAddress, ref, true, false, OperationType.EXPIRED);
move(null, expiryAddress, ref, false, AckReason.EXPIRED);
}
else {
if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
}
acknowledge(ref, OperationType.EXPIRED);
acknowledge(ref, AckReason.EXPIRED);
}
}
@ -1152,6 +1162,11 @@ public class QueueImpl implements Queue {
return messagesExpired;
}
@Override
public long getMessagesKilled() {
return messagesKilled;
}
@Override
public int deleteAllReferences() throws Exception {
return deleteAllReferences(DEFAULT_FLUSH_LIMIT);
@ -1533,7 +1548,7 @@ public class QueueImpl implements Queue {
refRemoved(ref);
incDelivering();
try {
move(null, toAddress, ref, false, rejectDuplicate, OperationType.NORMAL);
move(null, toAddress, ref, rejectDuplicate, AckReason.NORMAL);
}
catch (Exception e) {
decDelivering();
@ -2374,26 +2389,25 @@ public class QueueImpl implements Queue {
if (bindingList.getBindings().isEmpty()) {
ActiveMQServerLogger.LOGGER.messageExceededMaxDelivery(ref, deadLetterAddress);
ref.acknowledge(tx);
ref.acknowledge(tx, AckReason.KILLED);
}
else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, ref, false, false, OperationType.NORMAL);
move(tx, deadLetterAddress, ref, false, AckReason.KILLED);
}
}
else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliveryNoDLA(name);
ref.acknowledge(tx);
ref.acknowledge(tx, AckReason.KILLED);
}
}
private void move(final Transaction originalTX,
final SimpleString address,
final MessageReference ref,
final boolean expiry,
final boolean rejectDuplicate,
final OperationType type) throws Exception {
final AckReason reason) throws Exception {
Transaction tx;
if (originalTX != null) {
@ -2404,13 +2418,13 @@ public class QueueImpl implements Queue {
tx = new TransactionImpl(storageManager);
}
ServerMessage copyMessage = makeCopy(ref, expiry);
ServerMessage copyMessage = makeCopy(ref, reason == AckReason.EXPIRED);
copyMessage.setAddress(address);
postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
acknowledge(tx, ref, type);
acknowledge(tx, ref, reason);
if (originalTX == null) {
tx.commit();
@ -2664,6 +2678,11 @@ public class QueueImpl implements Queue {
messagesExpired = 0;
}
@Override
public synchronized void resetMessagesKilled() {
messagesKilled = 0;
}
@Override
public float getRate() {
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
@ -3019,9 +3038,5 @@ public class QueueImpl implements Queue {
}
}
}
private enum OperationType {
EXPIRED, NORMAL
}
}

View File

@ -956,11 +956,21 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref) throws Exception {
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
}
@Override
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception {
@ -1051,6 +1061,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0;
}
@Override
public long getMessagesKilled() {
return 0;
}
@Override
public MessageReference removeReferenceWithID(long id) throws Exception {
return null;
@ -1265,6 +1280,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void resetMessagesKilled() {
}
@Override
public void incrementMesssagesAdded() {

View File

@ -124,6 +124,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
return ((Number) proxy.retrieveAttributeValue("getMessagesExpired")).longValue();
}
@Override
public long getMessagesKilled() {
return ((Number) proxy.retrieveAttributeValue("messagesKilled")).longValue();
}
@Override
public String getDeadLetterAddress() {
return (String) proxy.retrieveAttributeValue("deadLetterAddress");

View File

@ -2024,6 +2024,51 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue);
}
@Test
public void testResetMessagesKilled() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, queueControl.getMessagesExpired());
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
producer.send(message);
// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listMessages(null);
Assert.assertEquals(1, messages.length);
long messageID = (Long) messages[0].get("messageID");
queueControl.sendMessageToDeadLetterAddress(messageID);
Assert.assertEquals(1, queueControl.getMessagesKilled());
message = session.createMessage(false);
producer.send(message);
// send to DLA the old-fashioned way
ClientConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < server.getAddressSettingsRepository().getMatch(queue.toString()).getMaxDeliveryAttempts(); i++) {
message = consumer.receive(500);
assertNotNull(message);
message.acknowledge();
session.rollback();
}
consumer.close();
Assert.assertEquals(2, queueControl.getMessagesKilled());
queueControl.resetMessagesKilled();
Assert.assertEquals(0, queueControl.getMessagesKilled());
session.deleteQueue(queue);
}
//make sure notifications are always received no matter whether
//a Queue is created via QueueControl or by JMSServerManager directly.
@Test

View File

@ -125,6 +125,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Long) proxy.retrieveAttributeValue("messagesExpired", Long.class);
}
@Override
public long getMessagesKilled() {
return ((Number) proxy.retrieveAttributeValue("messagesKilled")).longValue();
}
@Override
public void resetMessagesAdded() throws Exception {
proxy.invokeOperation("resetMessagesAdded");
@ -140,6 +145,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
proxy.invokeOperation("resetMessagesExpired");
}
@Override
public void resetMessagesKilled() throws Exception {
proxy.invokeOperation("resetMessagesKilled");
}
@Override
public String getName() {
return (String) proxy.retrieveAttributeValue("name");

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingContext;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.AckReason;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.LinkedListIterator;
import org.apache.activemq.artemis.utils.ReferenceCounter;
@ -157,12 +158,24 @@ public class FakeQueue implements Queue {
}
@Override
public void acknowledge(MessageReference ref, AckReason reason) throws Exception {
// no-op
}
@Override
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
// no-op
}
@Override
public void acknowledge(Transaction tx, MessageReference ref, AckReason reason) throws Exception {
// no-op
}
@Override
public void addConsumer(final Consumer consumer) throws Exception {
// no-op
@ -317,6 +330,12 @@ public class FakeQueue implements Queue {
return 0;
}
@Override
public long getMessagesKilled() {
// no-op
return 0;
}
@Override
public void resetMessagesAdded() {
// no-op
@ -335,6 +354,12 @@ public class FakeQueue implements Queue {
}
@Override
public void resetMessagesKilled() {
// no-op
}
@Override
public void incrementMesssagesAdded() {