From c81740592b29871be1bc9a74915af4ba867c9895 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20St=C3=B8lsvik?= Date: Mon, 24 Jan 2022 23:46:36 +0100 Subject: [PATCH] StatisticsBrokerPlugin: Add feat: request destination firstMessageTimestamp Adding a feature (STATS_FIRST_MESSAGE_TIMESTAMP) to the StatisticsBrokerPlugin's destination-statistics for getting the timestamp of the first message in the destination(s) being requested: If you on the query-message set the property StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP to anything (e.g. boolean true), a long value "firstMessageTimestamp" will be added to the statistics reply message(s). Since the reply message has JMSTimestamp set, which is the broker's now-timestamp, you may also on the query side calculate the age of the first message in milliseconds. The key name was chosen since that is the name of the corresponding feature in Artemis. This extension of the existing feature is implemented to be as non-intrusive as possible, adding very little runtime cost if not requested. It also seems like the runtime cost for enabling this feature, thus finding and adding the firstMessageTimestamp, is small. While at it, also slightly improving an existing feature (STATS_DENOTE_END_LIST) where a reply to a destination query can be "null terminated": After sending the relevant replies, the StatisticsBroker also sends an empty message. This feature is relevant if the query is a wildcard query, thus returning multiple messages: The empty message denotes the end of the replies. However, to activate this feature, a somewhat complicated query destination had to be constructed. Adopting the solution for the other StatisticsBroker feature where you may reset the broker statistics by adding a property to the query message, this null-termination feature now /also/ checks for the presence of this query modifier STATS_DENOTE_END_LIST as a property. (This property based solution was thus also adopted for the present 'firstMessageTimestamp' solution, as it was found much more intuitive). Added tests for both the STATS_FIRST_MESSAGE_TIMESTAMP query modifier, and the improved STATS_DENOTE_END_LIST property-based query modifier. Had to make the Topic.doBrowse(List browseList, int max) public - the corresponding method for Queue was already public. Made the evaluation of whether this is a StatisticsBroker-relevant message a microscopic bit more performant (exiting faster if not relevant): To the initial test of whether the message is relevant, which only checked for replyTo being set, a check for 'destination. startsWith("ActiveMQ.Statistics")' was added. Only if so, the rest of the evaluations kick in. Also using 'string.startsWith(..)' instead of the verbose 'string.regionMatches(..)'. Removed an unused import on PartitionBrokerTest.java, as IntelliJ complained about not finding it. (cherry picked from commit 9167a79b79e4c121cfe0a5b82456f52bf3ecc3c7) --- .../apache/activemq/broker/region/Topic.java | 2 +- .../activemq/plugin/StatisticsBroker.java | 46 +++++++++--- .../partition/PartitionBrokerTest.java | 1 - .../plugin/BrokerStatisticsPluginTest.java | 71 ++++++++++++++++++- 4 files changed, 107 insertions(+), 13 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index c1078e39ba..834cd1425b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -654,7 +654,7 @@ public class Topic extends BaseDestination implements Task { return result.toArray(new Message[result.size()]); } - private void doBrowse(final List browseList, final int max) { + public void doBrowse(final List browseList, final int max) { try { if (topicStore != null) { final List toExpire = new ArrayList(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java index 7476c3ec9e..92bb14cbd5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java @@ -18,6 +18,8 @@ package org.apache.activemq.plugin; import java.io.File; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.jms.JMSException; @@ -33,7 +35,9 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.Message; @@ -54,11 +58,17 @@ import org.slf4j.LoggerFactory; */ public class StatisticsBroker extends BrokerFilter { private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class); + static final String STATS_PREFIX = "ActiveMQ.Statistics"; + static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset"; static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription"; - static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null"; + + // Query-message properties controlling features of Destination-query replies: + static final String STATS_DENOTE_END_LIST = "ActiveMQ.Statistics.Destination.List.End.With.Null"; + static final String STATS_FIRST_MESSAGE_TIMESTAMP = "ActiveMQ.Statistics.Destination.Include.First.Message.Timestamp"; + private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); protected final ProducerId advisoryProducerId = new ProducerId(); @@ -85,26 +95,27 @@ public class StatisticsBroker extends BrokerFilter { public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { ActiveMQDestination msgDest = messageSend.getDestination(); ActiveMQDestination replyTo = messageSend.getReplyTo(); - if (replyTo != null) { + if ((replyTo != null) && (msgDest.getPhysicalName().startsWith(STATS_PREFIX))) { String physicalName = msgDest.getPhysicalName(); - boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0, - STATS_DESTINATION_PREFIX.length()); - boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX - .length()); - boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX - .length()); + boolean destStats = physicalName.startsWith(STATS_DESTINATION_PREFIX); + boolean brokerStats = physicalName.startsWith(STATS_BROKER_PREFIX); + boolean subStats = physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX); BrokerService brokerService = getBrokerService(); RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); if (destStats) { - String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); + String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length()); if (destinationName.startsWith(".")) { destinationName = destinationName.substring(1); } String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,""); - boolean endListMessage = !destinationName.equals(destinationQuery); + boolean endListMessage = !destinationName.equals(destinationQuery) + || messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST); ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType()); Set destinations = getDestinations(queryDestination); + boolean includeFirstMessageTimestamp = messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP); + List tempFirstMessage = includeFirstMessageTimestamp ? new ArrayList<>(1) : null; + for (Destination dest : destinations) { DestinationStatistics stats = dest.getDestinationStatistics(); if (stats != null) { @@ -129,6 +140,21 @@ public class StatisticsBroker extends BrokerFilter { statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); statsMessage.setLong("producerCount", stats.getProducers().getCount()); + if (includeFirstMessageTimestamp) { + if (dest instanceof Queue) { + ((Queue) dest).doBrowse(tempFirstMessage, 1); + } + else if (dest instanceof Topic) { + ((Topic) dest).doBrowse(tempFirstMessage, 1); + } + if (!tempFirstMessage.isEmpty()) { + Message message = tempFirstMessage.get(0); + // NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp() + // This allows for calculating age. + statsMessage.setLong("firstMessageTimestamp", message.getBrokerInTime()); + tempFirstMessage.clear(); + } + } statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); } diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java index cc406d8cb1..1b49f0b008 100644 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java +++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.partition; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java index b003a161e7..9a8a5d263c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.plugin; import java.net.URI; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MapMessage; @@ -153,6 +152,76 @@ public class BrokerStatisticsPluginTest extends TestCase{ */ } + public void testDestinationStatsWithNullTermination() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Queue testQueue = session.createQueue("Test.Queue"); + MessageProducer producer = session.createProducer(null); + Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName()); + Message msg = session.createMessage(); + // Instruct to terminate query reply with a null-message + msg.setBooleanProperty(StatisticsBroker.STATS_DENOTE_END_LIST, true); + + producer.send(testQueue, msg); + + msg.setJMSReplyTo(replyTo); + producer.send(query, msg); + MapMessage reply = (MapMessage) consumer.receive(10 * 1000); + assertNotNull(reply); + assertTrue(reply.getMapNames().hasMoreElements()); + assertEquals(1, reply.getLong("size")); + assertTrue(reply.getJMSTimestamp() > 0); + assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority()); + + /* + for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) { + String name = e.nextElement().toString(); + System.err.println(name+"="+reply.getObject(name)); + } + */ + + // Assert that we got a null-termination + MapMessage nullReply = (MapMessage) consumer.receive(10 * 1000); + assertNotNull(nullReply); + // No props in null-message + assertFalse(nullReply.getMapNames().hasMoreElements()); + assertTrue(nullReply.getJMSTimestamp() > 0); + assertEquals(Message.DEFAULT_PRIORITY, nullReply.getJMSPriority()); + } + + public void testDestinationStatsWithFirstMessageTimestamp() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Queue testQueue = session.createQueue("Test.Queue"); + MessageProducer producer = session.createProducer(null); + Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName()); + Message msg = session.createMessage(); + // Instruct to include timestamp of first message in the queue + msg.setBooleanProperty(StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP, true); + + producer.send(testQueue, msg); + + msg.setJMSReplyTo(replyTo); + producer.send(query, msg); + MapMessage reply = (MapMessage) consumer.receive(10 * 1000); + assertNotNull(reply); + assertTrue(reply.getMapNames().hasMoreElements()); + assertEquals(1, reply.getLong("size")); + assertTrue(reply.getJMSTimestamp() > 0); + // Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp" + assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp")); + assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority()); + + /* + for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) { + String name = e.nextElement().toString(); + System.err.println(name+"="+reply.getObject(name)); + } + */ + } + @SuppressWarnings("unused") public void testSubscriptionStats() throws Exception{ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);