AMQ-5920 - make using a vt transaction configurable, a transaction negates concurrentstoreanddispatch and imposes local 2pc on mKahadb so needs to be off by default

This commit is contained in:
gtully 2015-08-17 15:35:36 +01:00
parent 049f8da236
commit ffdaeb2bd1
3 changed files with 18 additions and 1 deletions

View File

@ -43,6 +43,7 @@ public class VirtualTopic implements VirtualDestination {
private boolean selectorAware = false;
private boolean local = false;
private boolean concurrentSend = false;
private boolean transactedSend = false;
@Override
public ActiveMQDestination getVirtualDestination() {
@ -181,4 +182,16 @@ public class VirtualTopic implements VirtualDestination {
public void setConcurrentSend(boolean concurrentSend) {
this.concurrentSend = concurrentSend;
}
public boolean isTransactedSend() {
return transactedSend;
}
/**
* When true, dispatch to matching destinations always uses a transaction.
* @param transactedSend
*/
public void setTransactedSend(boolean transactedSend) {
this.transactedSend = transactedSend;
}
}

View File

@ -43,6 +43,8 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private final String postfix;
private final boolean local;
private final boolean concurrentSend;
private final boolean transactedSend;
private final LRUCache<ActiveMQDestination, ActiveMQQueue> cache = new LRUCache<ActiveMQDestination, ActiveMQQueue>();
public VirtualTopicInterceptor(Destination next, VirtualTopic virtualTopic) {
@ -51,6 +53,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
this.postfix = virtualTopic.getPostfix();
this.local = virtualTopic.isLocal();
this.concurrentSend = virtualTopic.isConcurrentSend();
this.transactedSend = virtualTopic.isTransactedSend();
}
public Topic getTopic() {
@ -120,7 +123,7 @@ public class VirtualTopicInterceptor extends DestinationFilter {
private LocalTransactionId beginLocalTransaction(int numDestinations, ConnectionContext connectionContext, Message message) throws Exception {
LocalTransactionId result = null;
if (numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
if (transactedSend && numDestinations > 1 && message.isPersistent() && message.getTransactionId() == null) {
result = new LocalTransactionId(new ConnectionId(message.getMessageId().getProducerId().toString()), message.getMessageId().getProducerSequenceId());
connectionContext.getBroker().beginTransaction(connectionContext, result);
connectionContext.setTransaction(connectionContext.getTransactions().get(result));

View File

@ -55,6 +55,7 @@ public class VirtualTopicFanoutPerfTest {
for (VirtualDestination virtualDestination : ((VirtualDestinationInterceptor) destinationInterceptor).getVirtualDestinations()) {
if (virtualDestination instanceof VirtualTopic) {
((VirtualTopic) virtualDestination).setConcurrentSend(true);
((VirtualTopic) virtualDestination).setTransactedSend(true);
}
}
}