From 0f60b5a8e4009187bd787153977c0f0138a69e8c Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Fri, 18 Sep 2020 09:34:16 -0500 Subject: [PATCH] ARTEMIS-2906 add lastAckTimestamp to message counter --- .../core/management/MessageCounterInfo.java | 14 ++++++++++- .../core/messagecounter/MessageCounter.java | 24 +++++++++++++++++-- .../impl/MessageCounterHelper.java | 3 ++- .../impl/MessageCounterManagerImpl.java | 6 +++++ docs/user-manual/en/management.md | 4 ++++ .../management/QueueControlTest.java | 18 ++++++++++---- 6 files changed, 61 insertions(+), 8 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/MessageCounterInfo.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/MessageCounterInfo.java index 7b58afcb66..e58b80dc6b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/MessageCounterInfo.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/MessageCounterInfo.java @@ -42,6 +42,8 @@ public final class MessageCounterInfo { private final String lastAddTimestamp; + private final String lastAckTimestamp; + private final String updateTimestamp; /** @@ -58,9 +60,10 @@ public final class MessageCounterInfo { int depth = data.getInt("messageCount"); int depthDelta = data.getInt("messageCountDelta"); String lastAddTimestamp = data.getString("lastAddTimestamp"); + String lastAckTimestamp = data.getString("lastAckTimestamp"); String updateTimestamp = data.getString("updateTimestamp"); - return new MessageCounterInfo(name, subscription, durable, count, countDelta, depth, depthDelta, lastAddTimestamp, updateTimestamp); + return new MessageCounterInfo(name, subscription, durable, count, countDelta, depth, depthDelta, lastAddTimestamp, lastAckTimestamp, updateTimestamp); } // Constructors -------------------------------------------------- @@ -73,6 +76,7 @@ public final class MessageCounterInfo { final int depth, final int depthDelta, final String lastAddTimestamp, + final String lastAckTimestamp, final String udpateTimestamp) { this.name = name; this.subscription = subscription; @@ -82,6 +86,7 @@ public final class MessageCounterInfo { this.depth = depth; this.depthDelta = depthDelta; this.lastAddTimestamp = lastAddTimestamp; + this.lastAckTimestamp = lastAckTimestamp; this.updateTimestamp = udpateTimestamp; } @@ -143,6 +148,13 @@ public final class MessageCounterInfo { return lastAddTimestamp; } + /** + * Returns the timestamp of the last time a message from the queue was acknolwedged. + */ + public String getLastAckTimestamp() { + return lastAckTimestamp; + } + /** * Returns the timestamp of the last time the queue was updated. */ diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java index be581823b9..27545a639b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/MessageCounter.java @@ -64,6 +64,8 @@ public class MessageCounter { private long timeLastAdd; + private long timeLastAck; + // per hour day counter history private int dayCounterMax; @@ -71,6 +73,8 @@ public class MessageCounter { private long lastMessagesAdded; + private long lastMessagesAcked; + // Static -------------------------------------------------------- // Constructors -------------------------------------------------- @@ -111,19 +115,28 @@ public class MessageCounter { @Override public void run() { long latestMessagesAdded = serverQueue.getMessagesAdded(); + long latestMessagesAcked = serverQueue.getMessagesAcknowledged(); long newMessagesAdded = latestMessagesAdded - lastMessagesAdded; + long newMessagesAcked = latestMessagesAcked - lastMessagesAcked; countTotal += newMessagesAdded; lastMessagesAdded = latestMessagesAdded; + lastMessagesAcked = latestMessagesAcked; + + long timestamp = System.currentTimeMillis(); if (newMessagesAdded > 0) { - timeLastAdd = System.currentTimeMillis(); + timeLastAdd = timestamp; + } + + if (newMessagesAcked > 0) { + timeLastAck = timestamp; } // update timestamp - timeLastUpdate = System.currentTimeMillis(); + timeLastUpdate = timestamp; // update message history updateHistory(newMessagesAdded); @@ -208,12 +221,17 @@ public class MessageCounter { return timeLastAdd; } + public long getLastAckedMessageTime() { + return timeLastAck; + } + public void resetCounter() { countTotal = 0; countTotalLast = 0; depthLast = 0; timeLastUpdate = 0; timeLastAdd = 0; + timeLastAck = 0; } private void setHistoryLimit(final int daycountmax) { @@ -320,6 +338,7 @@ public class MessageCounter { public String toJSon() { DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM); String lastAddTimestamp = dateFormat.format(new Date(this.getLastAddedMessageTime())); + String lastAckTimestamp = dateFormat.format(new Date(this.getLastAckedMessageTime())); String updateTimestamp = dateFormat.format(new Date(this.getLastUpdate())); return JsonLoader .createObjectBuilder() @@ -331,6 +350,7 @@ public class MessageCounter { .add("messageCount", this.getMessageCount()) .add("messageCountDelta", this.getMessageCountDelta()) .add("lastAddTimestamp", lastAddTimestamp) + .add("lastAckTimestamp", lastAckTimestamp) .add("updateTimestamp", updateTimestamp) .build() .toString(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterHelper.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterHelper.java index c8fe282cea..8230e01d72 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterHelper.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterHelper.java @@ -53,7 +53,7 @@ public class MessageCounterHelper { return null; } - String ret0 = "\n" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "\n"; + String ret0 = "
TypeNameSubscriptionDurableCountCountDeltaDepthDepthDeltaLast AddLast Update
\n" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "" + "\n"; StringBuilder ret = new StringBuilder(ret0); for (int i = 0; i < counters.length; i++) { MessageCounter counter = counters[i]; @@ -77,6 +77,7 @@ public class MessageCounterHelper { ret.append(""); ret.append(""); ret.append(""); + ret.append(""); ret.append(""); ret.append("\n"); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java index 1fd4fbe802..03470f5df4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/messagecounter/impl/MessageCounterManagerImpl.java @@ -102,6 +102,12 @@ public class MessageCounterManagerImpl implements MessageCounterManager { } } + public MessageCounter getMessageCounter(String counter) { + synchronized (messageCounters) { + return messageCounters.get(counter); + } + } + public Set getMessageCounters() { synchronized (messageCounters) { return new HashSet<>(messageCounters.values()); diff --git a/docs/user-manual/en/management.md b/docs/user-manual/en/management.md index fc43438f60..64992772d7 100644 --- a/docs/user-manual/en/management.md +++ b/docs/user-manual/en/management.md @@ -923,6 +923,10 @@ Message counters give additional information about the queues: The timestamp of the last time a message was added to the queue +- `lastAckTimestamp` + + The timestamp of the last time a message from the queue was acknowledged + - `udpateTimestamp` The timestamp of the last message counter update diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java index fae4596175..e38d03959c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlTest.java @@ -2721,7 +2721,7 @@ public class QueueControlTest extends ManagementTestBase { ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer); serverControl.enableMessageCounters(); - serverControl.setMessageCounterSamplePeriod(100); + serverControl.setMessageCounterSamplePeriod(99999); String jsonString = queueControl.listMessageCounter(); MessageCounterInfo info = MessageCounterInfo.fromJSON(jsonString); @@ -2731,34 +2731,44 @@ public class QueueControlTest extends ManagementTestBase { ClientProducer producer = session.createProducer(address); producer.send(session.createMessage(durable)); + Wait.assertTrue(() -> server.locateQueue(queue).getMessageCount() == 1); - Thread.sleep(200); + ((MessageCounterManagerImpl)server.getManagementService().getMessageCounterManager()).getMessageCounter(queue.toString()).onTimer(); + Thread.sleep(50); jsonString = queueControl.listMessageCounter(); info = MessageCounterInfo.fromJSON(jsonString); Assert.assertEquals(1, info.getDepth()); Assert.assertEquals(1, info.getDepthDelta()); Assert.assertEquals(1, info.getCount()); Assert.assertEquals(1, info.getCountDelta()); + Assert.assertEquals(info.getUpdateTimestamp(), info.getLastAddTimestamp()); + Assert.assertEquals("12/31/69, 6:00:00 PM", info.getLastAckTimestamp()); // no acks received yet producer.send(session.createMessage(durable)); + Wait.assertTrue(() -> server.locateQueue(queue).getMessageCount() == 2); - Thread.sleep(200); + ((MessageCounterManagerImpl)server.getManagementService().getMessageCounterManager()).getMessageCounter(queue.toString()).onTimer(); + Thread.sleep(50); jsonString = queueControl.listMessageCounter(); info = MessageCounterInfo.fromJSON(jsonString); Assert.assertEquals(2, info.getDepth()); Assert.assertEquals(1, info.getDepthDelta()); Assert.assertEquals(2, info.getCount()); Assert.assertEquals(1, info.getCountDelta()); + Assert.assertEquals(info.getUpdateTimestamp(), info.getLastAddTimestamp()); + Assert.assertEquals("12/31/69, 6:00:00 PM", info.getLastAckTimestamp()); // no acks received yet consumeMessages(2, session, queue); - Thread.sleep(200); + ((MessageCounterManagerImpl)server.getManagementService().getMessageCounterManager()).getMessageCounter(queue.toString()).onTimer(); + Thread.sleep(50); jsonString = queueControl.listMessageCounter(); info = MessageCounterInfo.fromJSON(jsonString); Assert.assertEquals(0, info.getDepth()); Assert.assertEquals(-2, info.getDepthDelta()); Assert.assertEquals(2, info.getCount()); Assert.assertEquals(0, info.getCountDelta()); + Assert.assertEquals(info.getUpdateTimestamp(), info.getLastAckTimestamp()); session.deleteQueue(queue); }
TypeNameSubscriptionDurableCountCountDeltaDepthDepthDeltaLast AddLast AckLast Update
" + MessageCounterHelper.prettify(counter.getMessageCount()) + "" + MessageCounterHelper.prettify(counter.getMessageCountDelta()) + "" + MessageCounterHelper.asDate(counter.getLastAddedMessageTime()) + "" + MessageCounterHelper.asDate(counter.getLastAckedMessageTime()) + "" + MessageCounterHelper.asDate(counter.getLastUpdate()) + "