From 4d40023968fc334982f15e9c9623b1d74d308931 Mon Sep 17 00:00:00 2001 From: Grzegorz Kochanski Date: Thu, 14 Mar 2024 09:32:40 +0100 Subject: [PATCH] AMQ-9452: unwrap BaseDestination to access queue/topic message Change-Id: Ic05002ecb428e2aa5abeb9dc3e499e3aae550051 --- .../activemq/plugin/StatisticsBroker.java | 5 +++++ .../plugin/BrokerStatisticsPluginTest.java | 19 +++++++++++++++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java index 3429da4a7c..1d9af16f65 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java @@ -34,6 +34,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.RegionBroker; @@ -141,6 +142,10 @@ public class StatisticsBroker extends BrokerFilter { statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); statsMessage.setLong("producerCount", stats.getProducers().getCount()); if (includeFirstMessageTimestamp) { + //AMQ-9452: unwrap BaseDestination + while (dest instanceof DestinationFilter) { + dest = ((DestinationFilter) dest).getNext(); + } if (dest instanceof Queue) { ((Queue) dest).doBrowse(tempFirstMessage, 1); } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java index 9403a91bda..c20d6b8fd7 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java @@ -17,6 +17,9 @@ package org.apache.activemq.plugin; import java.net.URI; +import java.util.Set; +import java.util.stream.Collectors; + import jakarta.jms.Connection; import jakarta.jms.ConnectionFactory; import jakarta.jms.MapMessage; @@ -31,7 +34,11 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.command.ActiveMQDestination; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,6 +217,7 @@ public class BrokerStatisticsPluginTest extends TestCase{ assertTrue(reply.getMapNames().hasMoreElements()); assertEquals(1, reply.getLong("size")); assertTrue(reply.getJMSTimestamp() > 0); + assertTrue(reply.getLong("firstMessageTimestamp") > 0); // Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp" assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp")); assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority()); @@ -269,8 +277,15 @@ public class BrokerStatisticsPluginTest extends TestCase{ protected BrokerService createBroker() throws Exception { BrokerService answer = new BrokerService(); - BrokerPlugin[] plugins = new BrokerPlugin[1]; - plugins[0] = new StatisticsBrokerPlugin(); + BrokerPlugin[] plugins = new BrokerPlugin[2]; + //AMQ-9452: proxy destinations with DestinationFilter + plugins[0] = new BrokerPluginSupport() { + @Override + public Set getDestinations(ActiveMQDestination destination) { + return super.getDestinations(destination).stream().map(DestinationFilter::new).collect(Collectors.toSet()); + } + }; + plugins[1] = new StatisticsBrokerPlugin(); answer.setPlugins(plugins); answer.setDeleteAllMessagesOnStartup(true); answer.addConnector("tcp://localhost:0");