[AMQ-9530]Fix SelectorAwareVirtualTopicInterceptor ClassCastException if next is not Topic.

This commit is contained in:
Nikita Shupletsov 2024-07-09 16:44:07 -07:00
parent 4e3084d9bb
commit 73368035af
5 changed files with 62 additions and 21 deletions

View File

@ -0,0 +1,23 @@
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import java.util.Optional;
public class BaseVirtualDestinationFilter extends DestinationFilter {
public BaseVirtualDestinationFilter(Destination next) {
super(next);
}
Optional<BaseDestination> getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return Optional.of((BaseDestination) virtualDest);
} else if (virtualDest instanceof DestinationFilter) {
return Optional.ofNullable(((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class));
}
return Optional.empty();
}
}

View File

@ -16,12 +16,12 @@
*/
package org.apache.activemq.broker.region.virtual;
import java.util.Optional;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
@ -34,7 +34,7 @@ import org.apache.activemq.util.SubscriptionKey;
* Creates a mapped Queue that can recover messages from subscription recovery
* policy of its Virtual Topic.
*/
public class MappedQueueFilter extends DestinationFilter {
public class MappedQueueFilter extends BaseVirtualDestinationFilter {
private final ActiveMQDestination virtualDestination;
@ -65,10 +65,10 @@ public class MappedQueueFilter extends DestinationFilter {
if (virtualDest.getActiveMQDestination().isTopic() &&
(virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) {
Topic topic = (Topic) getBaseDestination(virtualDest);
if (topic != null) {
Optional<Topic> topic = getBaseDestination(virtualDest).map(Topic.class::cast);
if (topic.isPresent()) {
// re-use browse() to get recovered messages
final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination());
final Message[] messages = topic.get().getSubscriptionRecoveryPolicy().browse(topic.get().getActiveMQDestination());
// add recovered messages to subscription
for (Message message : messages) {
@ -76,7 +76,8 @@ public class MappedQueueFilter extends DestinationFilter {
copy.setOriginalDestination(message.getDestination());
copy.setDestination(newDestination);
if (regionDest == null) {
regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]);
regionDest = regionBroker.getDestinations(newDestination).stream().findFirst()
.flatMap(this::getBaseDestination).orElse(null);
}
copy.setRegionDestination(regionDest);
sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy);
@ -87,15 +88,6 @@ public class MappedQueueFilter extends DestinationFilter {
}
}
private BaseDestination getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return (BaseDestination) virtualDest;
} else if (virtualDest instanceof DestinationFilter) {
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
}
return null;
}
@Override
public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
super.removeSubscription(context, sub, lastDeliveredSequenceId);

View File

@ -17,9 +17,10 @@
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
@ -41,8 +42,12 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
super(next, virtualTopic);
selectorCachePlugin = (SubQueueSelectorCacheBroker)
((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class);
selectorCachePlugin = getBaseDestination(next)
.map(BaseDestination::createConnectionContext)
.map(ConnectionContext::getBroker)
.map(b -> b.getAdaptor(SubQueueSelectorCacheBroker.class))
.map(SubQueueSelectorCacheBroker.class::cast)
.orElse(null);
}
/**

View File

@ -25,7 +25,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -39,7 +38,7 @@ import jakarta.jms.ResourceAllocationException;
/**
* A Destination which implements <a href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a>
*/
public class VirtualTopicInterceptor extends DestinationFilter {
public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter {
private final String prefix;
private final String postfix;

View File

@ -22,11 +22,15 @@ import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
@ -99,7 +103,25 @@ public class VirtualTopicSelectorTest extends CompositeTopicTest {
virtualTopic.setSelectorAware(true);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor});
TestDestinationInterceptor testInterceptor = new TestDestinationInterceptor();
broker.setDestinationInterceptors(new DestinationInterceptor[]{testInterceptor, interceptor});
return broker;
}
private static class TestDestinationInterceptor implements DestinationInterceptor {
@Override
public org.apache.activemq.broker.region.Destination intercept(org.apache.activemq.broker.region.Destination destination) {
return new DestinationFilter(destination);
}
@Override
public void remove(org.apache.activemq.broker.region.Destination destination) {
}
@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
}
}
}