AMQ-9452: unwrap BaseDestination to access queue/topic message

Change-Id: Ic05002ecb428e2aa5abeb9dc3e499e3aae550051

(cherry picked from commit 4d40023968)
This commit is contained in:
Grzegorz Kochanski 2024-03-14 09:32:40 +01:00 committed by JB Onofré
parent 781a23d145
commit 8beea0faca
2 changed files with 22 additions and 2 deletions

View File

@ -34,6 +34,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean; import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
@ -141,6 +142,10 @@ public class StatisticsBroker extends BrokerFilter {
statsMessage.setLong("consumerCount", stats.getConsumers().getCount()); statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
statsMessage.setLong("producerCount", stats.getProducers().getCount()); statsMessage.setLong("producerCount", stats.getProducers().getCount());
if (includeFirstMessageTimestamp) { if (includeFirstMessageTimestamp) {
//AMQ-9452: unwrap BaseDestination
while (dest instanceof DestinationFilter) {
dest = ((DestinationFilter) dest).getNext();
}
if (dest instanceof Queue) { if (dest instanceof Queue) {
((Queue) dest).doBrowse(tempFirstMessage, 1); ((Queue) dest).doBrowse(tempFirstMessage, 1);
} }

View File

@ -17,6 +17,8 @@
package org.apache.activemq.plugin; package org.apache.activemq.plugin;
import java.net.URI; import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
import javax.jms.MapMessage; import javax.jms.MapMessage;
@ -26,12 +28,17 @@ import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory; import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin; import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -210,6 +217,7 @@ public class BrokerStatisticsPluginTest extends TestCase{
assertTrue(reply.getMapNames().hasMoreElements()); assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size")); assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0); assertTrue(reply.getJMSTimestamp() > 0);
assertTrue(reply.getLong("firstMessageTimestamp") > 0);
// Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp" // Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp"
assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp")); assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp"));
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority()); assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
@ -269,8 +277,15 @@ public class BrokerStatisticsPluginTest extends TestCase{
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService(); BrokerService answer = new BrokerService();
BrokerPlugin[] plugins = new BrokerPlugin[1]; BrokerPlugin[] plugins = new BrokerPlugin[2];
plugins[0] = new StatisticsBrokerPlugin(); //AMQ-9452: proxy destinations with DestinationFilter
plugins[0] = new BrokerPluginSupport() {
@Override
public Set<Destination> getDestinations(ActiveMQDestination destination) {
return super.getDestinations(destination).stream().map(DestinationFilter::new).collect(Collectors.toSet());
}
};
plugins[1] = new StatisticsBrokerPlugin();
answer.setPlugins(plugins); answer.setPlugins(plugins);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector("tcp://localhost:0"); answer.addConnector("tcp://localhost:0");