diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java new file mode 100644 index 0000000000..82783e5da5 --- /dev/null +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/BaseVirtualDestinationFilter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.region.virtual; + +import org.apache.activemq.broker.region.BaseDestination; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; + +import java.util.Optional; + +public class BaseVirtualDestinationFilter extends DestinationFilter { + + public BaseVirtualDestinationFilter(Destination next) { + super(next); + } + + BaseDestination getBaseDestination(Destination virtualDest) { + if (virtualDest instanceof BaseDestination) { + return (BaseDestination) virtualDest; + } else if (virtualDest instanceof DestinationFilter) { + return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class); + } + return null; + } +} diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index 2baa33a398..838e81c0ed 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -16,12 +16,12 @@ */ package org.apache.activemq.broker.region.virtual; +import java.util.Optional; import java.util.Set; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.IndirectMessageReference; import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.Subscription; @@ -34,7 +34,7 @@ import org.apache.activemq.util.SubscriptionKey; * Creates a mapped Queue that can recover messages from subscription recovery * policy of its Virtual Topic. */ -public class MappedQueueFilter extends DestinationFilter { +public class MappedQueueFilter extends BaseVirtualDestinationFilter { private final ActiveMQDestination virtualDestination; @@ -87,15 +87,6 @@ public class MappedQueueFilter extends DestinationFilter { } } - private BaseDestination getBaseDestination(Destination virtualDest) { - if (virtualDest instanceof BaseDestination) { - return (BaseDestination) virtualDest; - } else if (virtualDest instanceof DestinationFilter) { - return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class); - } - return null; - } - @Override public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { super.removeSubscription(context, sub, lastDeliveredSequenceId); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java index 727f79d380..b46e4f45db 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/SelectorAwareVirtualTopicInterceptor.java @@ -17,9 +17,9 @@ package org.apache.activemq.broker.region.virtual; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; -import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.Message; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; @@ -41,8 +41,11 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { super(next, virtualTopic); + BaseDestination baseDestination = getBaseDestination(next); selectorCachePlugin = (SubQueueSelectorCacheBroker) - ((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class); + (baseDestination != null + ? baseDestination.createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class) + : null); } /** diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index cdf683c56f..beb77dbf2a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -25,7 +25,6 @@ import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.Destination; -import org.apache.activemq.broker.region.DestinationFilter; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; @@ -39,7 +38,7 @@ import jakarta.jms.ResourceAllocationException; /** * A Destination which implements Virtual Topic */ -public class VirtualTopicInterceptor extends DestinationFilter { +public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter { private final String prefix; private final String postfix; diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java new file mode 100644 index 0000000000..a4a9967bed --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorWithAnotherInterceptorTest.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import jakarta.jms.Destination; +import jakarta.jms.JMSException; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageProducer; +import jakarta.jms.Session; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VirtualTopicSelectorWithAnotherInterceptorTest extends CompositeTopicTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorWithAnotherInterceptorTest.class); + + protected Destination getConsumer1Dsetination() { + return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST"); + } + + protected Destination getConsumer2Dsetination() { + return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST"); + } + + protected Destination getProducerDestination() { + return new ActiveMQTopic("VirtualTopic.TEST"); + } + + @Override + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { + messageList1.assertMessagesArrived(total / 2); + messageList2.assertMessagesArrived(total / 2); + + messageList1.flushMessages(); + messageList2.flushMessages(); + + LOG.info("validate no other messages on queues"); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination1 = getConsumer1Dsetination(); + Destination destination2 = getConsumer2Dsetination(); + MessageConsumer c1 = session.createConsumer(destination1, null); + MessageConsumer c2 = session.createConsumer(destination2, null); + c1.setMessageListener(messageList1); + c2.setMessageListener(messageList2); + + + LOG.info("send one simple message that should go to both consumers"); + MessageProducer producer = session.createProducer(getProducerDestination()); + assertNotNull(producer); + + producer.send(session.createTextMessage("Last Message")); + + messageList1.assertMessagesArrived(1); + messageList2.assertMessagesArrived(1); + + } catch (JMSException e) { + e.printStackTrace(); + fail("unexpeced ex while waiting for last messages: " + e); + } + } + + @Override + protected BrokerService createBroker() throws Exception { + // use message selectors on consumers that need to propagate up to the virtual + // topic dispatch so that un matched messages do not linger on subscription queues + messageSelector1 = "odd = 'yes'"; + messageSelector2 = "odd = 'no'"; + + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + + VirtualTopic virtualTopic = new VirtualTopic(); + // the new config that enables selectors on the interceptor + virtualTopic.setSelectorAware(true); + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); + TestDestinationInterceptor testInterceptor = new TestDestinationInterceptor(); + broker.setDestinationInterceptors(new DestinationInterceptor[]{testInterceptor, interceptor}); + return broker; + } + + private static class TestDestinationInterceptor implements DestinationInterceptor { + + @Override + public org.apache.activemq.broker.region.Destination intercept(org.apache.activemq.broker.region.Destination destination) { + return new DestinationFilter(destination); + } + + @Override + public void remove(org.apache.activemq.broker.region.Destination destination) { + } + + @Override + public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception { + } + } + +}