This fixes the recent errors the test cases have been seeing with transacted acks due to the new ack assertion bits added.

We now take the mesage out of the dispatch list when the ack is received, but we put it back on a rollback.



git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@696370 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2008-09-17 16:59:42 +00:00
parent 1590da28a5
commit d453b8d390
2 changed files with 14 additions and 8 deletions

View File

@ -197,6 +197,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
if (inAckRange) { if (inAckRange) {
// Don't remove the nodes until we are committed. // Don't remove the nodes until we are committed.
removeList.add(node);
if (!context.isInTransaction()) { if (!context.isInTransaction()) {
dequeueCounter++; dequeueCounter++;
if (!this.getConsumerInfo().isBrowser()) { if (!this.getConsumerInfo().isBrowser()) {
@ -205,7 +206,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (!isSlave()) { if (!isSlave()) {
node.getRegionDestination().getDestinationStatistics().getInflight().decrement(); node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
} }
removeList.add(node);
} else { } else {
// setup a Synchronization to remove nodes from the // setup a Synchronization to remove nodes from the
// dispatched list. // dispatched list.
@ -215,9 +215,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
public void afterCommit() public void afterCommit()
throws Exception { throws Exception {
synchronized(dispatchLock) { synchronized(dispatchLock) {
dequeueCounter++; dequeueCounter++;
dispatched.remove(node);
node node
.getRegionDestination() .getRegionDestination()
.getDestinationStatistics() .getDestinationStatistics()
@ -234,9 +232,11 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
} }
} }
public void afterRollback() public void afterRollback() throws Exception {
throws Exception { // Need to put it back in the front.
super.afterRollback(); synchronized(dispatchLock) {
dispatched.add(0, node);
}
} }
}); });
} }
@ -426,12 +426,16 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
boolean checkFoundStart = false; boolean checkFoundStart = false;
boolean checkFoundEnd = false; boolean checkFoundEnd = false;
for (MessageReference node : dispatched) { for (MessageReference node : dispatched) {
if (!checkFoundStart && firstAckedMsg != null && firstAckedMsg.equals(node.getMessageId())) {
if( firstAckedMsg == null ) {
checkFoundStart=true;
} else if (!checkFoundStart && firstAckedMsg.equals(node.getMessageId())) {
checkFoundStart = true; checkFoundStart = true;
} }
if (checkFoundStart || firstAckedMsg == null) if (checkFoundStart) {
checkCount++; checkCount++;
}
if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) { if (lastAckedMsg != null && lastAckedMsg.equals(node.getMessageId())) {
checkFoundEnd = true; checkFoundEnd = true;

View File

@ -18,6 +18,7 @@ package org.apache.activemq.transaction;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import javax.transaction.xa.XAException; import javax.transaction.xa.XAException;
@ -88,6 +89,7 @@ public abstract class Transaction {
} }
public void fireAfterRollback() throws Exception { public void fireAfterRollback() throws Exception {
Collections.reverse(synchronizations);
for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) { for (Iterator<Synchronization> iter = synchronizations.iterator(); iter.hasNext();) {
Synchronization s = iter.next(); Synchronization s = iter.next();
s.afterRollback(); s.afterRollback();