https://issues.apache.org/jira/browse/AMQ-6089 - support TMNOFLAGS as a scan end to allow looping calls to recover to terminate

This commit is contained in:
gtully 2016-01-06 12:54:20 +00:00
parent e3df09b9db
commit 16bc0f0d75
2 changed files with 38 additions and 16 deletions

View File

@ -672,26 +672,32 @@ public class TransactionContext implements XAResource {
@Override
public Xid[] recover(int flag) throws XAException {
LOG.debug("recover({})", flag);
XATransactionId[] answer;
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent();
if (XAResource.TMNOFLAGS == flag) {
// signal next in cursor scan, which for us is always the end b/c we don't maintain any cursor state
// allows looping scan to complete
answer = new XATransactionId[0];
} else {
TransactionInfo info = new TransactionInfo(getConnectionId(), null, TransactionInfo.RECOVER);
try {
this.connection.checkClosedOrFailed();
this.connection.ensureConnectionInfoSent();
DataArrayResponse receipt = (DataArrayResponse)this.connection.syncSendPacket(info);
DataStructure[] data = receipt.getData();
XATransactionId[] answer;
if (data instanceof XATransactionId[]) {
answer = (XATransactionId[])data;
} else {
answer = new XATransactionId[data.length];
System.arraycopy(data, 0, answer, 0, data.length);
DataArrayResponse receipt = (DataArrayResponse) this.connection.syncSendPacket(info);
DataStructure[] data = receipt.getData();
if (data instanceof XATransactionId[]) {
answer = (XATransactionId[]) data;
} else {
answer = new XATransactionId[data.length];
System.arraycopy(data, 0, answer, 0, data.length);
}
} catch (JMSException e) {
throw toXAException(e);
}
LOG.debug("recover({})={}", flag, answer);
return answer;
} catch (JMSException e) {
throw toXAException(e);
}
LOG.debug("recover({})={}", flag, answer);
return answer;
}
@Override

View File

@ -16,7 +16,9 @@
*/
package org.apache.activemq.broker;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -24,7 +26,11 @@ import javax.jms.JMSException;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import junit.framework.Test;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.TransactionContext;
import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
import org.apache.activemq.broker.jmx.DestinationViewMBean;
import org.apache.activemq.broker.jmx.PersistenceAdapterViewMBean;
@ -102,6 +108,16 @@ public class XARecoveryBrokerTest extends BrokerRestartTestSupport {
dar = (DataArrayResponse)response;
assertEquals(4, dar.getData().length);
// verify XAResource scan loop
XAResource transactionContextXAResource = new TransactionContext(ActiveMQConnection.makeConnection(broker.getVmConnectorURI().toString()));
LinkedList<Xid> tracked = new LinkedList<Xid>();
Xid[] recoveryXids = transactionContextXAResource.recover(XAResource.TMSTARTRSCAN);
while (recoveryXids.length > 0) {
tracked.addAll(Arrays.asList(recoveryXids));
recoveryXids = transactionContextXAResource.recover(XAResource.TMNOFLAGS);
}
assertEquals("got 4 via scan loop", 4, tracked.size());
// validate destination depth via jmx
DestinationViewMBean destinationView = getProxyToDestination(destinationList(destination)[0]);
assertEquals("enqueue count does not see prepared", 0, destinationView.getQueueSize());