From 679db08db3dba27475b9e82c20d3dafeb155631f Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 4 Apr 2017 10:16:00 +0100 Subject: [PATCH] [AMQ-6643] ensure a wildcard virtual topic subscriber is restricted to the wildcard destination - avoid duplicate and spurious dispatch. fix and test --- .../region/virtual/MappedQueueFilter.java | 46 +++--- .../activemq/command/ActiveMQDestination.java | 6 + .../mqtt/PahoVirtualTopicMQTTTest.java | 11 +- .../virtual/VirtualTopicWildcardTest.java | 149 ++++++++++++++++++ 4 files changed, 180 insertions(+), 32 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java index e8de9102e8..db02490dc3 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/MappedQueueFilter.java @@ -48,34 +48,34 @@ public class MappedQueueFilter extends DestinationFilter { // recover messages for first consumer only boolean noSubs = getConsumers().isEmpty(); - if (!sub.getActiveMQDestination().isPattern() || sub.getActiveMQDestination().equals(next.getActiveMQDestination())) { + // for virtual consumer wildcard dests, only subscribe to exact match to ensure no duplicates + if (sub.getActiveMQDestination().compareTo(next.getActiveMQDestination()) == 0) { super.addSubscription(context, sub); + } + if (noSubs && !getConsumers().isEmpty()) { + // new subscription added, recover retroactive messages + final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); + final Set virtualDests = regionBroker.getDestinations(virtualDestination); - if (noSubs && !getConsumers().isEmpty()) { - // new subscription added, recover retroactive messages - final RegionBroker regionBroker = (RegionBroker) context.getBroker().getAdaptor(RegionBroker.class); - final Set virtualDests = regionBroker.getDestinations(virtualDestination); + final ActiveMQDestination newDestination = sub.getActiveMQDestination(); + final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); - final ActiveMQDestination newDestination = sub.getActiveMQDestination(); - final BaseDestination regionDest = getBaseDestination((Destination) regionBroker.getDestinations(newDestination).toArray()[0]); + for (Destination virtualDest : virtualDests) { + if (virtualDest.getActiveMQDestination().isTopic() && + (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { - for (Destination virtualDest : virtualDests) { - if (virtualDest.getActiveMQDestination().isTopic() && - (virtualDest.isAlwaysRetroactive() || sub.getConsumerInfo().isRetroactive())) { + Topic topic = (Topic) getBaseDestination(virtualDest); + if (topic != null) { + // re-use browse() to get recovered messages + final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); - Topic topic = (Topic) getBaseDestination(virtualDest); - if (topic != null) { - // re-use browse() to get recovered messages - final Message[] messages = topic.getSubscriptionRecoveryPolicy().browse(topic.getActiveMQDestination()); - - // add recovered messages to subscription - for (Message message : messages) { - final Message copy = message.copy(); - copy.setOriginalDestination(message.getDestination()); - copy.setDestination(newDestination); - copy.setRegionDestination(regionDest); - sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); - } + // add recovered messages to subscription + for (Message message : messages) { + final Message copy = message.copy(); + copy.setOriginalDestination(message.getDestination()); + copy.setDestination(newDestination); + copy.setRegionDestination(regionDest); + sub.addRecoveredMessage(context, newDestination.isQueue() ? new IndirectMessageReference(copy) : copy); } } } diff --git a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java index 4819a1a9c0..149145d895 100644 --- a/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/ActiveMQDestination.java @@ -159,6 +159,12 @@ public abstract class ActiveMQDestination extends JNDIBaseStorable implements Da return 1; } else { if (destination.getDestinationType() == destination2.getDestinationType()) { + + if (destination.isPattern() && destination2.isPattern() ) { + if (destination.getPhysicalName().compareTo(destination2.getPhysicalName()) == 0) { + return 0; + } + } if (destination.isPattern()) { DestinationFilter filter = DestinationFilter.parseFilter(destination); if (filter.matches(destination2)) { diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java index 5f58202fc6..55103acae9 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/PahoVirtualTopicMQTTTest.java @@ -18,7 +18,6 @@ package org.apache.activemq.transport.mqtt; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.command.ActiveMQQueue; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.junit.Before; @@ -72,14 +71,8 @@ public class PahoVirtualTopicMQTTTest extends PahoMQTTTest { RegionBroker regionBroker = (RegionBroker) brokerService.getBroker().getAdaptor(RegionBroker.class); - String[] queues = new String[]{"Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.>", - "Consumer.client-10:AT_LEAST_ONCE.VirtualTopic.user10.client-10.>", - "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.>", - "Consumer.client-1:AT_LEAST_ONCE.VirtualTopic.user1.client-1.>"}; - - for (String queueName : queues) { - Destination queue = regionBroker.getQueueRegion().getDestinations(new ActiveMQQueue(queueName)).iterator().next(); - assertEquals("Queue " + queueName + " have more than one consumer", 1, queue.getConsumers().size()); + for (Destination queue : regionBroker.getQueueRegion().getDestinationMap().values()) { + assertEquals("Queue " + queue.getActiveMQDestination() + " have more than one consumer", 1, queue.getConsumers().size()); } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java new file mode 100644 index 0000000000..c256e40cf9 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicWildcardTest.java @@ -0,0 +1,149 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.spring.ConsumerBean; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import static junit.framework.TestCase.assertEquals; +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertTrue; + +// https://issues.apache.org/jira/browse/AMQ-6643 +public class VirtualTopicWildcardTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicWildcardTest.class); + + protected int total = 3; + protected Connection connection; + BrokerService brokerService; + + @Before + public void init() throws Exception { + brokerService = createBroker(); + brokerService.start(); + connection = createConnection(); + connection.start(); + } + + @After + public void afer() throws Exception { + connection.close(); + brokerService.stop(); + } + + @Test + public void testWildcardAndSimpleConsumerShareMessages() throws Exception { + + ConsumerBean messageList1 = new ConsumerBean("1:"); + ConsumerBean messageList2 = new ConsumerBean("2:"); + ConsumerBean messageList3 = new ConsumerBean("3:"); + + messageList1.setVerbose(true); + messageList2.setVerbose(true); + messageList3.setVerbose(true); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination producerDestination = new ActiveMQTopic("VirtualTopic.TEST.A.IT"); + Destination destination1 = new ActiveMQQueue("Consumer.1.VirtualTopic.TEST.>"); + Destination destination2 = new ActiveMQQueue("Consumer.1.VirtualTopic.TEST.A.IT"); + Destination destination3 = new ActiveMQQueue("Consumer.1.VirtualTopic.TEST.B.IT"); + + LOG.info("Sending to: " + producerDestination); + LOG.info("Consuming from: " + destination1 + " and " + destination2 + ", and " + destination3); + + MessageConsumer c1 = session.createConsumer(destination1, null); + MessageConsumer c2 = session.createConsumer(destination2, null); + // this consumer should get no messages + MessageConsumer c3 = session.createConsumer(destination3, null); + + c1.setMessageListener(messageList1); + c2.setMessageListener(messageList2); + c3.setMessageListener(messageList3); + + // create topic producer + MessageProducer producer = session.createProducer(producerDestination); + assertNotNull(producer); + + for (int i = 0; i < total; i++) { + producer.send(createMessage(session, i)); + } + + assertMessagesArrived(messageList1, messageList2); + assertEquals(0, messageList3.getMessages().size()); + + } + + private Message createMessage(Session session, int i) throws JMSException { + return session.createTextMessage("val=" + i); + } + + private Connection createConnection() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerService.getVmConnectorURI()); + cf.setWatchTopicAdvisories(false); + return cf.createConnection(); + } + + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { + try { + assertTrue("expected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("One: " + messageList1.getMessages().size() + ", Two:" + messageList2.getMessages().size()); + return messageList1.getMessages().size() + messageList2.getMessages().size() == 2 * total; + } + })); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + protected BrokerService createBroker() throws Exception { + + BrokerService broker = new BrokerService(); + broker.setAdvisorySupport(false); + broker.setPersistent(false); + + VirtualTopic virtualTopic = new VirtualTopic(); + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + return broker; + } +}