From 529e5c2231d781d237f93ad2a1ddd4bd1f4e21b2 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 14 Aug 2008 17:22:15 +0000 Subject: [PATCH] Fix for https://issues.apache.org/activemq/browse/AMQ-1886 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@685966 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/jdbc/JDBCPersistenceAdapter.java | 2 +- .../memory/MemoryPersistenceAdapter.java | 2 +- .../store/memory/MemoryTransactionStore.java | 54 ++++++++++++------- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 80c50d3ffc..97b3955098 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -134,7 +134,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { - transactionStore = new MemoryTransactionStore(); + transactionStore = new MemoryTransactionStore(this); } return this.transactionStore; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 9d876476f2..4afaf4bc24 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -89,7 +89,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { - transactionStore = new MemoryTransactionStore(); + transactionStore = new MemoryTransactionStore(this); } return transactionStore; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java index 987bcf4557..bd77cef3b3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryTransactionStore.java @@ -29,6 +29,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.XATransactionId; import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.ProxyMessageStore; import org.apache.activemq.store.ProxyTopicMessageStore; import org.apache.activemq.store.TopicMessageStore; @@ -44,12 +45,12 @@ import org.apache.activemq.store.TransactionStore; public class MemoryTransactionStore implements TransactionStore { ConcurrentHashMap inflightTransactions = new ConcurrentHashMap(); - ConcurrentHashMap preparedTransactions = new ConcurrentHashMap(); + final PersistenceAdapter persistenceAdapter; private boolean doingRecover; - public static class Tx { + public class Tx { private ArrayList messages = new ArrayList(); private ArrayList acks = new ArrayList(); @@ -86,29 +87,43 @@ public class MemoryTransactionStore implements TransactionStore { * @throws IOException */ public void commit() throws IOException { - // Do all the message adds. - for (Iterator iter = messages.iterator(); iter.hasNext();) { - AddMessageCommand cmd = iter.next(); - cmd.run(); - } - // And removes.. - for (Iterator iter = acks.iterator(); iter.hasNext();) { - RemoveMessageCommand cmd = iter.next(); - cmd.run(); + ConnectionContext ctx = new ConnectionContext(); + persistenceAdapter.beginTransaction(ctx); + try { + + // Do all the message adds. + for (Iterator iter = messages.iterator(); iter.hasNext();) { + AddMessageCommand cmd = iter.next(); + cmd.run(ctx); + } + // And removes.. + for (Iterator iter = acks.iterator(); iter.hasNext();) { + RemoveMessageCommand cmd = iter.next(); + cmd.run(ctx); + } + + } catch ( IOException e ) { + persistenceAdapter.rollbackTransaction(ctx); + throw e; } + persistenceAdapter.commitTransaction(ctx); } } - + public interface AddMessageCommand { Message getMessage(); - void run() throws IOException; + void run(ConnectionContext context) throws IOException; } public interface RemoveMessageCommand { MessageAck getMessageAck(); - void run() throws IOException; + void run(ConnectionContext context) throws IOException; + } + + public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) { + this.persistenceAdapter=persistenceAdapter; } public MessageStore proxy(MessageStore messageStore) { @@ -221,15 +236,16 @@ public class MemoryTransactionStore implements TransactionStore { return message; } - public void run() throws IOException { - destination.addMessage(null, message); + public void run(ConnectionContext ctx) throws IOException { + destination.addMessage(ctx, message); } + }); } else { destination.addMessage(null, message); } } - + /** * @param ack * @throws IOException @@ -246,8 +262,8 @@ public class MemoryTransactionStore implements TransactionStore { return ack; } - public void run() throws IOException { - destination.removeMessage(null, ack); + public void run(ConnectionContext ctx) throws IOException { + destination.removeMessage(ctx, ack); } }); } else {