diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 2acbe5ef0f..7e1e4c580c 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -786,33 +786,26 @@ public class Queue extends BaseDestination implements Task, UsageListener { @Override public void afterCommit() throws Exception { - LinkedList orderedWork = null; + LinkedList orderedWork = new LinkedList();; // use existing object to sync orderIndexUpdates that can be reassigned synchronized (sendLock) { - if (transaction == orderIndexUpdates.peek()) { - orderedWork = orderIndexUpdates; - orderIndexUpdates = new LinkedList(); - - // talking all the ordered work means that earlier - // and later threads do nothing. - // this avoids contention/race on the sendLock that - // guards the actual work. + Transaction next = orderIndexUpdates.peek(); + while( next!=null && next.isCommitted() ) { + orderedWork.addLast(orderIndexUpdates.removeFirst()); + next = orderIndexUpdates.peek(); } } // do the ordered work - if (orderedWork != null) { + if (!orderedWork.isEmpty()) { sendLock.lockInterruptibly(); try { for (Transaction tx : orderedWork) { sendSyncs.get(tx).processSend(); + sendSyncs.remove(tx); } } finally { sendLock.unlock(); } - for (Transaction tx : orderedWork) { - sendSyncs.get(tx).processSent(); - } - sendSyncs.remove(transaction); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java index dce90e9954..99758d16cd 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java +++ b/activemq-broker/src/main/java/org/apache/activemq/transaction/Transaction.java @@ -40,6 +40,7 @@ public abstract class Transaction { public static final byte IN_USE_STATE = 1; // can go to: 2,3 public static final byte PREPARED_STATE = 2; // can go to: 3 public static final byte FINISHED_STATE = 3; + boolean committed = false; private final ArrayList synchronizations = new ArrayList(); private byte state = START_STATE; @@ -64,6 +65,14 @@ public abstract class Transaction { this.state = state; } + public boolean isCommitted() { + return committed; + } + + public void setCommitted(boolean committed) { + this.committed = committed; + } + public void addSynchronization(Synchronization r) { synchronizations.add(r); if (state == START_STATE) { @@ -182,6 +191,7 @@ public abstract class Transaction { protected void doPostCommit() throws XAException { try { + setCommitted(true); fireAfterCommit(); } catch (Throwable e) { // I guess this could happen. Post commit task failed diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala index b7bab2f683..9d8ecbf25d 100644 --- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala +++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala @@ -298,11 +298,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained { } def dequeue(expectedQueueKey:Long, id:MessageId) = { - val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator]; - assert(queueKey == expectedQueueKey) - val entry = QueueEntryRecord(id, queueKey, queueSeq) - this.synchronized { - getAction(id).dequeues += entry + if( id.getEntryLocator != null ) { + val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator]; + assert(queueKey == expectedQueueKey) + val entry = QueueEntryRecord(id, queueKey, queueSeq) + this.synchronized { + getAction(id).dequeues += entry + } } countDownFuture }