From 8514e38135cf3c4da913806f3677a89785613e10 Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 29 Sep 2015 16:34:36 +0100 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5951 - scenario wheere transaction command can block, additional test and further fix --- .../transport/failover/FailoverTransport.java | 8 ++- .../failover/FailoverTimeoutTest.java | 50 +++++++++++++++---- 2 files changed, 48 insertions(+), 10 deletions(-) diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java index 4e196b32ef..0f36d67ee0 100755 --- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java +++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java @@ -607,7 +607,7 @@ public class FailoverTransport implements CompositeTransport { long start = System.currentTimeMillis(); boolean timedout = false; while (transport == null && !disposed && connectionFailure == null - && !Thread.currentThread().isInterrupted()) { + && !Thread.currentThread().isInterrupted() && willReconnect()) { if (LOG.isTraceEnabled()) { LOG.trace("Waiting for transport to reconnect..: " + command); } @@ -639,6 +639,8 @@ public class FailoverTransport implements CompositeTransport { error = connectionFailure; } else if (timedout == true) { error = new IOException("Failover timeout of " + timeout + " ms reached."); + } else if (!willReconnect()) { + error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded"); } else { error = new IOException("Unexpected failure."); } @@ -723,6 +725,10 @@ public class FailoverTransport implements CompositeTransport { } } + private boolean willReconnect() { + return firstConnection || 0 != calculateReconnectAttemptLimit(); + } + @Override public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException { throw new AssertionError("Unsupported Method"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java index 35a970f599..7c36840155 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.ExceptionListener; @@ -37,7 +38,10 @@ import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.Command; +import org.apache.activemq.command.LocalTransactionId; import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.TransactionInfo; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -97,6 +101,14 @@ public class FailoverTimeoutTest { LOG.info("Time spent waiting to connect: {} ms", duration); assertTrue(duration > 3000); + + safeClose(connection); + } + + private void safeClose(Connection connection) { + try { + connection.close(); + } catch (Exception ignored) {} } @Test @@ -131,10 +143,29 @@ public class FailoverTimeoutTest { } @Test - public void testInterleaveSendAndException() throws Exception { - + public void testInterleaveAckAndException() throws Exception { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0"); final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + + doTestInterleaveAndException(connection, new MessageAck()); + safeClose(connection); + } + + @Test + public void testInterleaveTxAndException() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0"); + final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); + + TransactionInfo tx = new TransactionInfo(); + tx.setConnectionId(connection.getConnectionInfo().getConnectionId()); + tx.setTransactionId(new LocalTransactionId(tx.getConnectionId(), 1)); + doTestInterleaveAndException(connection, tx); + + safeClose(connection); + } + + public void doTestInterleaveAndException(final ActiveMQConnection connection, final Command command) throws Exception { + connection.start(); connection.setExceptionListener(new ExceptionListener() { @@ -143,7 +174,7 @@ public class FailoverTimeoutTest { try { LOG.info("Deal with exception - invoke op that may block pending outstanding oneway"); // try and invoke on connection as part of handling exception - connection.asyncSendPacket(new MessageAck()); + connection.asyncSendPacket(command); } catch (Exception e) { } } @@ -154,24 +185,24 @@ public class FailoverTimeoutTest { final int NUM_TASKS = 200; final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS); + // let a few tasks delay a bit + final AtomicLong sleepMillis = new AtomicLong(1000); for (int i=0; i < NUM_TASKS; i++) { - executorService.submit(new Runnable() { @Override public void run() { try { - connection.asyncSendPacket(new MessageAck()); - } catch (JMSException e) { - e.printStackTrace(); + TimeUnit.MILLISECONDS.sleep(Math.max(0, sleepMillis.addAndGet(-50))); + connection.asyncSendPacket(command); + } catch (Exception e) { } finally { enqueueOnExecutorDone.countDown(); } - } }); } - while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) { + while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 10)) { enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS); } @@ -184,6 +215,7 @@ public class FailoverTimeoutTest { assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS)); } + @Test public void testUpdateUris() throws Exception {