https://issues.apache.org/jira/browse/AMQ-5951 - scenario wheere transaction command can block, additional test and further fix

This commit is contained in:
gtully 2015-09-29 16:34:36 +01:00
parent 94b56977d2
commit 8514e38135
2 changed files with 48 additions and 10 deletions

View File

@ -607,7 +607,7 @@ public class FailoverTransport implements CompositeTransport {
long start = System.currentTimeMillis();
boolean timedout = false;
while (transport == null && !disposed && connectionFailure == null
&& !Thread.currentThread().isInterrupted()) {
&& !Thread.currentThread().isInterrupted() && willReconnect()) {
if (LOG.isTraceEnabled()) {
LOG.trace("Waiting for transport to reconnect..: " + command);
}
@ -639,6 +639,8 @@ public class FailoverTransport implements CompositeTransport {
error = connectionFailure;
} else if (timedout == true) {
error = new IOException("Failover timeout of " + timeout + " ms reached.");
} else if (!willReconnect()) {
error = new IOException("Reconnect attempts of " + maxReconnectAttempts + " exceeded");
} else {
error = new IOException("Unexpected failure.");
}
@ -723,6 +725,10 @@ public class FailoverTransport implements CompositeTransport {
}
}
private boolean willReconnect() {
return firstConnection || 0 != calculateReconnectAttemptLimit();
}
@Override
public FutureResponse asyncRequest(Object command, ResponseCallback responseCallback) throws IOException {
throw new AssertionError("Unsupported Method");

View File

@ -26,6 +26,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.ExceptionListener;
@ -37,7 +38,10 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.LocalTransactionId;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionInfo;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -97,6 +101,14 @@ public class FailoverTimeoutTest {
LOG.info("Time spent waiting to connect: {} ms", duration);
assertTrue(duration > 3000);
safeClose(connection);
}
private void safeClose(Connection connection) {
try {
connection.close();
} catch (Exception ignored) {}
}
@Test
@ -131,10 +143,29 @@ public class FailoverTimeoutTest {
}
@Test
public void testInterleaveSendAndException() throws Exception {
public void testInterleaveAckAndException() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
doTestInterleaveAndException(connection, new MessageAck());
safeClose(connection);
}
@Test
public void testInterleaveTxAndException() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
TransactionInfo tx = new TransactionInfo();
tx.setConnectionId(connection.getConnectionInfo().getConnectionId());
tx.setTransactionId(new LocalTransactionId(tx.getConnectionId(), 1));
doTestInterleaveAndException(connection, tx);
safeClose(connection);
}
public void doTestInterleaveAndException(final ActiveMQConnection connection, final Command command) throws Exception {
connection.start();
connection.setExceptionListener(new ExceptionListener() {
@ -143,7 +174,7 @@ public class FailoverTimeoutTest {
try {
LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
// try and invoke on connection as part of handling exception
connection.asyncSendPacket(new MessageAck());
connection.asyncSendPacket(command);
} catch (Exception e) {
}
}
@ -154,24 +185,24 @@ public class FailoverTimeoutTest {
final int NUM_TASKS = 200;
final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS);
// let a few tasks delay a bit
final AtomicLong sleepMillis = new AtomicLong(1000);
for (int i=0; i < NUM_TASKS; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
connection.asyncSendPacket(new MessageAck());
} catch (JMSException e) {
e.printStackTrace();
TimeUnit.MILLISECONDS.sleep(Math.max(0, sleepMillis.addAndGet(-50)));
connection.asyncSendPacket(command);
} catch (Exception e) {
} finally {
enqueueOnExecutorDone.countDown();
}
}
});
}
while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) {
while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 10)) {
enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
}
@ -184,6 +215,7 @@ public class FailoverTimeoutTest {
assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS));
}
@Test
public void testUpdateUris() throws Exception {