Fixes AMQ-4535: Activemq configured with leveldb commit fail when accessed by PutGet from IBM Perf Harness

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1482789 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2013-05-15 12:30:45 +00:00
parent 842630c8c7
commit 8461158178
3 changed files with 24 additions and 19 deletions

View File

@ -786,33 +786,26 @@ public class Queue extends BaseDestination implements Task, UsageListener {
@Override
public void afterCommit() throws Exception {
LinkedList<Transaction> orderedWork = null;
LinkedList<Transaction> orderedWork = new LinkedList<Transaction>();;
// use existing object to sync orderIndexUpdates that can be reassigned
synchronized (sendLock) {
if (transaction == orderIndexUpdates.peek()) {
orderedWork = orderIndexUpdates;
orderIndexUpdates = new LinkedList<Transaction>();
// talking all the ordered work means that earlier
// and later threads do nothing.
// this avoids contention/race on the sendLock that
// guards the actual work.
Transaction next = orderIndexUpdates.peek();
while( next!=null && next.isCommitted() ) {
orderedWork.addLast(orderIndexUpdates.removeFirst());
next = orderIndexUpdates.peek();
}
}
// do the ordered work
if (orderedWork != null) {
if (!orderedWork.isEmpty()) {
sendLock.lockInterruptibly();
try {
for (Transaction tx : orderedWork) {
sendSyncs.get(tx).processSend();
sendSyncs.remove(tx);
}
} finally {
sendLock.unlock();
}
for (Transaction tx : orderedWork) {
sendSyncs.get(tx).processSent();
}
sendSyncs.remove(transaction);
}
}

View File

@ -40,6 +40,7 @@ public abstract class Transaction {
public static final byte IN_USE_STATE = 1; // can go to: 2,3
public static final byte PREPARED_STATE = 2; // can go to: 3
public static final byte FINISHED_STATE = 3;
boolean committed = false;
private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
private byte state = START_STATE;
@ -64,6 +65,14 @@ public abstract class Transaction {
this.state = state;
}
public boolean isCommitted() {
return committed;
}
public void setCommitted(boolean committed) {
this.committed = committed;
}
public void addSynchronization(Synchronization r) {
synchronizations.add(r);
if (state == START_STATE) {
@ -182,6 +191,7 @@ public abstract class Transaction {
protected void doPostCommit() throws XAException {
try {
setCommitted(true);
fireAfterCommit();
} catch (Throwable e) {
// I guess this could happen. Post commit task failed

View File

@ -298,11 +298,13 @@ class DelayableUOW(val manager:DBManager) extends BaseRetained {
}
def dequeue(expectedQueueKey:Long, id:MessageId) = {
val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
assert(queueKey == expectedQueueKey)
val entry = QueueEntryRecord(id, queueKey, queueSeq)
this.synchronized {
getAction(id).dequeues += entry
if( id.getEntryLocator != null ) {
val EntryLocator(queueKey, queueSeq) = id.getEntryLocator.asInstanceOf[EntryLocator];
assert(queueKey == expectedQueueKey)
val entry = QueueEntryRecord(id, queueKey, queueSeq)
this.synchronized {
getAction(id).dequeues += entry
}
}
countDownFuture
}