git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@685966 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-08-14 17:22:15 +00:00
parent 84dfaabc85
commit 529e5c2231
3 changed files with 37 additions and 21 deletions

View File

@ -134,7 +134,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new MemoryTransactionStore();
transactionStore = new MemoryTransactionStore(this);
}
return this.transactionStore;
}

View File

@ -89,7 +89,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
public TransactionStore createTransactionStore() throws IOException {
if (transactionStore == null) {
transactionStore = new MemoryTransactionStore();
transactionStore = new MemoryTransactionStore(this);
}
return transactionStore;
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
@ -44,12 +45,12 @@ import org.apache.activemq.store.TransactionStore;
public class MemoryTransactionStore implements TransactionStore {
ConcurrentHashMap<Object, Tx> inflightTransactions = new ConcurrentHashMap<Object, Tx>();
ConcurrentHashMap<TransactionId, Tx> preparedTransactions = new ConcurrentHashMap<TransactionId, Tx>();
final PersistenceAdapter persistenceAdapter;
private boolean doingRecover;
public static class Tx {
public class Tx {
private ArrayList<AddMessageCommand> messages = new ArrayList<AddMessageCommand>();
private ArrayList<RemoveMessageCommand> acks = new ArrayList<RemoveMessageCommand>();
@ -86,29 +87,43 @@ public class MemoryTransactionStore implements TransactionStore {
* @throws IOException
*/
public void commit() throws IOException {
// Do all the message adds.
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
cmd.run();
}
// And removes..
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
cmd.run();
ConnectionContext ctx = new ConnectionContext();
persistenceAdapter.beginTransaction(ctx);
try {
// Do all the message adds.
for (Iterator<AddMessageCommand> iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = iter.next();
cmd.run(ctx);
}
// And removes..
for (Iterator<RemoveMessageCommand> iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = iter.next();
cmd.run(ctx);
}
} catch ( IOException e ) {
persistenceAdapter.rollbackTransaction(ctx);
throw e;
}
persistenceAdapter.commitTransaction(ctx);
}
}
public interface AddMessageCommand {
Message getMessage();
void run() throws IOException;
void run(ConnectionContext context) throws IOException;
}
public interface RemoveMessageCommand {
MessageAck getMessageAck();
void run() throws IOException;
void run(ConnectionContext context) throws IOException;
}
public MemoryTransactionStore(PersistenceAdapter persistenceAdapter) {
this.persistenceAdapter=persistenceAdapter;
}
public MessageStore proxy(MessageStore messageStore) {
@ -221,15 +236,16 @@ public class MemoryTransactionStore implements TransactionStore {
return message;
}
public void run() throws IOException {
destination.addMessage(null, message);
public void run(ConnectionContext ctx) throws IOException {
destination.addMessage(ctx, message);
}
});
} else {
destination.addMessage(null, message);
}
}
/**
* @param ack
* @throws IOException
@ -246,8 +262,8 @@ public class MemoryTransactionStore implements TransactionStore {
return ack;
}
public void run() throws IOException {
destination.removeMessage(null, ack);
public void run(ConnectionContext ctx) throws IOException {
destination.removeMessage(ctx, ack);
}
});
} else {