diff --git a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java index da370795e3..b1e7e12c73 100644 --- a/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java +++ b/artemis-jms-server/src/main/java/org/apache/activemq/artemis/jms/bridge/impl/JMSBridgeImpl.java @@ -501,11 +501,13 @@ public final class JMSBridgeImpl implements JMSBridge { } } - try { - sourceConn.close(); - } catch (Exception ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); + if (sourceConn != null) { + try { + sourceConn.close(); + } catch (Exception ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore); + } } } @@ -519,6 +521,12 @@ public final class JMSBridgeImpl implements JMSBridge { } } + if (messages.size() > 0) { + // Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge + ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping..."); + messages.clear(); + } + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this); } @@ -1189,11 +1197,13 @@ public final class JMSBridgeImpl implements JMSBridge { private void cleanup() { // Stop the source connection - try { - sourceConn.stop(); - } catch (Throwable ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore); + if (sourceConn != null) { + try { + sourceConn.stop(); + } catch (Throwable ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore); + } } } @@ -1217,11 +1227,13 @@ public final class JMSBridgeImpl implements JMSBridge { } // Close the old objects - try { - sourceConn.close(); - } catch (Throwable ignore) { - if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { - ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore); + if (sourceConn != null) { + try { + sourceConn.close(); + } catch (Throwable ignore) { + if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) { + ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore); + } } } try { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 8fd91e5650..f7a89d7a66 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -1200,6 +1200,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { this.chunkBytes = null; } + @Override + public String toString() { + return "ServerConsumerImpl$LargeMessageDeliverer[ref=[" + ref + "]]"; + } + private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) { if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) { this.chunkBytes = ByteBuffer.allocate(requiredCapacity); diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java index 6f4b9b698a..40c72e1b1e 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/jms/bridge/JMSBridgeTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.jms.bridge; import javax.jms.Connection; import javax.jms.DeliveryMode; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; @@ -35,9 +36,12 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient; import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants; import org.apache.activemq.artemis.api.jms.JMSFactoryType; +import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory; import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode; import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl; @@ -1764,10 +1768,188 @@ public class JMSBridgeTest extends BridgeTestBase { Assert.assertFalse(mbeanServer.isRegistered(objectName)); } + @Test + public void testDuplicateMessagesWhenBridgeStops() throws Exception { + final int NUM_MESSAGES = 500; + + JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null, + null, null, null, 1000, 10, + QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 100, null, "ClientId123", + true) + .setBridgeName("test-bridge"); + bridge.setTransactionManager(getNewTm()); + createQueue(targetQueue.getQueueName(), 1); + + final List sentMessages = new ArrayList<>(); + final List receivedMessages = new ArrayList<>(); + + log.info("Starting bridge " + bridge); + bridge.start(); + waitForComponent(bridge, 15); + + Thread producerThread = new Thread(() -> { + Connection conn = null; + Session session = null; + int counter = 0; + try { + conn = cf0.createConnection(); + session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(sourceQueue); + TextMessage msg = null; + + while (counter < NUM_MESSAGES) { + msg = session.createTextMessage("message" + counter); + msg.setIntProperty("count", counter); + producer.send(msg); + sentMessages.add(msg); + log.info("Sent message with property counter: " + counter + ", messageId:" + msg.getJMSMessageID() + + ((msg.getStringProperty("_AMQ_DUPL_ID") != null) ? ", _AMQ_DUPL_ID=" + msg.getStringProperty("_AMQ_DUPL_ID") : "")); + counter++; + Thread.sleep(200); + } + + producer.close(); + } catch (InterruptedException | JMSException e) { + log.error("Error while producing messages: ", e); + } finally { + try { + if (session != null) { + session.close(); + } + + if (conn != null) { + conn.close(); + } + } catch (JMSException e) { + log.error("Error cleaning up the producer thread! ", e); + } + } + }); + + Thread consumerThread = new Thread(() -> { + Connection conn = null; + Session session = null; + try { + conn = cf1.createConnection(); + conn.start(); + + session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(targetQueue); + TextMessage msg = null; + + boolean running = true; + while (running) { + msg = (TextMessage) consumer.receive(5000); + if (msg != null) { + msg.acknowledge(); + receivedMessages.add(msg); + log.info("Received message with messageId: " + msg.getJMSMessageID() + + " and property counter " + msg.getIntProperty("count")); + } else { + running = false; + } + } + + } catch (JMSException e) { + log.error("Error while consuming messages: ", e); + } finally { + try { + if (session != null) { + session.close(); + } + + if (conn != null) { + conn.close(); + } + } catch (JMSException e) { + log.error("Error cleaning up the consumer thread! ", e); + } + } + }); + + log.info("Starting producer thread..."); + producerThread.start(); + + Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES / 100, 250000)); + + log.info("Stopping bridge " + bridge); + bridge.stop(); + Thread.sleep(5000); + + log.info("Starting bridge " + bridge + " again"); + bridge.start(); + waitForComponent(bridge, 15); + + Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES, 300000)); + + + + log.info("Starting consumer thread..."); + consumerThread.start(); + + log.info("Waiting for the consumer thread to die..."); + consumerThread.join(); + + log.info("Waiting for the producer thread to die..."); + producerThread.join(); + + bridge.stop(); + + server1.stop(); + server0.stop(); + + Assert.assertEquals("Number of sent messages is different from received messages", sentMessages.size(), receivedMessages.size()); + } + public TransactionManager getNewTm() { return newTransactionManager(); } + private static long countMessagesInQueue(ActiveMQServer server, String queueName) { + QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queueName); + Assert.assertNotNull(queueControl); + long count = -1; + int numberOfTries = 0; + int maxNumberOfTries = 10; + while (count == -1 && numberOfTries < maxNumberOfTries) { + try { + numberOfTries++; + count = queueControl.countMessages(); + break; + } catch (Exception ex) { + if (numberOfTries > maxNumberOfTries - 1) { + throw new RuntimeException("countMessagesInQueue() failed for queue:" + queueName + + " and server: " + server + ". Number of tries: " + numberOfTries, ex); + } + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } + } + } + log.info("Number of messages in queue " + queueName + " on server: " + server + " is: " + count); + return count; + } + + private static boolean waitForMessages(ActiveMQServer server, String queueName, long numberOfMessages, long timeout) throws Exception { + + long startTime = System.currentTimeMillis(); + + long count = 0; + while ((count = countMessagesInQueue(server, queueName)) < numberOfMessages) { + log.info("Total number of messages in queue: " + queueName + " on server " + server + " is " + count); + Thread.sleep(5000); + if (System.currentTimeMillis() - startTime > timeout) { + log.warn(numberOfMessages + " not on server " + server + " in timeout " + timeout + "ms."); + return false; + } + } + return true; + + } + // Inner classes ------------------------------------------------------------------- private static class StressSender implements Runnable {