diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index c1078e39ba..834cd1425b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -654,7 +654,7 @@ public class Topic extends BaseDestination implements Task { return result.toArray(new Message[result.size()]); } - private void doBrowse(final List browseList, final int max) { + public void doBrowse(final List browseList, final int max) { try { if (topicStore != null) { final List toExpire = new ArrayList(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java index 7476c3ec9e..92bb14cbd5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java @@ -18,6 +18,8 @@ package org.apache.activemq.plugin; import java.io.File; import java.net.URI; +import java.util.ArrayList; +import java.util.List; import java.util.Set; import javax.jms.JMSException; @@ -33,7 +35,9 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.Message; @@ -54,11 +58,17 @@ import org.slf4j.LoggerFactory; */ public class StatisticsBroker extends BrokerFilter { private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class); + static final String STATS_PREFIX = "ActiveMQ.Statistics"; + static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset"; static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription"; - static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null"; + + // Query-message properties controlling features of Destination-query replies: + static final String STATS_DENOTE_END_LIST = "ActiveMQ.Statistics.Destination.List.End.With.Null"; + static final String STATS_FIRST_MESSAGE_TIMESTAMP = "ActiveMQ.Statistics.Destination.Include.First.Message.Timestamp"; + private static final IdGenerator ID_GENERATOR = new IdGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); protected final ProducerId advisoryProducerId = new ProducerId(); @@ -85,26 +95,27 @@ public class StatisticsBroker extends BrokerFilter { public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { ActiveMQDestination msgDest = messageSend.getDestination(); ActiveMQDestination replyTo = messageSend.getReplyTo(); - if (replyTo != null) { + if ((replyTo != null) && (msgDest.getPhysicalName().startsWith(STATS_PREFIX))) { String physicalName = msgDest.getPhysicalName(); - boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0, - STATS_DESTINATION_PREFIX.length()); - boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX - .length()); - boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX - .length()); + boolean destStats = physicalName.startsWith(STATS_DESTINATION_PREFIX); + boolean brokerStats = physicalName.startsWith(STATS_BROKER_PREFIX); + boolean subStats = physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX); BrokerService brokerService = getBrokerService(); RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); if (destStats) { - String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); + String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length()); if (destinationName.startsWith(".")) { destinationName = destinationName.substring(1); } String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,""); - boolean endListMessage = !destinationName.equals(destinationQuery); + boolean endListMessage = !destinationName.equals(destinationQuery) + || messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST); ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType()); Set destinations = getDestinations(queryDestination); + boolean includeFirstMessageTimestamp = messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP); + List tempFirstMessage = includeFirstMessageTimestamp ? new ArrayList<>(1) : null; + for (Destination dest : destinations) { DestinationStatistics stats = dest.getDestinationStatistics(); if (stats != null) { @@ -129,6 +140,21 @@ public class StatisticsBroker extends BrokerFilter { statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); statsMessage.setLong("producerCount", stats.getProducers().getCount()); + if (includeFirstMessageTimestamp) { + if (dest instanceof Queue) { + ((Queue) dest).doBrowse(tempFirstMessage, 1); + } + else if (dest instanceof Topic) { + ((Topic) dest).doBrowse(tempFirstMessage, 1); + } + if (!tempFirstMessage.isEmpty()) { + Message message = tempFirstMessage.get(0); + // NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp() + // This allows for calculating age. + statsMessage.setLong("firstMessageTimestamp", message.getBrokerInTime()); + tempFirstMessage.clear(); + } + } statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); } diff --git a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java index cc406d8cb1..1b49f0b008 100644 --- a/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java +++ b/activemq-partition/src/test/java/org/apache/activemq/partition/PartitionBrokerTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.partition; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java index b003a161e7..9a8a5d263c 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java @@ -17,7 +17,6 @@ package org.apache.activemq.plugin; import java.net.URI; - import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.MapMessage; @@ -153,6 +152,76 @@ public class BrokerStatisticsPluginTest extends TestCase{ */ } + public void testDestinationStatsWithNullTermination() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Queue testQueue = session.createQueue("Test.Queue"); + MessageProducer producer = session.createProducer(null); + Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName()); + Message msg = session.createMessage(); + // Instruct to terminate query reply with a null-message + msg.setBooleanProperty(StatisticsBroker.STATS_DENOTE_END_LIST, true); + + producer.send(testQueue, msg); + + msg.setJMSReplyTo(replyTo); + producer.send(query, msg); + MapMessage reply = (MapMessage) consumer.receive(10 * 1000); + assertNotNull(reply); + assertTrue(reply.getMapNames().hasMoreElements()); + assertEquals(1, reply.getLong("size")); + assertTrue(reply.getJMSTimestamp() > 0); + assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority()); + + /* + for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) { + String name = e.nextElement().toString(); + System.err.println(name+"="+reply.getObject(name)); + } + */ + + // Assert that we got a null-termination + MapMessage nullReply = (MapMessage) consumer.receive(10 * 1000); + assertNotNull(nullReply); + // No props in null-message + assertFalse(nullReply.getMapNames().hasMoreElements()); + assertTrue(nullReply.getJMSTimestamp() > 0); + assertEquals(Message.DEFAULT_PRIORITY, nullReply.getJMSPriority()); + } + + public void testDestinationStatsWithFirstMessageTimestamp() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue replyTo = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(replyTo); + Queue testQueue = session.createQueue("Test.Queue"); + MessageProducer producer = session.createProducer(null); + Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName()); + Message msg = session.createMessage(); + // Instruct to include timestamp of first message in the queue + msg.setBooleanProperty(StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP, true); + + producer.send(testQueue, msg); + + msg.setJMSReplyTo(replyTo); + producer.send(query, msg); + MapMessage reply = (MapMessage) consumer.receive(10 * 1000); + assertNotNull(reply); + assertTrue(reply.getMapNames().hasMoreElements()); + assertEquals(1, reply.getLong("size")); + assertTrue(reply.getJMSTimestamp() > 0); + // Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp" + assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp")); + assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority()); + + /* + for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) { + String name = e.nextElement().toString(); + System.err.println(name+"="+reply.getObject(name)); + } + */ + } + @SuppressWarnings("unused") public void testSubscriptionStats() throws Exception{ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);