From f6401d81b55860e091afda052327ca6183bc6586 Mon Sep 17 00:00:00 2001 From: "michael.pearce" Date: Mon, 11 Nov 2019 12:33:13 +0000 Subject: [PATCH] ARTEMIS-2547 fix AMQP Client reconnect fails on broker stop start Add unit test Add fix to clear clientids when server is stopped. --- .../core/server/impl/ActiveMQServerImpl.java | 2 + .../amqp/JMSClientTestSupport.java | 24 ++++++++- .../amqp/JMSMessageConsumerTest.java | 54 +++++++++++++++++++ 3 files changed, 78 insertions(+), 2 deletions(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index f73407a54d..91e71c47cd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -1249,6 +1249,8 @@ public class ActiveMQServerImpl implements ActiveMQServer { scaledDownNodeIDs.clear(); + connectedClientIds.clear(); + for (ActiveMQComponent externalComponent : externalComponents) { try { if (externalComponent instanceof ServiceComponent) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index 39af3e60a0..fbed4ec8b7 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -75,7 +75,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return ""; } - protected URI getBrokerQpidJMSConnectionURI() { + protected String getBrokerQpidJMSConnectionString() { try { int port = AMQP_PORT; @@ -100,7 +100,23 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { uri = uri + "?" + getJmsConnectionURIOptions(); } - return new URI(uri); + return uri; + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected URI getBrokerQpidJMSConnectionURI() { + try { + return new URI(getBrokerQpidJMSConnectionString()); + } catch (Exception e) { + throw new RuntimeException(); + } + } + + protected URI getBrokerQpidJMSFailoverConnectionURI() { + try { + return new URI("failover:(" + getBrokerQpidJMSConnectionString() + ")"); } catch (Exception e) { throw new RuntimeException(); } @@ -110,6 +126,10 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, true); } + protected Connection createFailoverConnection() throws JMSException { + return createConnection(getBrokerQpidJMSFailoverConnectionURI(), null, null, null, true); + } + protected Connection createConnection(boolean start) throws JMSException { return createConnection(getBrokerQpidJMSConnectionURI(), null, null, null, start); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index 7a83172b03..a634b44b21 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -866,4 +866,58 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { assertFalse(failedToSubscribe.get()); } + + @Test(timeout = 30000) + public void testBrokerRestartAMQPProducerAMQPConsumer() throws Exception { + Connection connection = createFailoverConnection(); //AMQP + Connection connection2 = createFailoverConnection(); //AMQP + testBrokerRestart(connection, connection2); + } + + private void testBrokerRestart(Connection connection1, Connection connection2) throws Exception { + try { + Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE); + + javax.jms.Queue queue1 = session1.createQueue(getQueueName()); + javax.jms.Queue queue2 = session2.createQueue(getQueueName()); + + final MessageConsumer consumer2 = session2.createConsumer(queue2); + + MessageProducer producer = session1.createProducer(queue1); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + connection1.start(); + + TextMessage message = session1.createTextMessage(); + message.setText("hello"); + producer.send(message); + + Message received = consumer2.receive(100); + + assertNotNull("Should have received a message by now.", received); + assertTrue("Should be an instance of TextMessage", received instanceof TextMessage); + assertEquals(DeliveryMode.PERSISTENT, received.getJMSDeliveryMode()); + + + server.stop(); + Wait.waitFor(() -> !server.isStarted(), 1000); + + server.start(); + + TextMessage message2 = session1.createTextMessage(); + message2.setText("hello"); + producer.send(message2); + + Message received2 = consumer2.receive(100); + + assertNotNull("Should have received a message by now.", received2); + assertTrue("Should be an instance of TextMessage", received2 instanceof TextMessage); + assertEquals(DeliveryMode.PERSISTENT, received2.getJMSDeliveryMode()); + + + } finally { + connection1.close(); + connection2.close(); + } + } }