ARTEMIS-2906 add lastAckTimestamp to message counter

This commit is contained in:
Justin Bertram 2020-09-18 09:34:16 -05:00 committed by Clebert Suconic
parent c5bb2cccf4
commit 0f60b5a8e4
6 changed files with 61 additions and 8 deletions

View File

@ -42,6 +42,8 @@ public final class MessageCounterInfo {
private final String lastAddTimestamp;
private final String lastAckTimestamp;
private final String updateTimestamp;
/**
@ -58,9 +60,10 @@ public final class MessageCounterInfo {
int depth = data.getInt("messageCount");
int depthDelta = data.getInt("messageCountDelta");
String lastAddTimestamp = data.getString("lastAddTimestamp");
String lastAckTimestamp = data.getString("lastAckTimestamp");
String updateTimestamp = data.getString("updateTimestamp");
return new MessageCounterInfo(name, subscription, durable, count, countDelta, depth, depthDelta, lastAddTimestamp, updateTimestamp);
return new MessageCounterInfo(name, subscription, durable, count, countDelta, depth, depthDelta, lastAddTimestamp, lastAckTimestamp, updateTimestamp);
}
// Constructors --------------------------------------------------
@ -73,6 +76,7 @@ public final class MessageCounterInfo {
final int depth,
final int depthDelta,
final String lastAddTimestamp,
final String lastAckTimestamp,
final String udpateTimestamp) {
this.name = name;
this.subscription = subscription;
@ -82,6 +86,7 @@ public final class MessageCounterInfo {
this.depth = depth;
this.depthDelta = depthDelta;
this.lastAddTimestamp = lastAddTimestamp;
this.lastAckTimestamp = lastAckTimestamp;
this.updateTimestamp = udpateTimestamp;
}
@ -143,6 +148,13 @@ public final class MessageCounterInfo {
return lastAddTimestamp;
}
/**
* Returns the timestamp of the last time a message from the queue was acknolwedged.
*/
public String getLastAckTimestamp() {
return lastAckTimestamp;
}
/**
* Returns the timestamp of the last time the queue was updated.
*/

View File

@ -64,6 +64,8 @@ public class MessageCounter {
private long timeLastAdd;
private long timeLastAck;
// per hour day counter history
private int dayCounterMax;
@ -71,6 +73,8 @@ public class MessageCounter {
private long lastMessagesAdded;
private long lastMessagesAcked;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -111,19 +115,28 @@ public class MessageCounter {
@Override
public void run() {
long latestMessagesAdded = serverQueue.getMessagesAdded();
long latestMessagesAcked = serverQueue.getMessagesAcknowledged();
long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
long newMessagesAcked = latestMessagesAcked - lastMessagesAcked;
countTotal += newMessagesAdded;
lastMessagesAdded = latestMessagesAdded;
lastMessagesAcked = latestMessagesAcked;
long timestamp = System.currentTimeMillis();
if (newMessagesAdded > 0) {
timeLastAdd = System.currentTimeMillis();
timeLastAdd = timestamp;
}
if (newMessagesAcked > 0) {
timeLastAck = timestamp;
}
// update timestamp
timeLastUpdate = System.currentTimeMillis();
timeLastUpdate = timestamp;
// update message history
updateHistory(newMessagesAdded);
@ -208,12 +221,17 @@ public class MessageCounter {
return timeLastAdd;
}
public long getLastAckedMessageTime() {
return timeLastAck;
}
public void resetCounter() {
countTotal = 0;
countTotalLast = 0;
depthLast = 0;
timeLastUpdate = 0;
timeLastAdd = 0;
timeLastAck = 0;
}
private void setHistoryLimit(final int daycountmax) {
@ -320,6 +338,7 @@ public class MessageCounter {
public String toJSon() {
DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
String lastAddTimestamp = dateFormat.format(new Date(this.getLastAddedMessageTime()));
String lastAckTimestamp = dateFormat.format(new Date(this.getLastAckedMessageTime()));
String updateTimestamp = dateFormat.format(new Date(this.getLastUpdate()));
return JsonLoader
.createObjectBuilder()
@ -331,6 +350,7 @@ public class MessageCounter {
.add("messageCount", this.getMessageCount())
.add("messageCountDelta", this.getMessageCountDelta())
.add("lastAddTimestamp", lastAddTimestamp)
.add("lastAckTimestamp", lastAckTimestamp)
.add("updateTimestamp", updateTimestamp)
.build()
.toString();

View File

@ -53,7 +53,7 @@ public class MessageCounterHelper {
return null;
}
String ret0 = "<table class=\"activemq-message-counter\">\n" + "<tr>" + "<th>Type</th>" + "<th>Name</th>" + "<th>Subscription</th>" + "<th>Durable</th>" + "<th>Count</th>" + "<th>CountDelta</th>" + "<th>Depth</th>" + "<th>DepthDelta</th>" + "<th>Last Add</th>" + "<th>Last Update</th>" + "</tr>\n";
String ret0 = "<table class=\"activemq-message-counter\">\n" + "<tr>" + "<th>Type</th>" + "<th>Name</th>" + "<th>Subscription</th>" + "<th>Durable</th>" + "<th>Count</th>" + "<th>CountDelta</th>" + "<th>Depth</th>" + "<th>DepthDelta</th>" + "<th>Last Add</th>" + "<th>Last Ack</th>" + "<th>Last Update</th>" + "</tr>\n";
StringBuilder ret = new StringBuilder(ret0);
for (int i = 0; i < counters.length; i++) {
MessageCounter counter = counters[i];
@ -77,6 +77,7 @@ public class MessageCounterHelper {
ret.append("<td>" + MessageCounterHelper.prettify(counter.getMessageCount()) + "</td>");
ret.append("<td>" + MessageCounterHelper.prettify(counter.getMessageCountDelta()) + "</td>");
ret.append("<td>" + MessageCounterHelper.asDate(counter.getLastAddedMessageTime()) + "</td>");
ret.append("<td>" + MessageCounterHelper.asDate(counter.getLastAckedMessageTime()) + "</td>");
ret.append("<td>" + MessageCounterHelper.asDate(counter.getLastUpdate()) + "</td>");
ret.append("</tr>\n");

View File

@ -102,6 +102,12 @@ public class MessageCounterManagerImpl implements MessageCounterManager {
}
}
public MessageCounter getMessageCounter(String counter) {
synchronized (messageCounters) {
return messageCounters.get(counter);
}
}
public Set<MessageCounter> getMessageCounters() {
synchronized (messageCounters) {
return new HashSet<>(messageCounters.values());

View File

@ -923,6 +923,10 @@ Message counters give additional information about the queues:
The timestamp of the last time a message was added to the queue
- `lastAckTimestamp`
The timestamp of the last time a message from the queue was acknowledged
- `udpateTimestamp`
The timestamp of the last message counter update

View File

@ -2721,7 +2721,7 @@ public class QueueControlTest extends ManagementTestBase {
ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(100);
serverControl.setMessageCounterSamplePeriod(99999);
String jsonString = queueControl.listMessageCounter();
MessageCounterInfo info = MessageCounterInfo.fromJSON(jsonString);
@ -2731,34 +2731,44 @@ public class QueueControlTest extends ManagementTestBase {
ClientProducer producer = session.createProducer(address);
producer.send(session.createMessage(durable));
Wait.assertTrue(() -> server.locateQueue(queue).getMessageCount() == 1);
Thread.sleep(200);
((MessageCounterManagerImpl)server.getManagementService().getMessageCounterManager()).getMessageCounter(queue.toString()).onTimer();
Thread.sleep(50);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
Assert.assertEquals(1, info.getDepth());
Assert.assertEquals(1, info.getDepthDelta());
Assert.assertEquals(1, info.getCount());
Assert.assertEquals(1, info.getCountDelta());
Assert.assertEquals(info.getUpdateTimestamp(), info.getLastAddTimestamp());
Assert.assertEquals("12/31/69, 6:00:00 PM", info.getLastAckTimestamp()); // no acks received yet
producer.send(session.createMessage(durable));
Wait.assertTrue(() -> server.locateQueue(queue).getMessageCount() == 2);
Thread.sleep(200);
((MessageCounterManagerImpl)server.getManagementService().getMessageCounterManager()).getMessageCounter(queue.toString()).onTimer();
Thread.sleep(50);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
Assert.assertEquals(2, info.getDepth());
Assert.assertEquals(1, info.getDepthDelta());
Assert.assertEquals(2, info.getCount());
Assert.assertEquals(1, info.getCountDelta());
Assert.assertEquals(info.getUpdateTimestamp(), info.getLastAddTimestamp());
Assert.assertEquals("12/31/69, 6:00:00 PM", info.getLastAckTimestamp()); // no acks received yet
consumeMessages(2, session, queue);
Thread.sleep(200);
((MessageCounterManagerImpl)server.getManagementService().getMessageCounterManager()).getMessageCounter(queue.toString()).onTimer();
Thread.sleep(50);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
Assert.assertEquals(0, info.getDepth());
Assert.assertEquals(-2, info.getDepthDelta());
Assert.assertEquals(2, info.getCount());
Assert.assertEquals(0, info.getCountDelta());
Assert.assertEquals(info.getUpdateTimestamp(), info.getLastAckTimestamp());
session.deleteQueue(queue);
}