ARTEMIS-1304 Message loss on Commmit timeout during failover

This commit is contained in:
Clebert Suconic 2017-07-24 20:11:18 -04:00
parent 08d8c08ed3
commit 50a900c04b
2 changed files with 84 additions and 4 deletions

View File

@ -785,7 +785,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
try { try {
sessionContext.simpleCommit(); sessionContext.simpleCommit();
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || rollbackOnly) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
// The call to commit was unlocked on failover, we therefore rollback the tx, // The call to commit was unlocked on failover, we therefore rollback the tx,
// and throw a transaction rolled back exception instead // and throw a transaction rolled back exception instead
//or //or
@ -1573,7 +1573,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (XAException xae) { } catch (XAException xae) {
throw xae; throw xae;
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
// Unblocked on failover // Unblocked on failover
try { try {
// will retry once after failover & unblock // will retry once after failover & unblock
@ -1666,7 +1666,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
} catch (XAException xae) { } catch (XAException xae) {
throw xae; throw xae;
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
// Unblocked on failover // Unblocked on failover
throw new XAException(XAException.XA_RETRY); throw new XAException(XAException.XA_RETRY);
} }
@ -1699,7 +1699,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
throw xae; throw xae;
} catch (ActiveMQException e) { } catch (ActiveMQException e) {
// we can retry this only because we know for sure that no work would have been done // we can retry this only because we know for sure that no work would have been done
if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
try { try {
sessionContext.xaStart(xid, flags); sessionContext.xaStart(xid, flags);
} catch (XAException xae) { } catch (XAException xae) {

View File

@ -389,6 +389,86 @@ public class FailoverTest extends FailoverTestBase {
} }
} }
/**
* This test would fail one in three or five times,
* where the commit would leave the session dirty after a timeout.
*/
@Test(timeout = 120000)
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
((InVMNodeManager) nodeManager).failoverPause = 6000L;
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
final ClientSession session = createSession(sf1, false, false, false);
session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
final CountDownLatch connectionFailed = new CountDownLatch(1);
session.addFailureListener(new SessionFailureListener() {
@Override
public void beforeReconnect(ActiveMQException exception) {
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
}
@Override
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
connectionFailed.countDown();
}
});
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
for (int i = 0; i < 500; i++) {
ClientMessage message = session.createMessage(true);
message.putIntProperty("counter", i);
producer.send(message);
}
session.commit();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
ClientMessage m = null;
for (int i = 0; i < 500; i++) {
m = consumer.receive(1000);
Assert.assertNotNull(m);
Assert.assertEquals(i, m.getIntProperty("counter").intValue());
}
m.acknowledge();
crash(false, session);
try {
session.commit();
fail("Exception expected");
} catch (Exception expected) {
expected.printStackTrace();
}
Thread.sleep(2000);
m = null;
for (int i = 0; i < 500; i++) {
m = consumer.receive(1000);
Assert.assertNotNull(m);
Assert.assertEquals(i, m.getIntProperty("counter").intValue());
}
m.acknowledge();
session.commit();
}
// https://issues.jboss.org/browse/HORNETQ-685 // https://issues.jboss.org/browse/HORNETQ-685
@Test(timeout = 120000) @Test(timeout = 120000)
public void testTimeoutOnFailoverTransactionRollback() throws Exception { public void testTimeoutOnFailoverTransactionRollback() throws Exception {