From 50a900c04b497c470eeb5c69985944cd6666ffa0 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Mon, 24 Jul 2017 20:11:18 -0400 Subject: [PATCH] ARTEMIS-1304 Message loss on Commmit timeout during failover --- .../core/client/impl/ClientSessionImpl.java | 8 +- .../cluster/failover/FailoverTest.java | 80 +++++++++++++++++++ 2 files changed, 84 insertions(+), 4 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java index 29b9504239..f4b80cd0ed 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ClientSessionImpl.java @@ -785,7 +785,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi try { sessionContext.simpleCommit(); } catch (ActiveMQException e) { - if (e.getType() == ActiveMQExceptionType.UNBLOCKED || rollbackOnly) { + if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) { // The call to commit was unlocked on failover, we therefore rollback the tx, // and throw a transaction rolled back exception instead //or @@ -1573,7 +1573,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } catch (XAException xae) { throw xae; } catch (ActiveMQException e) { - if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { + if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) { // Unblocked on failover try { // will retry once after failover & unblock @@ -1666,7 +1666,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi } catch (XAException xae) { throw xae; } catch (ActiveMQException e) { - if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { + if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) { // Unblocked on failover throw new XAException(XAException.XA_RETRY); } @@ -1699,7 +1699,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi throw xae; } catch (ActiveMQException e) { // we can retry this only because we know for sure that no work would have been done - if (e.getType() == ActiveMQExceptionType.UNBLOCKED) { + if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) { try { sessionContext.xaStart(xid, flags); } catch (XAException xae) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index 7912c96a74..c5954cf599 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -389,6 +389,86 @@ public class FailoverTest extends FailoverTestBase { } } + /** + * This test would fail one in three or five times, + * where the commit would leave the session dirty after a timeout. + */ + @Test(timeout = 120000) + public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception { + locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500); + + ((InVMNodeManager) nodeManager).failoverPause = 6000L; + + ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator); + final ClientSession session = createSession(sf1, false, false, false); + + + session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true); + + final CountDownLatch connectionFailed = new CountDownLatch(1); + + session.addFailureListener(new SessionFailureListener() { + @Override + public void beforeReconnect(ActiveMQException exception) { + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver) { + } + + @Override + public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) { + connectionFailed.countDown(); + } + }); + + final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS); + + for (int i = 0; i < 500; i++) { + ClientMessage message = session.createMessage(true); + message.putIntProperty("counter", i); + + producer.send(message); + + } + + session.commit(); + + + ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS); + session.start(); + ClientMessage m = null; + for (int i = 0; i < 500; i++) { + m = consumer.receive(1000); + Assert.assertNotNull(m); + Assert.assertEquals(i, m.getIntProperty("counter").intValue()); + } + + m.acknowledge(); + + crash(false, session); + try { + session.commit(); + fail("Exception expected"); + } catch (Exception expected) { + expected.printStackTrace(); + } + + Thread.sleep(2000); + + m = null; + for (int i = 0; i < 500; i++) { + m = consumer.receive(1000); + Assert.assertNotNull(m); + Assert.assertEquals(i, m.getIntProperty("counter").intValue()); + } + + m.acknowledge(); + + session.commit(); + + } + // https://issues.jboss.org/browse/HORNETQ-685 @Test(timeout = 120000) public void testTimeoutOnFailoverTransactionRollback() throws Exception {