From 50daa35a32851159bbd47ca08cc2517f3d166782 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 19 May 2010 11:48:38 +0000 Subject: [PATCH] resolve failure of org.apache.activemq.store.jdbc.JDBCStoreOrderTest - async ack when in a transaction and transaction store message store proxy missing async impl, belt and braces, fix async ack and add sync imple to proxy - following async changes from http://svn.apache.org/viewvc?rev=945102&view=rev git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@946135 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/region/Queue.java | 6 +++++- .../activemq/store/AbstractMessageStore.java | 2 +- .../store/memory/MemoryTransactionStore.java | 20 +++++++++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index f6f152d1b3..a82aab296b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -666,7 +666,11 @@ public class Queue extends BaseDestination implements Task, UsageListener { ack.setLastMessageId(node.getMessageId()); ack.setMessageCount(1); } - store.removeAsyncMessage(context, ack); + if (context.isInTransaction()) { + store.removeMessage(context, ack); + } else { + store.removeAsyncMessage(context, ack); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java index cceae96b5c..cbdeb8a129 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/AbstractMessageStore.java @@ -28,7 +28,7 @@ import org.apache.activemq.command.MessageId; import org.apache.activemq.usage.MemoryUsage; abstract public class AbstractMessageStore implements MessageStore { - static final FutureTask FUTURE; + public static final FutureTask FUTURE; protected final ActiveMQDestination destination; public AbstractMessageStore(ActiveMQDestination destination) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java index bd77cef3b3..f206e08436 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Iterator; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; import javax.transaction.xa.XAException; @@ -28,6 +29,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; +import org.apache.activemq.store.AbstractMessageStore; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.ProxyMessageStore; @@ -132,9 +134,18 @@ public class MemoryTransactionStore implements TransactionStore { MemoryTransactionStore.this.addMessage(getDelegate(), send); } + public Future asyncAddQueueMessage(ConnectionContext context, Message message) throws IOException { + MemoryTransactionStore.this.addMessage(getDelegate(), message); + return AbstractMessageStore.FUTURE; + } + public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { MemoryTransactionStore.this.removeMessage(getDelegate(), ack); } + + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { + MemoryTransactionStore.this.removeMessage(getDelegate(), ack); + } }; } @@ -144,9 +155,18 @@ public class MemoryTransactionStore implements TransactionStore { MemoryTransactionStore.this.addMessage(getDelegate(), send); } + public Future asyncAddTopicMessage(ConnectionContext context, Message message) throws IOException { + MemoryTransactionStore.this.addMessage(getDelegate(), message); + return AbstractMessageStore.FUTURE; + } + public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException { MemoryTransactionStore.this.removeMessage(getDelegate(), ack); } + + public void removeAsyncMessage(ConnectionContext context, MessageAck ack) throws IOException { + MemoryTransactionStore.this.removeMessage(getDelegate(), ack); + } }; }