AMQ-9452: unwrap BaseDestination to access queue/topic message

Change-Id: Ic05002ecb428e2aa5abeb9dc3e499e3aae550051
This commit is contained in:
Grzegorz Kochanski 2024-03-14 09:32:40 +01:00
parent 72bbb0c521
commit 4d40023968
2 changed files with 22 additions and 2 deletions

View File

@ -34,6 +34,7 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.RegionBroker;
@ -141,6 +142,10 @@ public class StatisticsBroker extends BrokerFilter {
statsMessage.setLong("consumerCount", stats.getConsumers().getCount());
statsMessage.setLong("producerCount", stats.getProducers().getCount());
if (includeFirstMessageTimestamp) {
//AMQ-9452: unwrap BaseDestination
while (dest instanceof DestinationFilter) {
dest = ((DestinationFilter) dest).getNext();
}
if (dest instanceof Queue) {
((Queue) dest).doBrowse(tempFirstMessage, 1);
}

View File

@ -17,6 +17,9 @@
package org.apache.activemq.plugin;
import java.net.URI;
import java.util.Set;
import java.util.stream.Collectors;
import jakarta.jms.Connection;
import jakarta.jms.ConnectionFactory;
import jakarta.jms.MapMessage;
@ -31,7 +34,11 @@ import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerPluginSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.command.ActiveMQDestination;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -210,6 +217,7 @@ public class BrokerStatisticsPluginTest extends TestCase{
assertTrue(reply.getMapNames().hasMoreElements());
assertEquals(1, reply.getLong("size"));
assertTrue(reply.getJMSTimestamp() > 0);
assertTrue(reply.getLong("firstMessageTimestamp") > 0);
// Assert that we got the brokerInTime for the first message in queue as value of key "firstMessageTimestamp"
assertTrue(System.currentTimeMillis() >= reply.getLong("firstMessageTimestamp"));
assertEquals(Message.DEFAULT_PRIORITY, reply.getJMSPriority());
@ -269,8 +277,15 @@ public class BrokerStatisticsPluginTest extends TestCase{
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
BrokerPlugin[] plugins = new BrokerPlugin[1];
plugins[0] = new StatisticsBrokerPlugin();
BrokerPlugin[] plugins = new BrokerPlugin[2];
//AMQ-9452: proxy destinations with DestinationFilter
plugins[0] = new BrokerPluginSupport() {
@Override
public Set<Destination> getDestinations(ActiveMQDestination destination) {
return super.getDestinations(destination).stream().map(DestinationFilter::new).collect(Collectors.toSet());
}
};
plugins[1] = new StatisticsBrokerPlugin();
answer.setPlugins(plugins);
answer.setDeleteAllMessagesOnStartup(true);
answer.addConnector("tcp://localhost:0");