This closes #1417
This commit is contained in:
commit
69004178dd
|
@ -785,7 +785,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
try {
|
||||
sessionContext.simpleCommit();
|
||||
} catch (ActiveMQException e) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || rollbackOnly) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT || rollbackOnly) {
|
||||
// The call to commit was unlocked on failover, we therefore rollback the tx,
|
||||
// and throw a transaction rolled back exception instead
|
||||
//or
|
||||
|
@ -1573,7 +1573,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
} catch (XAException xae) {
|
||||
throw xae;
|
||||
} catch (ActiveMQException e) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
|
||||
// Unblocked on failover
|
||||
try {
|
||||
// will retry once after failover & unblock
|
||||
|
@ -1666,7 +1666,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
} catch (XAException xae) {
|
||||
throw xae;
|
||||
} catch (ActiveMQException e) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
|
||||
// Unblocked on failover
|
||||
throw new XAException(XAException.XA_RETRY);
|
||||
}
|
||||
|
@ -1699,7 +1699,7 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
|
|||
throw xae;
|
||||
} catch (ActiveMQException e) {
|
||||
// we can retry this only because we know for sure that no work would have been done
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED) {
|
||||
if (e.getType() == ActiveMQExceptionType.UNBLOCKED || e.getType() == ActiveMQExceptionType.CONNECTION_TIMEDOUT) {
|
||||
try {
|
||||
sessionContext.xaStart(xid, flags);
|
||||
} catch (XAException xae) {
|
||||
|
|
|
@ -389,6 +389,86 @@ public class FailoverTest extends FailoverTestBase {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This test would fail one in three or five times,
|
||||
* where the commit would leave the session dirty after a timeout.
|
||||
*/
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailoverTransactionCommitTimeoutCommunication() throws Exception {
|
||||
locator.setCallTimeout(1000).setBlockOnNonDurableSend(true).setBlockOnDurableSend(true).setAckBatchSize(0).setReconnectAttempts(300).setRetryInterval(500);
|
||||
|
||||
((InVMNodeManager) nodeManager).failoverPause = 6000L;
|
||||
|
||||
ClientSessionFactoryInternal sf1 = (ClientSessionFactoryInternal) createSessionFactory(locator);
|
||||
final ClientSession session = createSession(sf1, false, false, false);
|
||||
|
||||
|
||||
session.createQueue(FailoverTestBase.ADDRESS, RoutingType.MULTICAST, FailoverTestBase.ADDRESS, null, true);
|
||||
|
||||
final CountDownLatch connectionFailed = new CountDownLatch(1);
|
||||
|
||||
session.addFailureListener(new SessionFailureListener() {
|
||||
@Override
|
||||
public void beforeReconnect(ActiveMQException exception) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void connectionFailed(ActiveMQException exception, boolean failedOver, String scaleDownTargetNodeID) {
|
||||
connectionFailed.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
|
||||
|
||||
for (int i = 0; i < 500; i++) {
|
||||
ClientMessage message = session.createMessage(true);
|
||||
message.putIntProperty("counter", i);
|
||||
|
||||
producer.send(message);
|
||||
|
||||
}
|
||||
|
||||
session.commit();
|
||||
|
||||
|
||||
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
|
||||
session.start();
|
||||
ClientMessage m = null;
|
||||
for (int i = 0; i < 500; i++) {
|
||||
m = consumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
Assert.assertEquals(i, m.getIntProperty("counter").intValue());
|
||||
}
|
||||
|
||||
m.acknowledge();
|
||||
|
||||
crash(false, session);
|
||||
try {
|
||||
session.commit();
|
||||
fail("Exception expected");
|
||||
} catch (Exception expected) {
|
||||
expected.printStackTrace();
|
||||
}
|
||||
|
||||
Thread.sleep(2000);
|
||||
|
||||
m = null;
|
||||
for (int i = 0; i < 500; i++) {
|
||||
m = consumer.receive(1000);
|
||||
Assert.assertNotNull(m);
|
||||
Assert.assertEquals(i, m.getIntProperty("counter").intValue());
|
||||
}
|
||||
|
||||
m.acknowledge();
|
||||
|
||||
session.commit();
|
||||
|
||||
}
|
||||
|
||||
// https://issues.jboss.org/browse/HORNETQ-685
|
||||
@Test(timeout = 120000)
|
||||
public void testTimeoutOnFailoverTransactionRollback() throws Exception {
|
||||
|
|
Loading…
Reference in New Issue