AMQ-5920 - make using a vt transaction configurable, a transaction negates concurrentstoreanddispatch and imposes local 2pc on mKahadb so needs to be off by default

(cherry picked from commit ffdaeb2bd1)
This commit is contained in:
gtully 2015-08-17 15:35:36 +01:00 committed by Timothy Bish
parent 6668f7b14f
commit 4c343ebd12
3 changed files with 18 additions and 1 deletions

View File

@ -43,6 +43,7 @@ public class VirtualTopic implements VirtualDestination {
private boolean selectorAware = false; private boolean selectorAware = false;
private boolean local = false; private boolean local = false;
private boolean concurrentSend = false; private boolean concurrentSend = false;
private boolean transactedSend = false;
@Override @Override
public ActiveMQDestination getVirtualDestination() { public ActiveMQDestination getVirtualDestination() {
@ -181,4 +182,16 @@ public class VirtualTopic implements VirtualDestination {
public void setConcurrentSend(boolean concurrentSend) { public void setConcurrentSend(boolean concurrentSend) {
this.concurrentSend = concurrentSend; this.concurrentSend = concurrentSend;
} }
public boolean isTransactedSend() {
return transactedSend;
}
/**
* When true, dispatch to matching destinations always uses a transaction.
* @param transactedSend
*/
public void setTransactedSend(boolean transactedSend) {
this.transactedSend = transactedSend;
}
} }

View File

@ -43,6 +43,8 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private final String postfix; private final String postfix;
private final boolean local; private final boolean local;
private final boolean concurrentSend; private final boolean concurrentSend;
private final boolean transactedSend;
private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>(); private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) { public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
@ -51,6 +53,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
this.postfix = virtualTopic.getPostfix(); this.postfix = virtualTopic.getPostfix();
this.local = virtualTopic.isLocal(); this.local = virtualTopic.isLocal();
this.concurrentSend = virtualTopic.isConcurrentSend(); this.concurrentSend = virtualTopic.isConcurrentSend();
this.transactedSend = virtualTopic.isTransactedSend();
} }
public Topic getTopic() { public Topic getTopic() {
@ -120,7 +123,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception { private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
LocalTransactionId result = null; LocalTransactionId result = null;
if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) { if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId()); result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
connectionContext.getBroker().beginTransaction(connectionContext, result); connectionContext.getBroker().beginTransaction(connectionContext, result);
connectionContext.setTransaction(connectionContext.getTransactions().get(result)); connectionContext.setTransaction(connectionContext.getTransactions().get(result));

View File

@ -55,6 +55,7 @@ public class VirtualTopicFanoutPerfTest {
for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) { for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) {
if (virtualDestination instanceof VirtualTopic) { if (virtualDestination instanceof VirtualTopic) {
((VirtualTopic) virtualDestination).setConcurrentSend(true); ((VirtualTopic) virtualDestination).setConcurrentSend(true);
((VirtualTopic) virtualDestination).setTransactedSend(true);
} }
} }
} }