Merge pull request #1247 from Nikita-Shupletsov/virtual_topic_fix

[AMQ-9530]Fix SelectorAwareVirtualTopicInterceptor ClassCastException if next is not Topic.
This commit is contained in:
JB Onofré 2024-11-08 11:09:15 +01:00 committed by GitHub
commit be1856cace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 173 additions and 15 deletions

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import java.util.Optional;
public class BaseVirtualDestinationFilter extends DestinationFilter {
public BaseVirtualDestinationFilter(Destination next) {
super(next);
}
BaseDestination getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return (BaseDestination) virtualDest;
} else if (virtualDest instanceof DestinationFilter) {
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
}
return null;
}
}

View File

@ -16,12 +16,12 @@
*/ */
package org.apache.activemq.broker.region.virtual; package org.apache.activemq.broker.region.virtual;
import java.util.Optional;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.BaseDestination; import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.IndirectMessageReference; import org.apache.activemq.broker.region.IndirectMessageReference;
import org.apache.activemq.broker.region.RegionBroker; import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
@ -34,7 +34,7 @@ import org.apache.activemq.util.SubscriptionKey;
* Creates a mapped Queue that can recover messages from subscription recovery * Creates a mapped Queue that can recover messages from subscription recovery
* policy of its Virtual Topic. * policy of its Virtual Topic.
*/ */
public class MappedQueueFilter extends DestinationFilter { public class MappedQueueFilter extends BaseVirtualDestinationFilter {
private final ActiveMQDestination virtualDestination; private final ActiveMQDestination virtualDestination;
@ -87,15 +87,6 @@ public class MappedQueueFilter extends DestinationFilter {
} }
} }
private BaseDestination getBaseDestination(Destination virtualDest) {
if (virtualDest instanceof BaseDestination) {
return (BaseDestination) virtualDest;
} else if (virtualDest instanceof DestinationFilter) {
return ((DestinationFilter) virtualDest).getAdaptor(BaseDestination.class);
}
return null;
}
@Override @Override
public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception { public synchronized void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
super.removeSubscription(context, sub, lastDeliveredSequenceId); super.removeSubscription(context, sub, lastDeliveredSequenceId);

View File

@ -17,9 +17,9 @@
package org.apache.activemq.broker.region.virtual; package org.apache.activemq.broker.region.virtual;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.BaseDestination;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
@ -41,8 +41,11 @@ public class SelectorAwareVirtualTopicInterceptor extends VirtualTopicIntercepto
public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { public SelectorAwareVirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
super(next, virtualTopic); super(next, virtualTopic);
BaseDestination baseDestination = getBaseDestination(next);
selectorCachePlugin = (SubQueueSelectorCacheBroker) selectorCachePlugin = (SubQueueSelectorCacheBroker)
((Topic)next).createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class); (baseDestination != null
? baseDestination.createConnectionContext().getBroker().getAdaptor(SubQueueSelectorCacheBroker.class)
: null);
} }
/** /**

View File

@ -25,7 +25,6 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
@ -39,7 +38,7 @@ import jakarta.jms.ResourceAllocationException;
/** /**
* A Destination which implements <a href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a> * A Destination which implements <a href="https://activemq.apache.org/virtual-destinations">Virtual Topic</a>
*/ */
public class VirtualTopicInterceptor extends DestinationFilter { public class VirtualTopicInterceptor extends BaseVirtualDestinationFilter {
private final String prefix; private final String prefix;
private final String postfix; private final String postfix;

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.DestinationFilter;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.spring.ConsumerBean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class VirtualTopicSelectorWithAnotherInterceptorTest extends CompositeTopicTest {
private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorWithAnotherInterceptorTest.class);
protected Destination getConsumer1Dsetination() {
return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST");
}
protected Destination getConsumer2Dsetination() {
return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST");
}
protected Destination getProducerDestination() {
return new ActiveMQTopic("VirtualTopic.TEST");
}
@Override
protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) {
messageList1.assertMessagesArrived(total / 2);
messageList2.assertMessagesArrived(total / 2);
messageList1.flushMessages();
messageList2.flushMessages();
LOG.info("validate no other messages on queues");
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination1 = getConsumer1Dsetination();
Destination destination2 = getConsumer2Dsetination();
MessageConsumer c1 = session.createConsumer(destination1, null);
MessageConsumer c2 = session.createConsumer(destination2, null);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
LOG.info("send one simple message that should go to both consumers");
MessageProducer producer = session.createProducer(getProducerDestination());
assertNotNull(producer);
producer.send(session.createTextMessage("Last Message"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(1);
} catch (JMSException e) {
e.printStackTrace();
fail("unexpeced ex while waiting for last messages: " + e);
}
}
@Override
protected BrokerService createBroker() throws Exception {
// use message selectors on consumers that need to propagate up to the virtual
// topic dispatch so that un matched messages do not linger on subscription queues
messageSelector1 = "odd = 'yes'";
messageSelector2 = "odd = 'no'";
BrokerService broker = new BrokerService();
broker.setPersistent(false);
VirtualTopic virtualTopic = new VirtualTopic();
// the new config that enables selectors on the interceptor
virtualTopic.setSelectorAware(true);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic});
TestDestinationInterceptor testInterceptor = new TestDestinationInterceptor();
broker.setDestinationInterceptors(new DestinationInterceptor[]{testInterceptor, interceptor});
return broker;
}
private static class TestDestinationInterceptor implements DestinationInterceptor {
@Override
public org.apache.activemq.broker.region.Destination intercept(org.apache.activemq.broker.region.Destination destination) {
return new DestinationFilter(destination);
}
@Override
public void remove(org.apache.activemq.broker.region.Destination destination) {
}
@Override
public void create(Broker broker, ConnectionContext context, ActiveMQDestination destination) throws Exception {
}
}
}