ARTEMIS-3338 Preserve prepared XA transactions on connection failure
This commit is contained in:
parent
05498c350e
commit
bafefdc8ec
|
@ -687,7 +687,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
public void fail(ActiveMQException me, String message) {
|
||||
|
||||
for (Transaction tx : txMap.values()) {
|
||||
tx.rollbackIfPossible();
|
||||
tx.tryRollback();
|
||||
}
|
||||
|
||||
if (me != null) {
|
||||
|
|
|
@ -400,19 +400,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
|
|||
|
||||
Transaction txToRollback = tx;
|
||||
if (txToRollback != null) {
|
||||
if (txToRollback.getXid() != null) {
|
||||
if (txToRollback.tryRollback() && txToRollback.getXid() != null) {
|
||||
resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
|
||||
}
|
||||
txToRollback.rollbackIfPossible();
|
||||
}
|
||||
|
||||
txToRollback = pendingTX;
|
||||
|
||||
if (txToRollback != null) {
|
||||
if (txToRollback.getXid() != null) {
|
||||
if (txToRollback.tryRollback() && txToRollback.getXid() != null) {
|
||||
resourceManager.removeTransaction(txToRollback.getXid(), remotingConnection);
|
||||
}
|
||||
txToRollback.rollbackIfPossible();
|
||||
}
|
||||
|
||||
} else {
|
||||
|
|
|
@ -54,7 +54,7 @@ public interface Transaction {
|
|||
/** In a ServerSession failure scenario,\
|
||||
* we may try to rollback, however only if it's not prepared.
|
||||
* In case it's prepared, we will just let it be and let the transaction manager to deal with it */
|
||||
void rollbackIfPossible();
|
||||
boolean tryRollback();
|
||||
|
||||
long getID();
|
||||
|
||||
|
|
|
@ -341,16 +341,17 @@ public class TransactionImpl implements Transaction {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void rollbackIfPossible() {
|
||||
public boolean tryRollback() {
|
||||
synchronized (timeoutLock) {
|
||||
if (state == State.ROLLEDBACK) {
|
||||
// I don't think this could happen, but just in case
|
||||
logger.debug("TransactionImpl::rollbackIfPossible::" + this + " is being ignored");
|
||||
return;
|
||||
return true;
|
||||
}
|
||||
if (state != State.PREPARED) {
|
||||
try {
|
||||
internalRollback();
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
// nothing we can do beyond logging
|
||||
// no need to special handler here as this was not even supposed to happen at this point
|
||||
|
@ -359,6 +360,7 @@ public class TransactionImpl implements Transaction {
|
|||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -110,15 +110,20 @@ public class SessionFailureXATest extends ActiveMQTestBase {
|
|||
|
||||
@Test
|
||||
public void testFailureWithXAEnd() throws Exception {
|
||||
testFailure(true);
|
||||
testFailure(true, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFailureWithoutXAEnd() throws Exception {
|
||||
testFailure(false);
|
||||
testFailure(false, false);
|
||||
}
|
||||
|
||||
public void testFailure(boolean xaEnd) throws Exception {
|
||||
@Test
|
||||
public void testFailureWithXAPrepare() throws Exception {
|
||||
testFailure(true, true);
|
||||
}
|
||||
|
||||
public void testFailure(boolean xaEnd, boolean xaPrepare) throws Exception {
|
||||
|
||||
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
|
||||
try {
|
||||
|
@ -160,6 +165,10 @@ public class SessionFailureXATest extends ActiveMQTestBase {
|
|||
// We are validating both cases, where xaEnd succeeded and didn't succeed
|
||||
// so this tests is parameterized to validate both cases.
|
||||
clientSession.end(xid, XAResource.TMSUCCESS);
|
||||
|
||||
if (xaPrepare) {
|
||||
clientSession.prepare(xid);
|
||||
}
|
||||
}
|
||||
|
||||
Wait.assertEquals(1, () -> messagingService.getSessions().size());
|
||||
|
@ -171,7 +180,11 @@ public class SessionFailureXATest extends ActiveMQTestBase {
|
|||
|
||||
Wait.assertEquals(0, () -> messagingService.getSessions().size());
|
||||
|
||||
Wait.assertEquals(0, messagingService.getResourceManager()::size);
|
||||
if (xaPrepare) {
|
||||
Wait.assertEquals(1, messagingService.getResourceManager()::size);
|
||||
} else {
|
||||
Wait.assertEquals(0, messagingService.getResourceManager()::size);
|
||||
}
|
||||
|
||||
locator = createInVMNonHALocator();
|
||||
sessionFactory = createSessionFactory(locator);
|
||||
|
@ -188,25 +201,29 @@ public class SessionFailureXATest extends ActiveMQTestBase {
|
|||
|
||||
HashSet<String> bodies = new HashSet<>();
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m1");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m2");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m3");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m4");
|
||||
if (xaPrepare) {
|
||||
Assert.assertNull(m);
|
||||
} else {
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m1");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m2");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m3");
|
||||
m = clientConsumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
m.acknowledge();
|
||||
assertOrTrack(xaEnd, m, bodies, "m4");
|
||||
|
||||
if (!xaEnd) {
|
||||
// order is not guaranteed b/c the m4 async ack may not have been processed when there is no sync end call
|
||||
assertEquals("got all bodies", 4, bodies.size());
|
||||
if (!xaEnd) {
|
||||
// order is not guaranteed b/c the m4 async ack may not have been processed when there is no sync end call
|
||||
assertEquals("got all bodies", 4, bodies.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -140,8 +140,8 @@ public class BindingsImplTest extends ActiveMQTestBase {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void rollbackIfPossible() {
|
||||
|
||||
public boolean tryRollback() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue