diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index b51f2b0737..2c22c65cd5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -45,6 +45,7 @@ public class VirtualTopic implements VirtualDestination { private boolean concurrentSend = false; private boolean transactedSend = false; private boolean dropOnResourceLimit = false; + private boolean setOriginalDestination = true; @Override public ActiveMQDestination getVirtualDestination() { @@ -252,4 +253,12 @@ public class VirtualTopic implements VirtualDestination { public void setDropOnResourceLimit(boolean dropOnResourceLimit) { this.dropOnResourceLimit = dropOnResourceLimit; } + + public boolean isSetOriginalDestination() { + return setOriginalDestination; + } + + public void setSetOriginalDestination(boolean setOriginalDestination) { + this.setOriginalDestination = setOriginalDestination; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 36cae3fe5d..c8f058a2a5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -47,6 +47,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { private final boolean concurrentSend; private final boolean transactedSend; private final boolean dropMessageOnResourceLimit; + private final boolean setOriginalDestination; private final LRUCache cache = new LRUCache(); @@ -58,6 +59,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { this.concurrentSend = virtualTopic.isConcurrentSend(); this.transactedSend = virtualTopic.isTransactedSend(); this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit(); + this.setOriginalDestination = virtualTopic.isSetOriginalDestination(); } public Topic getTopic() { @@ -137,8 +139,10 @@ public class VirtualTopicInterceptor extends DestinationFilter { private Message copy(Message original, ActiveMQDestination target) { Message msg = original.copy(); - msg.setDestination(target); - msg.setOriginalDestination(original.getDestination()); + if (setOriginalDestination) { + msg.setDestination(target); + msg.setOriginalDestination(original.getDestination()); + } return msg; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java index f370efc449..2c4556656b 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MessageDestinationVirtualTopicTest.java @@ -43,6 +43,8 @@ public class MessageDestinationVirtualTopicTest { private SimpleMessageListener listener2; + private SimpleMessageListener listener3; + @Resource(name = "broker1") private BrokerService broker1; @@ -78,8 +80,16 @@ public class MessageDestinationVirtualTopicTest { listener1 = new SimpleMessageListener(); consumer1.setMessageListener(listener1); + // Create listener on Broker B1 for VT T2 witout setOriginalDest + Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2"); + + // Bind listener on queue for consumer D + MessageConsumer consumerD = session1.createConsumer(consumer3Queue); + listener3 = new SimpleMessageListener(); + consumerD.setMessageListener(listener3); + // Create producer for topic, on B1 - Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1"); + Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2"); producer = session1.createProducer(virtualTopicT1); } @@ -94,9 +104,10 @@ public class MessageDestinationVirtualTopicTest { init(); // Create a monitor - CountDownLatch monitor = new CountDownLatch(2); + CountDownLatch monitor = new CountDownLatch(3); listener1.setCountDown(monitor); listener2.setCountDown(monitor); + listener3.setCountDown(monitor); LOG.info("Sending message"); // Send a message on the topic @@ -112,9 +123,13 @@ public class MessageDestinationVirtualTopicTest { String lastJMSDestination1 = listener1.getLastJMSDestination(); System.err.println(lastJMSDestination1); + String lastJMSDestination3 = listener3.getLastJMSDestination(); + System.err.println(lastJMSDestination3); + // The destination names assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2); assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1); + assertEquals("topic://VirtualTopic.T2", lastJMSDestination3); } } \ No newline at end of file diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml index 6b6199c2ca..ab225d8931 100644 --- a/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/broker/virtual/virtual-topic-network-test.xml @@ -44,7 +44,9 @@ - + + + @@ -66,6 +68,7 @@ + @@ -109,6 +112,7 @@ +