mirror of https://github.com/apache/activemq.git
AMQ-6100 - use setOriginalDestination=false to make this behaviour optional b/c amqp cannot see the original dest property due to immutability of the message properties and folks can depend on the existing original destination behaviour
This commit is contained in:
parent
c1e7dbd53b
commit
573b366ca0
|
@ -45,6 +45,7 @@ public class VirtualTopic implements VirtualDestination {
|
|||
private boolean concurrentSend = false;
|
||||
private boolean transactedSend = false;
|
||||
private boolean dropOnResourceLimit = false;
|
||||
private boolean setOriginalDestination = true;
|
||||
|
||||
@Override
|
||||
public ActiveMQDestination getVirtualDestination() {
|
||||
|
@ -252,4 +253,12 @@ public class VirtualTopic implements VirtualDestination {
|
|||
public void setDropOnResourceLimit(boolean dropOnResourceLimit) {
|
||||
this.dropOnResourceLimit = dropOnResourceLimit;
|
||||
}
|
||||
|
||||
public boolean isSetOriginalDestination() {
|
||||
return setOriginalDestination;
|
||||
}
|
||||
|
||||
public void setSetOriginalDestination(boolean setOriginalDestination) {
|
||||
this.setOriginalDestination = setOriginalDestination;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
private final boolean concurrentSend;
|
||||
private final boolean transactedSend;
|
||||
private final boolean dropMessageOnResourceLimit;
|
||||
private final boolean setOriginalDestination;
|
||||
|
||||
private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
|
||||
|
||||
|
@ -58,6 +59,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
this.concurrentSend = virtualTopic.isConcurrentSend();
|
||||
this.transactedSend = virtualTopic.isTransactedSend();
|
||||
this.dropMessageOnResourceLimit = virtualTopic.isDropOnResourceLimit();
|
||||
this.setOriginalDestination = virtualTopic.isSetOriginalDestination();
|
||||
}
|
||||
|
||||
public Topic getTopic() {
|
||||
|
@ -137,8 +139,10 @@ public class VirtualTopicInterceptor extends DestinationFilter {
|
|||
|
||||
private Message copy(Message original, ActiveMQDestination target) {
|
||||
Message msg = original.copy();
|
||||
msg.setDestination(target);
|
||||
msg.setOriginalDestination(original.getDestination());
|
||||
if (setOriginalDestination) {
|
||||
msg.setDestination(target);
|
||||
msg.setOriginalDestination(original.getDestination());
|
||||
}
|
||||
return msg;
|
||||
}
|
||||
|
||||
|
|
|
@ -43,6 +43,8 @@ public class MessageDestinationVirtualTopicTest {
|
|||
|
||||
private SimpleMessageListener listener2;
|
||||
|
||||
private SimpleMessageListener listener3;
|
||||
|
||||
@Resource(name = "broker1")
|
||||
private BrokerService broker1;
|
||||
|
||||
|
@ -78,8 +80,16 @@ public class MessageDestinationVirtualTopicTest {
|
|||
listener1 = new SimpleMessageListener();
|
||||
consumer1.setMessageListener(listener1);
|
||||
|
||||
// Create listener on Broker B1 for VT T2 witout setOriginalDest
|
||||
Queue consumer3Queue = session1.createQueue("Consumer.A.VirtualTopic.T2");
|
||||
|
||||
// Bind listener on queue for consumer D
|
||||
MessageConsumer consumerD = session1.createConsumer(consumer3Queue);
|
||||
listener3 = new SimpleMessageListener();
|
||||
consumerD.setMessageListener(listener3);
|
||||
|
||||
// Create producer for topic, on B1
|
||||
Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1");
|
||||
Topic virtualTopicT1 = session1.createTopic("VirtualTopic.T1,VirtualTopic.T2");
|
||||
producer = session1.createProducer(virtualTopicT1);
|
||||
}
|
||||
|
||||
|
@ -94,9 +104,10 @@ public class MessageDestinationVirtualTopicTest {
|
|||
init();
|
||||
|
||||
// Create a monitor
|
||||
CountDownLatch monitor = new CountDownLatch(2);
|
||||
CountDownLatch monitor = new CountDownLatch(3);
|
||||
listener1.setCountDown(monitor);
|
||||
listener2.setCountDown(monitor);
|
||||
listener3.setCountDown(monitor);
|
||||
|
||||
LOG.info("Sending message");
|
||||
// Send a message on the topic
|
||||
|
@ -112,9 +123,13 @@ public class MessageDestinationVirtualTopicTest {
|
|||
String lastJMSDestination1 = listener1.getLastJMSDestination();
|
||||
System.err.println(lastJMSDestination1);
|
||||
|
||||
String lastJMSDestination3 = listener3.getLastJMSDestination();
|
||||
System.err.println(lastJMSDestination3);
|
||||
|
||||
// The destination names
|
||||
assertEquals("queue://Consumer.D.VirtualTopic.T1", lastJMSDestination2);
|
||||
assertEquals("queue://Consumer.C.VirtualTopic.T1", lastJMSDestination1);
|
||||
assertEquals("topic://VirtualTopic.T2", lastJMSDestination3);
|
||||
|
||||
}
|
||||
}
|
|
@ -44,7 +44,9 @@
|
|||
<amq:virtualDestinations>
|
||||
<!-- Virtual topic policies -->
|
||||
<!-- they should be local to avoid message duplicate -->
|
||||
<amq:virtualTopic name="VirtualTopic.>" prefix="Consumer.*."/>
|
||||
<amq:virtualTopic name="VirtualTopic.T1" prefix="Consumer.*."/>
|
||||
<amq:virtualTopic name="VirtualTopic.T2" prefix="Consumer.*." setOriginalDestination="false"/>
|
||||
|
||||
</amq:virtualDestinations>
|
||||
</amq:virtualDestinationInterceptor>
|
||||
</amq:destinationInterceptors>
|
||||
|
@ -66,6 +68,7 @@
|
|||
<amq:destinations>
|
||||
<!-- topics -->
|
||||
<amq:topic physicalName="VirtualTopic.T1" />
|
||||
<amq:topic physicalName="VirtualTopic.T2" />
|
||||
</amq:destinations>
|
||||
|
||||
</amq:broker>
|
||||
|
@ -109,6 +112,7 @@
|
|||
<amq:destinations>
|
||||
<!-- topics -->
|
||||
<amq:topic physicalName="VirtualTopic.T1" />
|
||||
<amq:topic physicalName="VirtualTopic.T2" />
|
||||
</amq:destinations>
|
||||
|
||||
</amq:broker>
|
||||
|
|
Loading…
Reference in New Issue