diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java index 95fa3330ec..14ea3fe34f 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopic.java @@ -43,6 +43,7 @@ public class VirtualTopic implements VirtualDestination { private boolean selectorAware = false; private boolean local = false; private boolean concurrentSend = false; + private boolean transactedSend = false; @Override public ActiveMQDestination getVirtualDestination() { @@ -181,4 +182,16 @@ public class VirtualTopic implements VirtualDestination { public void setConcurrentSend(boolean concurrentSend) { this.concurrentSend = concurrentSend; } + + public boolean isTransactedSend() { + return transactedSend; + } + + /** + * When true, dispatch to matching destinations always uses a transaction. + * @param transactedSend + */ + public void setTransactedSend(boolean transactedSend) { + this.transactedSend = transactedSend; + } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java index 7967562ab9..36c08e084a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/virtual/VirtualTopicInterceptor.java @@ -43,6 +43,8 @@ public class VirtualTopicInterceptor extends DestinationFilter { private final String postfix; private final boolean local; private final boolean concurrentSend; + private final boolean transactedSend; + private final LRUCache cache = new LRUCache(); public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { @@ -51,6 +53,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { this.postfix = virtualTopic.getPostfix(); this.local = virtualTopic.isLocal(); this.concurrentSend = virtualTopic.isConcurrentSend(); + this.transactedSend = virtualTopic.isTransactedSend(); } public Topic getTopic() { @@ -120,7 +123,7 @@ public class VirtualTopicInterceptor extends DestinationFilter { private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { LocalTransactionId result = null; - if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { + if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId()); connectionContext.getBroker().beginTransaction(connectionContext, result); connectionContext.setTransaction(connectionContext.getTransactions().get(result)); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java index 90cdeea90c..4ba82eb6a4 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicFanoutPerfTest.java @@ -55,6 +55,7 @@ public class VirtualTopicFanoutPerfTest { for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) { if (virtualDestination instanceof VirtualTopic) { ((VirtualTopic) virtualDestination).setConcurrentSend(true); + ((VirtualTopic) virtualDestination).setTransactedSend(true); } } }