ARTEMIS-3285 potential duplicate messages with LVQ + non-destructive

This commit is contained in:
Justin Bertram 2021-05-06 14:22:17 -05:00 committed by clebertsuconic
parent c2acb95a1e
commit 3621258458
5 changed files with 221 additions and 34 deletions

View File

@ -66,10 +66,16 @@ public class Wait {
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis) throws Exception {
boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis);
assertEquals(size, condition, timeout, sleepMillis, true);
}
public static void assertEquals(Long size, LongCondition condition, long timeout, long sleepMillis, boolean printThreadDump) throws Exception {
boolean result = waitFor(() -> condition.getCount() == size, timeout, sleepMillis, printThreadDump);
if (!result) {
System.out.println(ThreadDumpUtil.threadDump("thread dump"));
if (printThreadDump) {
System.out.println(ThreadDumpUtil.threadDump("thread dump"));
}
Assert.fail(size + " != " + condition.getCount());
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -44,6 +45,7 @@ import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
/**
* A queue that will discard messages if a newer message with the same
@ -56,9 +58,27 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
@SuppressWarnings("ALL")
public class LastValueQueue extends QueueImpl {
private static final Logger logger = Logger.getLogger(LastValueQueue.class);
private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<>();
private final SimpleString lastValueKey;
// only use this within synchronized methods or synchronized(this) blocks
protected final LinkedList<MessageReference> nextDeliveries = new LinkedList<>();
/* in certain cases we need to redeliver a message */
@Override
protected MessageReference nextDelivery() {
return nextDeliveries.poll();
}
@Override
protected void repeatNextDelivery(MessageReference reference) {
// put the ref back onto the head of the list so that the next time poll() is called this ref is returned
nextDeliveries.addFirst(reference);
}
@Deprecated
public LastValueQueue(final long persistenceID,
final SimpleString address,
@ -151,25 +171,20 @@ public class LastValueQueue extends QueueImpl {
HolderReference hr = map.get(prop);
if (hr != null) {
// We need to overwrite the old ref with the new one and ack the old one
if (isNonDestructive() && hr.isInDelivery()) {
// if the ref is already being delivered we'll do the replace in the postAcknowledge
hr.setReplacementRef(ref);
} else {
// We need to overwrite the old ref with the new one and ack the old one
replaceLVQMessage(ref, hr);
replaceLVQMessage(ref, hr);
if (isNonDestructive() && hr.isDelivered()) {
hr.resetDelivered();
// --------------------------------------------------------------------------------
// If non Destructive, and if a reference was previously delivered
// we would not be able to receive this message again
// unless we reset the iterators
// The message is not removed, so we can't actually remove it
// a result of this operation is that previously delivered messages
// will probably be delivered again.
// if we ever want to avoid other redeliveries we would have to implement a reset or redeliver
// operation on the iterator for a single message
resetAllIterators();
deliverAsync();
if (isNonDestructive() && hr.isDelivered()) {
hr.resetDelivered();
// since we're replacing a ref that was already delivered we want to trigger a delivery for this new replacement
nextDeliveries.add(hr);
deliverAsync();
}
}
} else {
hr = new HolderReference(prop, ref);
@ -246,6 +261,19 @@ public class LastValueQueue extends QueueImpl {
}
}
@Override
public void postAcknowledge(final MessageReference ref, AckReason reason) {
if (isNonDestructive()) {
if (ref instanceof HolderReference) {
HolderReference hr = (HolderReference) ref;
if (hr.getReplacementRef() != null) {
replaceLVQMessage(hr.getReplacementRef(), hr);
}
}
}
super.postAcknowledge(ref, reason);
}
@Override
public boolean allowsReferenceCallback() {
return false;
@ -358,11 +386,21 @@ public class LastValueQueue extends QueueImpl {
private volatile MessageReference ref;
private volatile MessageReference replacementRef;
private long consumerID;
private boolean hasConsumerID = false;
public MessageReference getReplacementRef() {
return replacementRef;
}
public void setReplacementRef(MessageReference replacementRef) {
this.replacementRef = replacementRef;
}
public void resetDelivered() {
delivered = false;
}

View File

@ -340,6 +340,17 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private volatile long ringSize;
/* in certain cases we need to redeliver a message directly.
* it's useful for usecases last LastValueQueue */
protected MessageReference nextDelivery() {
return null;
}
protected void repeatNextDelivery(MessageReference reference) {
}
/**
* This is to avoid multi-thread races on calculating direct delivery,
* to guarantee ordering will be always be correct
@ -2969,12 +2980,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
holder.iter = messageReferences.iterator();
}
if (holder.iter.hasNext()) {
ref = holder.iter.next();
} else {
ref = null;
existingMemoryEstimate = 0;
ref = nextDelivery();
boolean nextDelivery = false;
if (ref != null) {
nextDelivery = true;
}
if (ref == null && holder.iter.hasNext()) {
ref = holder.iter.next();
}
if (ref == null) {
noDelivery++;
} else {
@ -3022,14 +3037,18 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
handled++;
consumers.reset();
} else if (status == HandleStatus.BUSY) {
try {
holder.iter.repeat();
} catch (NoSuchElementException e) {
// this could happen if there was an exception on the queue handling
// and it returned BUSY because of that exception
//
// We will just log it as there's nothing else we can do now.
logger.warn(e.getMessage(), e);
if (nextDelivery) {
repeatNextDelivery(ref);
} else {
try {
holder.iter.repeat();
} catch (NoSuchElementException e) {
// this could happen if there was an exception on the queue handling
// and it returned BUSY because of that exception
//
// We will just log it as there's nothing else we can do now.
logger.warn(e.getMessage(), e);
}
}
noDelivery++;
@ -3067,7 +3086,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (handledconsumer != null) {
proceedDeliver(handledconsumer, ref);
}
}
return true;

View File

@ -984,7 +984,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
if (ref == null) {
ActiveMQIllegalStateException ils = new ActiveMQIllegalStateException("Cannot find ref to ack " + messageID);
ActiveMQIllegalStateException ils = ActiveMQMessageBundle.BUNDLE.consumerNoReference(id, messageID, messageQueue.getName());
tx.markAsRollbackOnly(ils);
throw ils;
}

View File

@ -24,8 +24,13 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@ -44,6 +49,8 @@ import org.apache.activemq.artemis.core.server.impl.LastValueQueue;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.RetryRule;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -52,6 +59,8 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class JMSNonDestructiveTest extends JMSClientTestSupport {
private static final Logger logger = Logger.getLogger(JMSNonDestructiveTest.class);
@Rule
public RetryRule retryRule = new RetryRule(2);
@ -588,4 +597,120 @@ public class JMSNonDestructiveTest extends JMSClientTestSupport {
producer.send(message1, javax.jms.Message.DEFAULT_DELIVERY_MODE, javax.jms.Message.DEFAULT_PRIORITY, tombstoneTimeToLive);
}
}
@Test
public void testMultipleLastValuesCore() throws Exception {
testMultipleLastValues(CoreConnection);
}
@Test
public void testMultipleLastValuesAMQP() throws Exception {
testMultipleLastValues(AMQPConnection);
}
private void testMultipleLastValues(ConnectionSupplier connectionSupplier) throws Exception {
final int GROUP_COUNT = 5;
final int MESSAGE_COUNT_PER_GROUP = 25;
final int PRODUCER_COUNT = 5;
HashMap<String, List<String>> results = new HashMap<>();
for (int i = 0; i < GROUP_COUNT; i++) {
results.put(i + "", new ArrayList<>());
}
HashMap<String, Integer> dups = new HashMap<>();
List<Producer> producers = new ArrayList<>();
try (Connection connection = connectionSupplier.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < PRODUCER_COUNT; i++) {
producers.add(new Producer(connectionSupplier, MESSAGE_COUNT_PER_GROUP, GROUP_COUNT, i));
}
for (Producer producer : producers) {
new Thread(producer).start();
}
while (true) {
TextMessage tm = (TextMessage) consumer.receive(500);
if (tm == null) {
break;
}
results.get(tm.getStringProperty("lastval")).add(tm.getText());
tm.acknowledge();
}
for (Producer producer : producers) {
assertFalse("Producer failed!", producer.failed);
}
}
for (Map.Entry<String, List<String>> entry : results.entrySet()) {
StringBuilder logMessage = new StringBuilder();
logMessage.append("Messages received with lastval=" + entry.getKey() + " (");
for (String s : entry.getValue()) {
int occurrences = Collections.frequency(entry.getValue(), s);
if (occurrences > 1 && !dups.containsValue(Integer.parseInt(s))) {
dups.put(s, occurrences);
}
logMessage.append(s + ",");
}
logger.info(logMessage + ")");
}
if (dups.size() > 0) {
StringBuffer sb = new StringBuffer();
for (Map.Entry<String, Integer> stringIntegerEntry : dups.entrySet()) {
sb.append(stringIntegerEntry.getKey() + "(" + stringIntegerEntry.getValue() + "),");
}
Assert.fail("Duplicate messages received " + sb);
}
Wait.assertEquals((long) GROUP_COUNT, () -> server.locateQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME).getMessageCount(), 2000, 100, false);
}
private class Producer implements Runnable {
private final ConnectionSupplier connectionSupplier;
private final int messageCount;
private final int groupCount;
private final int offset;
public boolean failed = false;
Producer(ConnectionSupplier connectionSupplier, int messageCount, int groupCount, int offset) {
this.connectionSupplier = connectionSupplier;
this.messageCount = messageCount;
this.groupCount = groupCount;
this.offset = offset;
}
@Override
public void run() {
try (Connection connection = connectionSupplier.createConnection()) {
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(NON_DESTRUCTIVE_LVQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
int startingPoint = offset * (messageCount * groupCount);
int messagesToSend = messageCount * groupCount;
for (int i = startingPoint; i < messagesToSend + startingPoint; i++) {
String lastval = "" + (i % groupCount);
TextMessage message = session.createTextMessage();
message.setText("" + i);
message.setStringProperty("data", "" + i);
message.setStringProperty("lastval", lastval);
message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), lastval);
producer.send(message);
}
} catch (JMSException e) {
e.printStackTrace();
failed = true;
return;
}
}
}
}