Merge pull request #749 from stolsvik/main

StatisticsBrokerPlugin: Add feature: request destination firstMessageTimestamp
This commit is contained in:
Jean-Baptiste Onofré 2022-01-30 16:06:34 +01:00 committed by GitHub
commit bd7f391a42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 13 deletions

View File

@ -654,7 +654,7 @@ public class Topic extends BaseDestination implements Task {
return result.toArray(new Message[result.size()]); return result.toArray(new Message[result.size()]);
} }
private void doBrowse(final List<Message> browseList, final int max) { public void doBrowse(final List<Message> browseList, final int max) {
try { try {
if (topicStore != null) { if (topicStore != null) {
final List<Message> toExpire = new ArrayList<Message>(); final List<Message> toExpire = new ArrayList<Message>();

View File

@ -18,6 +18,8 @@ package org.apache.activemq.plugin;
import java.io.File; import java.io.File;
import java.net.URI; import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Set; import java.util.Set;
import javax.jms.JMSException; import javax.jms.JMSException;
@ -33,7 +35,9 @@ import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage; import org.apache.activemq.command.ActiveMQMapMessage;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
@ -54,11 +58,17 @@ import org.slf4j.LoggerFactory;
*/ */
public class StatisticsBroker extends BrokerFilter { public class StatisticsBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class); private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_PREFIX = "ActiveMQ.Statistics";
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination"; static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker"; static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset"; static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription"; static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
static final String STATS_DENOTE_END_LIST = STATS_DESTINATION_PREFIX + ".List.End.With.Null";
// Query-message properties controlling features of Destination-query replies:
static final String STATS_DENOTE_END_LIST = "ActiveMQ.Statistics.Destination.List.End.With.Null";
static final String STATS_FIRST_MESSAGE_TIMESTAMP = "ActiveMQ.Statistics.Destination.Include.First.Message.Timestamp";
private static final IdGenerator ID_GENERATOR = new IdGenerator(); private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator(); private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
protected final ProducerId advisoryProducerId = new ProducerId(); protected final ProducerId advisoryProducerId = new ProducerId();
@ -85,26 +95,27 @@ public class StatisticsBroker extends BrokerFilter {
public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception { public void send(ProducerBrokerExchange producerExchange, Message messageSend) throws Exception {
ActiveMQDestination msgDest = messageSend.getDestination(); ActiveMQDestination msgDest = messageSend.getDestination();
ActiveMQDestination replyTo = messageSend.getReplyTo(); ActiveMQDestination replyTo = messageSend.getReplyTo();
if (replyTo != null) { if ((replyTo != null) && (msgDest.getPhysicalName().startsWith(STATS_PREFIX))) {
String physicalName = msgDest.getPhysicalName(); String physicalName = msgDest.getPhysicalName();
boolean destStats = physicalName.regionMatches(true, 0, STATS_DESTINATION_PREFIX, 0, boolean destStats = physicalName.startsWith(STATS_DESTINATION_PREFIX);
STATS_DESTINATION_PREFIX.length()); boolean brokerStats = physicalName.startsWith(STATS_BROKER_PREFIX);
boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX boolean subStats = physicalName.startsWith(STATS_SUBSCRIPTION_PREFIX);
.length());
boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
.length());
BrokerService brokerService = getBrokerService(); BrokerService brokerService = getBrokerService();
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker(); RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
if (destStats) { if (destStats) {
String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length()); String destinationName = physicalName.substring(STATS_DESTINATION_PREFIX.length());
if (destinationName.startsWith(".")) { if (destinationName.startsWith(".")) {
destinationName = destinationName.substring(1); destinationName = destinationName.substring(1);
} }
String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,""); String destinationQuery = destinationName.replace(STATS_DENOTE_END_LIST,"");
boolean endListMessage = !destinationName.equals(destinationQuery); boolean endListMessage = !destinationName.equals(destinationQuery)
|| messageSend.getProperties().containsKey(STATS_DENOTE_END_LIST);
ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType()); ActiveMQDestination queryDestination = ActiveMQDestination.createDestination(destinationQuery,msgDest.getDestinationType());
Set<Destination> destinations = getDestinations(queryDestination); Set<Destination> destinations = getDestinations(queryDestination);
boolean includeFirstMessageTimestamp = messageSend.getProperties().containsKey(STATS_FIRST_MESSAGE_TIMESTAMP);
List<Message> tempFirstMessage = includeFirstMessageTimestamp ? new ArrayList<>(1) : null;
for (Destination dest : destinations) { for (Destination dest : destinations) {
DestinationStatistics stats = dest.getDestinationStatistics(); DestinationStatistics stats = dest.getDestinationStatistics();
if (stats != null) { if (stats != null) {
@ -129,6 +140,21 @@ public class StatisticsBroker extends BrokerFilter {
statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime()); statsMessage.setDouble("minEnqueueTime", stats.getProcessTime().getMinTime());
statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
statsMessage.setLong("producerCount", stats.getProducers().getCount()); statsMessage.setLong("producerCount", stats.getProducers().getCount());
if (includeFirstMessageTimestamp) {
if (dest instanceof Queue) {
((Queue) dest).doBrowse(tempFirstMessage, 1);
}
else if (dest instanceof Topic) {
((Topic) dest).doBrowse(tempFirstMessage, 1);
}
if (!tempFirstMessage.isEmpty()) {
Message message = tempFirstMessage.get(0);
// NOTICE: Client-side, you may get the broker "now" Timestamp by msg.getJMSTimestamp()
// This allows for calculating age.
statsMessage.setLong("firstMessageTimestamp", message.getBrokerInTime());
tempFirstMessage.clear();
}
}
statsMessage.setJMSCorrelationID(messageSend.getCorrelationId()); statsMessage.setJMSCorrelationID(messageSend.getCorrelationId());
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo); sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
} }

View File

@ -17,7 +17,6 @@
package org.apache.activemq.partition; package org.apache.activemq.partition;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.TransportConnector;

View File

@ -17,7 +17,6 @@
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import java.net.URI; import java.net.URI;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.MapMessage; import javax.jms.MapMessage;
@ -153,6 +152,76 @@ public class BrokerStatisticsPluginTest extends TestCase{
*/ */
} }
public void testDestinationStatsWithNullTermination() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("Test.Queue");
MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName());
Message msg = session.createMessage();
// Instruct to terminate query reply with a null-message
msg.setBooleanProperty(StatisticsBroker.STATS_DENOTE_END_LIST, true);
producer.send(testQueue, msg);
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
/*
for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}
*/
// Assert that we got a null-termination
MapMessage nullReply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(nullReply);
// No props in null-message
assertFalse(nullReply.getMapNames().hasMoreElements());
assertTrue(nullReply.getJMSTimestamp() > 0);
assertEquals(Message.DEFAULT_PRIORITY, nullReply.getJMSPriority());
}
public void testDestinationStatsWithFirstMessageTimestamp() throws Exception {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
MessageConsumer consumer = session.createConsumer(replyTo);
Queue testQueue = session.createQueue("Test.Queue");
MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + "." + testQueue.getQueueName());
Message msg = session.createMessage();
// Instruct to include timestamp of first message in the queue
msg.setBooleanProperty(StatisticsBroker.STATS_FIRST_MESSAGE_TIMESTAMP, true);
producer.send(testQueue, msg);
msg.setJMSReplyTo(replyTo);
producer.send(query, msg);
MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
assertNotNull(reply);
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
// Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp"
assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp"));
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
/*
for (Enumeration e = reply.getMapNames(); e.hasMoreElements();) {
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}
*/
}
@SuppressWarnings("unused") @SuppressWarnings("unused")
public void testSubscriptionStats() throws Exception{ public void testSubscriptionStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);