https://issues.apache.org/jira/browse/AMQ-5951 - ensure failover oneway won't retry if reconnect will not happen

This commit is contained in:
gtully 2015-08-31 15:55:44 +01:00
parent 1ea289736b
commit ae9af4b8b2
2 changed files with 64 additions and 3 deletions

View File

@ -674,7 +674,7 @@ public class FailoverTransport implements CompositeTransport {
// If the command was not tracked.. we will retry in // If the command was not tracked.. we will retry in
// this method // this method
if (tracked == null) { if (tracked == null && canReconnect()) {
// since we will retry in this method.. take it // since we will retry in this method.. take it
// out of the request // out of the request

View File

@ -20,9 +20,15 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.ExceptionListener;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
@ -31,6 +37,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.MessageAck;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -49,7 +56,7 @@ public class FailoverTimeoutTest {
public void setUp() throws Exception { public void setUp() throws Exception {
bs = new BrokerService(); bs = new BrokerService();
bs.setUseJmx(false); bs.setUseJmx(false);
bs.addConnector("tcp://localhost:0"); bs.addConnector(getTransportUri());
bs.start(); bs.start();
tcpUri = bs.getTransportConnectors().get(0).getConnectUri(); tcpUri = bs.getTransportConnectors().get(0).getConnectUri();
} }
@ -119,8 +126,62 @@ public class FailoverTimeoutTest {
bs.waitUntilStarted(); bs.waitUntilStarted();
producer.send(message); producer.send(message);
bs.stop(); bs.stop();
connection.close();
}
@Test
public void testInterleaveSendAndException() throws Exception {
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?maxReconnectAttempts=0");
final ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
connection.start();
connection.setExceptionListener(new ExceptionListener() {
@Override
public void onException(JMSException exception) {
try {
LOG.info("Deal with exception - invoke op that may block pending outstanding oneway");
// try and invoke on connection as part of handling exception
connection.asyncSendPacket(new MessageAck());
} catch (Exception e) {
}
}
});
final ExecutorService executorService = Executors.newCachedThreadPool();
final int NUM_TASKS = 200;
final CountDownLatch enqueueOnExecutorDone = new CountDownLatch(NUM_TASKS);
for (int i=0; i < NUM_TASKS; i++) {
executorService.submit(new Runnable() {
@Override
public void run() {
try {
connection.asyncSendPacket(new MessageAck());
} catch (JMSException e) {
e.printStackTrace();
} finally {
enqueueOnExecutorDone.countDown();
}
}
});
}
while (enqueueOnExecutorDone.getCount() > (NUM_TASKS - 20)) {
enqueueOnExecutorDone.await(20, TimeUnit.MILLISECONDS);
}
// force IOException
final Socket socket = connection.getTransport().narrow(Socket.class);
socket.close();
executorService.shutdown();
assertTrue("all ops finish", enqueueOnExecutorDone.await(15, TimeUnit.SECONDS));
} }
@Test @Test