From 09458713aacfa026f660591230dc52854b0203bc Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Wed, 9 Feb 2022 08:50:24 -0600 Subject: [PATCH] [AMQ-8443] Add unit test --- .../usecases/ClientRebalanceTest.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ClientRebalanceTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ClientRebalanceTest.java index e00eb70b10..4094ed3ee3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ClientRebalanceTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ClientRebalanceTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.usecases; +import static org.junit.Assert.assertNotEquals; + import javax.jms.Connection; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -25,6 +27,9 @@ import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ConnectionControl; import org.apache.log4j.Logger; import org.springframework.core.io.ClassPathResource; @@ -87,4 +92,64 @@ public class ClientRebalanceTest extends JmsMultipleBrokersTestSupport { msg = consumer.receive(2000); assertNotNull(msg); } + + public void testReconnect() throws Exception { + createBroker(new ClassPathResource("org/apache/activemq/usecases/rebalance-broker1.xml")); + createBroker(new ClassPathResource("org/apache/activemq/usecases/rebalance-broker2.xml")); + + startAllBrokers(); + + brokers.get("b1").broker.waitUntilStarted(); + + LOG.info("Starting connection"); + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)?randomize=false"); + Connection conn = factory.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue theQueue = session.createQueue("Test.ClientReconnectTest"); + MessageProducer producer = session.createProducer(theQueue); + MessageConsumer consumer = session.createConsumer(theQueue); + Message message = session.createTextMessage("Test message"); + producer.send(message); + Message msg = consumer.receive(2000); + assertNotNull(msg); + + TransportConnector transportConnector = brokers.get("b1").broker.getTransportConnectorByName("openwire"); + assertNotNull(transportConnector); + + TransportConnection startFailoverConnection = findClientFailoverTransportConnection(transportConnector); + + assertNotNull(startFailoverConnection); + String startConnectionId = startFailoverConnection.getConnectionId(); + String startRemoteAddress = startFailoverConnection.getRemoteAddress(); + ConnectionControl simulateRebalance = new ConnectionControl(); + simulateRebalance.setReconnectTo("tcp://localhost:61616"); + startFailoverConnection.dispatchSync(simulateRebalance); + + Thread.sleep(2000l); + + TransportConnection afterFailoverConnection = findClientFailoverTransportConnection(transportConnector); + assertNotNull(afterFailoverConnection); + assertEquals(startConnectionId, afterFailoverConnection.getConnectionId()); + assertNotEquals(startRemoteAddress, afterFailoverConnection.getRemoteAddress()); + // should still be able to send without exception to broker3 + producer.send(message); + msg = consumer.receive(2000); + assertNotNull(msg); + conn.close(); + } + + protected TransportConnection findClientFailoverTransportConnection(TransportConnector transportConnector) { + TransportConnection failoverConnection = null; + for(TransportConnection tmpConnection : transportConnector.getConnections()) { + if(tmpConnection.isNetworkConnection()) { + continue; + } + if(tmpConnection.isFaultTolerantConnection()) { + failoverConnection = tmpConnection; + } + } + return failoverConnection; + } }