diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java index 6194790cfc..2c869a2492 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkFailoverTest.java @@ -56,26 +56,28 @@ public class NetworkFailoverTest extends TestCase { protected BrokerService remoteBroker; protected Session localSession; protected Session remoteSession; - protected ActiveMQQueue included=new ActiveMQQueue("include.test.foo"); + protected ActiveMQQueue included = new ActiveMQQueue("include.test.foo"); protected String consumerName = "durableSubs"; public void testRequestReply() throws Exception { final MessageProducer remoteProducer = remoteSession.createProducer(null); MessageConsumer remoteConsumer = remoteSession.createConsumer(included); remoteConsumer.setMessageListener(new MessageListener() { + @Override public void onMessage(Message msg) { try { - TextMessage textMsg = (TextMessage)msg; + TextMessage textMsg = (TextMessage) msg; String payload = "REPLY: " + textMsg.getText(); Destination replyTo; replyTo = msg.getJMSReplyTo(); textMsg.clearBody(); textMsg.setText(payload); - remoteProducer.send(replyTo, textMsg); - + LOG.info("*** Sending response: {}", textMsg.getText()); + remoteProducer.send(replyTo, textMsg); } catch (Exception e) { + LOG.warn("*** Responder listener caught exception: ", e); e.printStackTrace(); - } + } } }); @@ -83,7 +85,7 @@ public class NetworkFailoverTest extends TestCase { MessageProducer requestProducer = localSession.createProducer(included); requestProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); MessageConsumer requestConsumer = localSession.createConsumer(tempQueue); - + // allow for consumer infos to perculate arround Thread.sleep(2000); for (int i = 0; i < MESSAGE_COUNT; i++) { @@ -91,58 +93,65 @@ public class NetworkFailoverTest extends TestCase { TextMessage msg = localSession.createTextMessage(payload); msg.setJMSReplyTo(tempQueue); requestProducer.send(msg); - LOG.info("Failing over"); - ((FailoverTransport) ((TransportFilter) ((TransportFilter) - ((ActiveMQConnection) localConnection) - .getTransport()).getNext()).getNext()) - .handleTransportFailure(new IOException("Forcing failover from test")); - TextMessage result = (TextMessage)requestConsumer.receive(10000); + LOG.info("*** Failing over for iteration: #{}", i); + ((FailoverTransport) ((TransportFilter) ((TransportFilter) ((ActiveMQConnection) localConnection).getTransport()).getNext()).getNext()) + .handleTransportFailure(new IOException("Forcing failover from test")); + TextMessage result = (TextMessage) requestConsumer.receive(10000); assertNotNull(result); - - LOG.info(result.getText()); + LOG.info("*** Iteration #{} got response: {}", i, result.getText()); } } - - + @Override protected void setUp() throws Exception { super.setUp(); doSetUp(true); } + @Override protected void tearDown() throws Exception { doTearDown(); super.tearDown(); } protected void doTearDown() throws Exception { - localConnection.close(); - remoteConnection.close(); - localBroker.stop(); - remoteBroker.stop(); + try { + localConnection.close(); + remoteConnection.close(); + } catch(Exception ex) {} + + try { + localBroker.stop(); + } catch(Exception ex) {} + try { + remoteBroker.stop(); + } catch(Exception ex) {} } protected void doSetUp(boolean deleteAllMessages) throws Exception { - + remoteBroker = createRemoteBroker(); remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); remoteBroker.start(); + localBroker = createLocalBroker(); localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages); localBroker.start(); + String localURI = "tcp://localhost:61616"; String remoteURI = "tcp://localhost:61617"; - ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:("+localURI+","+remoteURI+")?randomize=false&backup=true&trackMessages=true"); - //ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); + ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory("failover:(" + localURI + "," + remoteURI + + ")?randomize=false&backup=true&trackMessages=true"); + // ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI); localConnection = fac.createConnection(); localConnection.setClientID("local"); localConnection.start(); - fac = new ActiveMQConnectionFactory("failover:("+remoteURI + ","+localURI+")?randomize=false&backup=true&trackMessages=true"); + fac = new ActiveMQConnectionFactory("failover:(" + remoteURI + "," + localURI + ")?randomize=false&backup=true&trackMessages=true"); fac.setWatchTopicAdvisories(false); remoteConnection = fac.createConnection(); remoteConnection.setClientID("remote"); remoteConnection.start(); - + localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); }