This closes #671

This commit is contained in:
Clebert Suconic 2016-07-26 18:20:31 -04:00
commit a242f2761b
11 changed files with 174 additions and 8 deletions

View File

@ -98,6 +98,12 @@ public interface QueueControl {
@Attribute(desc = "number of messages acknowledged from this queue since it was created") @Attribute(desc = "number of messages acknowledged from this queue since it was created")
long getMessagesAcknowledged(); long getMessagesAcknowledged();
/**
* Returns the number of messages expired from this queue since it was created.
*/
@Attribute(desc = "number of messages expired from this queue since it was created")
long getMessagesExpired();
/** /**
* Returns the first message on the queue as JSON * Returns the first message on the queue as JSON
*/ */
@ -434,6 +440,12 @@ public interface QueueControl {
@Operation(desc = "Resets the MessagesAcknowledged property", impact = MBeanOperationInfo.ACTION) @Operation(desc = "Resets the MessagesAcknowledged property", impact = MBeanOperationInfo.ACTION)
void resetMessagesAcknowledged() throws Exception; void resetMessagesAcknowledged() throws Exception;
/**
* Resets the MessagesExpired property
*/
@Operation(desc = "Resets the MessagesExpired property", impact = MBeanOperationInfo.ACTION)
void resetMessagesExpired() throws Exception;
/** /**
* it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call * it will flush one cycle on internal executors, so you would be sure that any pending tasks are done before you call
* any other measure. * any other measure.

View File

@ -55,6 +55,12 @@ public interface JMSQueueControl extends DestinationControl {
@Attribute(desc = "number of consumers consuming messages from this queue") @Attribute(desc = "number of consumers consuming messages from this queue")
int getConsumerCount(); int getConsumerCount();
/**
* Returns the number of messages expired from this queue since it was created.
*/
@Attribute(desc = "the number of messages expired from this queue since it was created")
long getMessagesExpired();
/** /**
* returns the selector for the queue * returns the selector for the queue
*/ */

View File

@ -124,6 +124,11 @@ public class JMSQueueControlImpl extends StandardMBean implements JMSQueueContro
return coreQueueControl.getMessagesAdded(); return coreQueueControl.getMessagesAdded();
} }
@Override
public long getMessagesExpired() {
return coreQueueControl.getMessagesExpired();
}
@Override @Override
public int getConsumerCount() { public int getConsumerCount() {
return coreQueueControl.getConsumerCount(); return coreQueueControl.getConsumerCount();

View File

@ -256,6 +256,19 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
} }
@Override
public long getMessagesExpired() {
checkStarted();
clearIO();
try {
return queue.getMessagesExpired();
}
finally {
blockOnIO();
}
}
@Override @Override
public long getID() { public long getID() {
checkStarted(); checkStarted();
@ -1011,6 +1024,20 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
} }
@Override
public void resetMessagesExpired() throws Exception {
checkStarted();
clearIO();
try {
queue.resetMessagesExpired();
}
finally {
blockOnIO();
}
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------

View File

@ -121,6 +121,8 @@ public interface Queue extends Bindable {
long getMessagesAcknowledged(); long getMessagesAcknowledged();
long getMessagesExpired();
MessageReference removeReferenceWithID(long id) throws Exception; MessageReference removeReferenceWithID(long id) throws Exception;
MessageReference getReference(long id) throws ActiveMQException; MessageReference getReference(long id) throws ActiveMQException;
@ -234,6 +236,8 @@ public interface Queue extends Bindable {
void resetMessagesAcknowledged(); void resetMessagesAcknowledged();
void resetMessagesExpired();
void incrementMesssagesAdded(); void incrementMesssagesAdded();
/** /**

View File

@ -167,6 +167,8 @@ public class QueueImpl implements Queue {
private long messagesAcknowledged; private long messagesAcknowledged;
private long messagesExpired;
protected final AtomicInteger deliveringCount = new AtomicInteger(0); protected final AtomicInteger deliveringCount = new AtomicInteger(0);
private boolean paused; private boolean paused;
@ -962,6 +964,10 @@ public class QueueImpl implements Queue {
@Override @Override
public void acknowledge(final MessageReference ref) throws Exception { public void acknowledge(final MessageReference ref) throws Exception {
acknowledge(ref, OperationType.NORMAL);
}
private void acknowledge(final MessageReference ref, OperationType type) throws Exception {
if (ref.isPaged()) { if (ref.isPaged()) {
pageSubscription.ack((PagedReference) ref); pageSubscription.ack((PagedReference) ref);
postAcknowledge(ref); postAcknowledge(ref);
@ -977,12 +983,21 @@ public class QueueImpl implements Queue {
postAcknowledge(ref); postAcknowledge(ref);
} }
messagesAcknowledged++; if (type == OperationType.EXPIRED) {
messagesExpired++;
}
else {
messagesAcknowledged++;
}
} }
@Override @Override
public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception { public void acknowledge(final Transaction tx, final MessageReference ref) throws Exception {
acknowledge(tx, ref, OperationType.NORMAL);
}
private void acknowledge(final Transaction tx, final MessageReference ref, OperationType type) throws Exception {
if (ref.isPaged()) { if (ref.isPaged()) {
pageSubscription.ackTx(tx, (PagedReference) ref); pageSubscription.ackTx(tx, (PagedReference) ref);
@ -1002,7 +1017,12 @@ public class QueueImpl implements Queue {
getRefsOperation(tx).addAck(ref); getRefsOperation(tx).addAck(ref);
} }
messagesAcknowledged++; if (type == OperationType.EXPIRED) {
messagesExpired++;
}
else {
messagesAcknowledged++;
}
} }
@Override @Override
@ -1075,13 +1095,13 @@ public class QueueImpl implements Queue {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName()); logger.trace("moving expired reference " + ref + " to address = " + expiryAddress + " from queue=" + this.getName());
} }
move(null, expiryAddress, ref, true, false); move(null, expiryAddress, ref, true, false, OperationType.EXPIRED);
} }
else { else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName()); logger.trace("expiry is null, just acking expired message for reference " + ref + " from queue=" + this.getName());
} }
acknowledge(ref); acknowledge(ref, OperationType.EXPIRED);
} }
} }
@ -1127,6 +1147,11 @@ public class QueueImpl implements Queue {
return messagesAcknowledged; return messagesAcknowledged;
} }
@Override
public long getMessagesExpired() {
return messagesExpired;
}
@Override @Override
public int deleteAllReferences() throws Exception { public int deleteAllReferences() throws Exception {
return deleteAllReferences(DEFAULT_FLUSH_LIMIT); return deleteAllReferences(DEFAULT_FLUSH_LIMIT);
@ -1508,7 +1533,7 @@ public class QueueImpl implements Queue {
refRemoved(ref); refRemoved(ref);
incDelivering(); incDelivering();
try { try {
move(null, toAddress, ref, false, rejectDuplicate); move(null, toAddress, ref, false, rejectDuplicate, OperationType.NORMAL);
} }
catch (Exception e) { catch (Exception e) {
decDelivering(); decDelivering();
@ -2353,7 +2378,7 @@ public class QueueImpl implements Queue {
} }
else { else {
ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name); ActiveMQServerLogger.LOGGER.messageExceededMaxDeliverySendtoDLA(ref, deadLetterAddress, name);
move(tx, deadLetterAddress, ref, false, false); move(tx, deadLetterAddress, ref, false, false, OperationType.NORMAL);
} }
} }
else { else {
@ -2367,7 +2392,8 @@ public class QueueImpl implements Queue {
final SimpleString address, final SimpleString address,
final MessageReference ref, final MessageReference ref,
final boolean expiry, final boolean expiry,
final boolean rejectDuplicate) throws Exception { final boolean rejectDuplicate,
final OperationType type) throws Exception {
Transaction tx; Transaction tx;
if (originalTX != null) { if (originalTX != null) {
@ -2384,7 +2410,7 @@ public class QueueImpl implements Queue {
postOffice.route(copyMessage, null, tx, false, rejectDuplicate); postOffice.route(copyMessage, null, tx, false, rejectDuplicate);
acknowledge(tx, ref); acknowledge(tx, ref, type);
if (originalTX == null) { if (originalTX == null) {
tx.commit(); tx.commit();
@ -2633,6 +2659,11 @@ public class QueueImpl implements Queue {
messagesAcknowledged = 0; messagesAcknowledged = 0;
} }
@Override
public synchronized void resetMessagesExpired() {
messagesExpired = 0;
}
@Override @Override
public float getRate() { public float getRate() {
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f); float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
@ -2988,5 +3019,9 @@ public class QueueImpl implements Queue {
} }
} }
} }
private enum OperationType {
EXPIRED, NORMAL
}
} }

View File

@ -1046,6 +1046,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
return 0; return 0;
} }
@Override
public long getMessagesExpired() {
return 0;
}
@Override @Override
public MessageReference removeReferenceWithID(long id) throws Exception { public MessageReference removeReferenceWithID(long id) throws Exception {
return null; return null;
@ -1255,6 +1260,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
} }
@Override
public void resetMessagesExpired() {
}
@Override @Override
public void incrementMesssagesAdded() { public void incrementMesssagesAdded() {

View File

@ -119,6 +119,11 @@ public class JMSQueueControlUsingJMSTest extends JMSQueueControlTest {
return (Integer) proxy.retrieveAttributeValue("consumerCount"); return (Integer) proxy.retrieveAttributeValue("consumerCount");
} }
@Override
public long getMessagesExpired() {
return ((Number) proxy.retrieveAttributeValue("getMessagesExpired")).longValue();
}
@Override @Override
public String getDeadLetterAddress() { public String getDeadLetterAddress() {
return (String) proxy.retrieveAttributeValue("deadLetterAddress"); return (String) proxy.retrieveAttributeValue("deadLetterAddress");

View File

@ -1984,6 +1984,46 @@ public class QueueControlTest extends ManagementTestBase {
session.deleteQueue(queue); session.deleteQueue(queue);
} }
@Test
public void testResetMessagesExpired() throws Exception {
SimpleString address = RandomUtil.randomSimpleString();
SimpleString queue = RandomUtil.randomSimpleString();
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
Assert.assertEquals(0, queueControl.getMessagesExpired());
ClientProducer producer = session.createProducer(address);
ClientMessage message = session.createMessage(false);
producer.send(message);
// the message IDs are set on the server
Map<String, Object>[] messages = queueControl.listMessages(null);
Assert.assertEquals(1, messages.length);
long messageID = (Long) messages[0].get("messageID");
queueControl.expireMessage(messageID);
Assert.assertEquals(1, queueControl.getMessagesExpired());
message = session.createMessage(false);
producer.send(message);
// the message IDs are set on the server
messages = queueControl.listMessages(null);
Assert.assertEquals(1, messages.length);
messageID = (Long) messages[0].get("messageID");
queueControl.expireMessage(messageID);
Assert.assertEquals(2, queueControl.getMessagesExpired());
queueControl.resetMessagesExpired();
Assert.assertEquals(0, queueControl.getMessagesExpired());
session.deleteQueue(queue);
}
//make sure notifications are always received no matter whether //make sure notifications are always received no matter whether
//a Queue is created via QueueControl or by JMSServerManager directly. //a Queue is created via QueueControl or by JMSServerManager directly.
@Test @Test

View File

@ -120,6 +120,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged"); return (Integer) proxy.retrieveAttributeValue("messagesAcknowledged");
} }
@Override
public long getMessagesExpired() {
return ((Number) proxy.retrieveAttributeValue("messagesExpired")).longValue();
}
@Override @Override
public void resetMessagesAdded() throws Exception { public void resetMessagesAdded() throws Exception {
proxy.invokeOperation("resetMessagesAdded"); proxy.invokeOperation("resetMessagesAdded");
@ -130,6 +135,11 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
proxy.invokeOperation("resetMessagesAcknowledged"); proxy.invokeOperation("resetMessagesAcknowledged");
} }
@Override
public void resetMessagesExpired() throws Exception {
proxy.invokeOperation("resetMessagesExpired");
}
@Override @Override
public String getName() { public String getName() {
return (String) proxy.retrieveAttributeValue("name"); return (String) proxy.retrieveAttributeValue("name");

View File

@ -311,6 +311,12 @@ public class FakeQueue implements Queue {
return 0; return 0;
} }
@Override
public long getMessagesExpired() {
// no-op
return 0;
}
@Override @Override
public void resetMessagesAdded() { public void resetMessagesAdded() {
// no-op // no-op
@ -323,6 +329,12 @@ public class FakeQueue implements Queue {
} }
@Override
public void resetMessagesExpired() {
// no-op
}
@Override @Override
public void incrementMesssagesAdded() { public void incrementMesssagesAdded() {