From c276e2e652d2964cdc69ebf9b72663f0d985031d Mon Sep 17 00:00:00 2001 From: Dejan Bosanac Date: Mon, 21 Dec 2015 15:19:01 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-6100 - Virtual topic message destination should be the target queue (cherry picked from commit 4e63ee7cc7c4ed7d1fb8ae916c0984b974c175c0) --- .../virtual/VirtualTopicInterceptor.java | 11 +- .../MessageDestinationVirtualTopicTest.java | 120 ++++++++++++++++++ .../broker/virtual/SimpleMessageListener.java | 79 ++++++++++++ .../broker/virtual/VirtualTopicDLQTest.java | 2 +- .../virtual/virtual-topic-network-test.xml | 100 +++++++++++++++ 5 files changed, 309 insertions(+), 3 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java create mode 100644 activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index f673770930..65d3efc71a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -91,7 +91,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { public void run() { try { if (exceptionAtomicReference.get() == null) { - dest.send(context, message.copy()); + dest.send(context, copy(message, dest.getActiveMQDestination())); } } catch (Exception e) { exceptionAtomicReference.set(e); @@ -112,7 +112,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { } else { for (final Destination dest : destinations) { if (shouldDispatch(broker, message, dest)) { - dest.send(context, message.copy()); + dest.send(context, copy(message, dest.getActiveMQDestination())); } } } @@ -121,6 +121,13 @@ public class VirtualTopicInterceptor extends DestinationFilter { } } + private Message copy(Message original, ActiveMQDestination target) { + Message msg = original.copy(); + msg.setDestination(target); + msg.setOriginalDestination(original.getDestination()); + return msg; + } + private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { LocalTransactionId result = null; if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java new file mode 100644 index 0000000000..f370efc449 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import javax.annotation.Resource; +import javax.jms.*; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration({ "virtual-topic-network-test.xml" }) +public class MessageDestinationVirtualTopicTest { + + private static final Logger LOG = LoggerFactory.getLogger(MessageDestinationVirtualTopicTest.class); + + private SimpleMessageListener listener1; + + private SimpleMessageListener listener2; + + @Resource(name = "broker1") + private BrokerService broker1; + + @Resource(name = "broker2") + private BrokerService broker2; + + private MessageProducer producer; + + private Session session1; + + public void init() throws JMSException { + // Create connection on Broker B2 + ConnectionFactory broker2ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:62616"); + Connection connection2 = broker2ConnectionFactory.createConnection(); + connection2.start(); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerDQueue = session2.createQueue("Consumer.D.VirtualTopic.T1"); + + // Bind listener on queue for consumer D + MessageConsumer consumer = session2.createConsumer(consumerDQueue); + listener2 = new SimpleMessageListener(); + consumer.setMessageListener(listener2); + + // Create connection on Broker B1 + ConnectionFactory broker1ConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); + Connection connection1 = broker1ConnectionFactory.createConnection(); + connection1.start(); + session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue consumerCQueue = session1.createQueue("Consumer.C.VirtualTopic.T1"); + + // Bind listener on queue for consumer D + MessageConsumer consumer1 = session1.createConsumer(consumerCQueue); + listener1 = new SimpleMessageListener(); + consumer1.setMessageListener(listener1); + + // Create producer for topic, on B1 + Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1"); + producer = session1.createProducer(virtualTopicT1); + } + + @Test + public void testDestinationNames() throws Exception { + + LOG.info("Started waiting for broker 1 and 2"); + broker1.waitUntilStarted(); + broker2.waitUntilStarted(); + LOG.info("Broker 1 and 2 have started"); + + init(); + + // Create a monitor + CountDownLatch monitor = new CountDownLatch(2); + listener1.setCountDown(monitor); + listener2.setCountDown(monitor); + + LOG.info("Sending message"); + // Send a message on the topic + TextMessage message = session1.createTextMessage("Hello World !"); + producer.send(message); + LOG.info("Waiting for message reception"); + // Wait the two messages in the related queues + monitor.await(); + + // Get the message destinations + String lastJMSDestination2 = listener2.getLastJMSDestination(); + System.err.println(lastJMSDestination2); + String lastJMSDestination1 = listener1.getLastJMSDestination(); + System.err.println(lastJMSDestination1); + + // The destination names + assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2); + assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1); + + } +} \ No newline at end of file diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java new file mode 100644 index 0000000000..166bfb5326 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/SimpleMessageListener.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SimpleMessageListener implements MessageListener { + + private static final Logger LOG = LoggerFactory.getLogger(SimpleMessageListener.class); + + private CountDownLatch messageReceivedToken; + + private String lastJMSDestination; + + @Override + public void onMessage(Message message) { + try { + Thread.sleep(2000L); + if (message instanceof TextMessage) { + LOG.info("Dest:" + message.getJMSDestination()); + lastJMSDestination = message.getJMSDestination().toString(); + + Enumeration propertyNames = message.getPropertyNames(); + while (propertyNames.hasMoreElements()) { + Object object = propertyNames.nextElement(); + } + + } + messageReceivedToken.countDown(); + + } + catch (JMSException e) { + LOG.error("Error while listening to a message", message); + } + catch (InterruptedException e) { + LOG.error("Interrupted while listening to a message", message); + } + } + + /** + * @param countDown + * the countDown to set + */ + public void setCountDown(CountDownLatch countDown) { + this.messageReceivedToken = countDown; + } + + /** + * @return the lastJMSDestination + */ + public String getLastJMSDestination() { + return lastJMSDestination; + } + +} + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java index 7c853cfd92..11e2d7f785 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java @@ -69,7 +69,7 @@ public class VirtualTopicDLQTest extends TestCase { // Expected Individual Dead Letter Queue names that are tied to the // Subscriber Queues - private static final String dlqPrefix = "ActiveMQ.DLQ.Topic."; + private static final String dlqPrefix = "ActiveMQ.DLQ.Queue."; // Number of messages private static final int numberMessages = 6; diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml new file mode 100644 index 0000000000..0c2b1ece31 --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml @@ -0,0 +1,100 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +