[ARTEMIS-2175] Duplicate messages when JMS bridge is stopped and restarted

Issue: https://issues.apache.org/jira/browse/ARTEMIS-2175
This commit is contained in:
Ingo Weiss 2018-11-13 17:00:07 +00:00 committed by Clebert Suconic
parent c50c493427
commit ff5f1213bb
3 changed files with 214 additions and 15 deletions

View File

@ -501,6 +501,7 @@ public final class JMSBridgeImpl implements JMSBridge {
}
}
if (sourceConn != null) {
try {
sourceConn.close();
} catch (Exception ignore) {
@ -508,6 +509,7 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source conn", ignore);
}
}
}
if (targetConn != null) {
try {
@ -519,6 +521,12 @@ public final class JMSBridgeImpl implements JMSBridge {
}
}
if (messages.size() > 0) {
// Clear outstanding messages so they don't get retransmitted and duplicated on the other side of the bridge
ActiveMQJMSBridgeLogger.LOGGER.trace("Clearing up messages before stopping...");
messages.clear();
}
if (ActiveMQJMSBridgeLogger.LOGGER.isTraceEnabled()) {
ActiveMQJMSBridgeLogger.LOGGER.trace("Stopped " + this);
}
@ -1189,6 +1197,7 @@ public final class JMSBridgeImpl implements JMSBridge {
private void cleanup() {
// Stop the source connection
if (sourceConn != null) {
try {
sourceConn.stop();
} catch (Throwable ignore) {
@ -1196,6 +1205,7 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to stop source connection", ignore);
}
}
}
if (tx != null) {
try {
@ -1217,6 +1227,7 @@ public final class JMSBridgeImpl implements JMSBridge {
}
// Close the old objects
if (sourceConn != null) {
try {
sourceConn.close();
} catch (Throwable ignore) {
@ -1224,6 +1235,7 @@ public final class JMSBridgeImpl implements JMSBridge {
ActiveMQJMSBridgeLogger.LOGGER.trace("Failed to close source connection", ignore);
}
}
}
try {
if (targetConn != null) {
targetConn.close();

View File

@ -1200,6 +1200,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.chunkBytes = null;
}
@Override
public String toString() {
return "ServerConsumerImpl$LargeMessageDeliverer[ref=[" + ref + "]]";
}
private ByteBuffer acquireHeapBodyBuffer(int requiredCapacity) {
if (this.chunkBytes == null || this.chunkBytes.capacity() != requiredCapacity) {
this.chunkBytes = ByteBuffer.allocate(requiredCapacity);

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.jms.bridge;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@ -35,9 +36,12 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.bridge.ConnectionFactoryFactory;
import org.apache.activemq.artemis.jms.bridge.QualityOfServiceMode;
import org.apache.activemq.artemis.jms.bridge.impl.JMSBridgeImpl;
@ -1764,10 +1768,188 @@ public class JMSBridgeTest extends BridgeTestBase {
Assert.assertFalse(mbeanServer.isRegistered(objectName));
}
@Test
public void testDuplicateMessagesWhenBridgeStops() throws Exception {
final int NUM_MESSAGES = 500;
JMSBridgeImpl bridge = new JMSBridgeImpl(cff0xa, cff1xa, sourceQueueFactory, targetQueueFactory, null, null,
null, null, null, 1000, 10,
QualityOfServiceMode.ONCE_AND_ONLY_ONCE, 10, 100, null, "ClientId123",
true)
.setBridgeName("test-bridge");
bridge.setTransactionManager(getNewTm());
createQueue(targetQueue.getQueueName(), 1);
final List<TextMessage> sentMessages = new ArrayList<>();
final List<TextMessage> receivedMessages = new ArrayList<>();
log.info("Starting bridge " + bridge);
bridge.start();
waitForComponent(bridge, 15);
Thread producerThread = new Thread(() -> {
Connection conn = null;
Session session = null;
int counter = 0;
try {
conn = cf0.createConnection();
session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(sourceQueue);
TextMessage msg = null;
while (counter < NUM_MESSAGES) {
msg = session.createTextMessage("message" + counter);
msg.setIntProperty("count", counter);
producer.send(msg);
sentMessages.add(msg);
log.info("Sent message with property counter: " + counter + ", messageId:" + msg.getJMSMessageID()
+ ((msg.getStringProperty("_AMQ_DUPL_ID") != null) ? ", _AMQ_DUPL_ID=" + msg.getStringProperty("_AMQ_DUPL_ID") : ""));
counter++;
Thread.sleep(200);
}
producer.close();
} catch (InterruptedException | JMSException e) {
log.error("Error while producing messages: ", e);
} finally {
try {
if (session != null) {
session.close();
}
if (conn != null) {
conn.close();
}
} catch (JMSException e) {
log.error("Error cleaning up the producer thread! ", e);
}
}
});
Thread consumerThread = new Thread(() -> {
Connection conn = null;
Session session = null;
try {
conn = cf1.createConnection();
conn.start();
session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(targetQueue);
TextMessage msg = null;
boolean running = true;
while (running) {
msg = (TextMessage) consumer.receive(5000);
if (msg != null) {
msg.acknowledge();
receivedMessages.add(msg);
log.info("Received message with messageId: " + msg.getJMSMessageID() +
" and property counter " + msg.getIntProperty("count"));
} else {
running = false;
}
}
} catch (JMSException e) {
log.error("Error while consuming messages: ", e);
} finally {
try {
if (session != null) {
session.close();
}
if (conn != null) {
conn.close();
}
} catch (JMSException e) {
log.error("Error cleaning up the consumer thread! ", e);
}
}
});
log.info("Starting producer thread...");
producerThread.start();
Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES / 100, 250000));
log.info("Stopping bridge " + bridge);
bridge.stop();
Thread.sleep(5000);
log.info("Starting bridge " + bridge + " again");
bridge.start();
waitForComponent(bridge, 15);
Assert.assertTrue(waitForMessages(server1, targetQueue.getQueueName(), NUM_MESSAGES, 300000));
log.info("Starting consumer thread...");
consumerThread.start();
log.info("Waiting for the consumer thread to die...");
consumerThread.join();
log.info("Waiting for the producer thread to die...");
producerThread.join();
bridge.stop();
server1.stop();
server0.stop();
Assert.assertEquals("Number of sent messages is different from received messages", sentMessages.size(), receivedMessages.size());
}
public TransactionManager getNewTm() {
return newTransactionManager();
}
private static long countMessagesInQueue(ActiveMQServer server, String queueName) {
QueueControl queueControl = (QueueControl) server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
Assert.assertNotNull(queueControl);
long count = -1;
int numberOfTries = 0;
int maxNumberOfTries = 10;
while (count == -1 && numberOfTries < maxNumberOfTries) {
try {
numberOfTries++;
count = queueControl.countMessages();
break;
} catch (Exception ex) {
if (numberOfTries > maxNumberOfTries - 1) {
throw new RuntimeException("countMessagesInQueue() failed for queue:" + queueName
+ " and server: " + server + ". Number of tries: " + numberOfTries, ex);
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
}
}
}
log.info("Number of messages in queue " + queueName + " on server: " + server + " is: " + count);
return count;
}
private static boolean waitForMessages(ActiveMQServer server, String queueName, long numberOfMessages, long timeout) throws Exception {
long startTime = System.currentTimeMillis();
long count = 0;
while ((count = countMessagesInQueue(server, queueName)) < numberOfMessages) {
log.info("Total number of messages in queue: " + queueName + " on server " + server + " is " + count);
Thread.sleep(5000);
if (System.currentTimeMillis() - startTime > timeout) {
log.warn(numberOfMessages + " not on server " + server + " in timeout " + timeout + "ms.");
return false;
}
}
return true;
}
// Inner classes -------------------------------------------------------------------
private static class StressSender implements Runnable {