ARTEMIS-5038 Mirrored ACKs are broken if using multiple priorities on producers

PriorityLinkedList has multiple sub-lists. Before this commit, PriorityLinkedList::setNodeStore would set the same node store on all of the sub-lists.
When removeWithID was called for an item on list[0], the remove from list[4] would always succeed first. This operation would work correctly most of the time, except
when the tail and head are being used. Many NullPointerExceptions would be seen while iterating on the list for remove operations, and the navigation would be completely broken.

A test was added to PriorityLinkedListTest to make sure the correct lists are used; however, I was not able to reproduce the NPE condition in that test.
AccumulatedInPageSoakTest reproduces the exact condition for the NPE when a significant load is used.
This commit is contained in:
Clebert Suconic 2024-09-06 13:58:09 -04:00 committed by clebertsuconic
parent a70b053dbd
commit f92a846c21
19 changed files with 270 additions and 109 deletions

View File

@ -16,8 +16,11 @@
*/
package org.apache.activemq.artemis.utils;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.lang.invoke.MethodHandles;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
@ -27,6 +30,10 @@ import java.nio.file.StandardCopyOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.activemq.artemis.logs.ActiveMQUtilLogger;
import org.slf4j.Logger;
@ -126,4 +133,21 @@ public class FileUtil {
}
}
/**
 * Reads every line from the given stream and joins them with the platform line separator.
 * The stream is decoded with the default charset of {@link InputStreamReader} and is left
 * open; closing it remains the caller's responsibility.
 *
 * @param inputStream the stream to read until exhaustion
 * @return all lines joined by {@link System#lineSeparator()}, or an empty string when the stream has no content
 * @throws Exception if reading fails
 */
public static String readFile(InputStream inputStream) throws Exception {
   final String separator = System.lineSeparator();
   final BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
   return reader.lines().collect(Collectors.joining(separator));
}
/**
 * Scans the file line by line and reports whether any line satisfies the predicate.
 * The scan stops at the first matching line, which is logged at INFO level.
 *
 * @param file   the file to scan
 * @param search the condition applied to each line
 * @return {@code true} if a matching line was found, {@code false} otherwise
 * @throws Exception if the file cannot be opened or read
 */
public static boolean find(File file, Predicate<String> search) throws Exception {
   try (Stream<String> lines = Files.lines(file.toPath())) {
      // lines are never null, so null can safely stand for "no match"
      String firstMatch = lines.filter(search::test).findFirst().orElse(null);
      if (firstMatch == null) {
         return false;
      }
      logger.info("pattern found at {}", firstMatch);
      return true;
   }
}
}

View File

@ -61,6 +61,11 @@ public class EmptyList<E> implements LinkedList<E> {
public void repeat() {
}
/** Always returns {@code null}: this implementation removes nothing. */
@Override
public E removeLastElement() {
return null;
}
@Override
public void close() {
}

View File

@ -599,15 +599,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
/** Delegates to {@link #removeLastElement()} and discards the element it returns. */
@Override
public void remove() {
removeLastElement();
}
@Override
public E removeLastElement() {
synchronized (LinkedListImpl.this) {
if (last == null) {
throw new NoSuchElementException();
}
if (current == null) {
return;
return null;
}
E returningElement = current.val();
Node<E> prev = current.prev;
if (prev != null) {
@ -615,6 +622,8 @@ public class LinkedListImpl<E> implements LinkedList<E> {
last = null;
}
return returningElement;
}
}

View File

@ -27,6 +27,9 @@ public interface LinkedListIterator<E> extends Iterator<E>, AutoCloseable {
void repeat();
/** Performs the same removal as {@link Iterator#remove()}, but additionally returns the element that was removed. */
E removeLastElement();
@Override
void close();
}

View File

@ -30,6 +30,14 @@ public interface NodeStore<E> {
void removeNode(E element, LinkedListImpl.Node<E> node);
/**
 * Assigns a name to this store. The default implementation ignores the name.
 *
 * @param name the name to assign
 * @return this instance
 */
default NodeStore<E> setName(String name) {
return this;
}
/**
 * @return the name of this store; the default implementation returns {@code null}
 */
default String getName() {
return null;
}
/** this is meant to be a quick help to Garbage Collection.
* Whenever the IDSupplier list is being cleared, you should first call the clear method and
* empty every list before you let the instance go. */

View File

@ -16,6 +16,8 @@
*/
package org.apache.activemq.artemis.utils.collections;
import java.util.function.Supplier;
/**
* A type of linked list which maintains items according to a priority
* and allows adding and removing of elements at both ends, and peeking.<br>
@ -40,7 +42,7 @@ public interface PriorityLinkedList<E> {
* @see LinkedList#setNodeStore(NodeStore)
* @param supplier
*/
void setNodeStore(NodeStore<E> supplier);
void setNodeStore(Supplier<NodeStore<E>> supplier);
E removeWithID(String listID, long id);

View File

@ -20,6 +20,7 @@ import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Supplier;
/**
* A priority linked list implementation
@ -40,6 +41,10 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
private int lastPriority = -1;
/**
 * Hook called by this class whenever an element has been removed from one of the levels.
 * The default implementation decrements the size by one; subclasses that override it
 * should call {@code super.removed(level, element)} to keep the size accurate.
 *
 * @param level   the index of the level the element was removed from
 * @param element the element that was removed
 */
protected void removed(final int level, final E element) {
exclusiveIncrementSize(-1);
}
/**
 * Creates a list with the given number of priorities, delegating to the two-argument
 * constructor with {@code null} as the second argument
 * (NOTE(review): presumably the comparator; confirm against that constructor).
 *
 * @param priorities the number of priority levels
 */
public PriorityLinkedListImpl(final int priorities) {
this(priorities, null);
}
@ -96,9 +101,10 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
}
@Override
public void setNodeStore(NodeStore<E> supplier) {
public void setNodeStore(Supplier<NodeStore<E>> supplier) {
for (LinkedList<E> list : levels) {
list.setNodeStore(supplier);
NodeStore<E> nodeStore = supplier.get();
list.setNodeStore(nodeStore);
}
}
@ -109,7 +115,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
for (int l = 4; l < levels.length; l++) {
E removed = levels[l].removeWithID(listID, id);
if (removed != null) {
exclusiveIncrementSize(-1);
removed(l, removed);
return removed;
}
}
@ -118,7 +124,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
for (int l = Math.min(3, levels.length); l >= 0; l--) {
E removed = levels[l].removeWithID(listID, id);
if (removed != null) {
exclusiveIncrementSize(-1);
removed(l, removed);
return removed;
}
}
@ -155,7 +161,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
e = ll.poll();
if (e != null) {
exclusiveIncrementSize(-1);
removed(i, e);
if (ll.size() == 0) {
if (highestPriority == i) {
@ -211,6 +217,8 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
private LinkedListIterator<E> lastIter;
private int lastLevel = -1;
private int resetCount = lastReset;
volatile boolean closed = false;
@ -233,6 +241,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
if (!closed) {
closed = true;
lastIter = null;
lastLevel = -1;
for (LinkedListIterator<E> iter : cachedIters) {
if (iter != null) {
@ -256,6 +265,7 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
while (index >= 0) {
lastIter = cachedIters[index];
lastLevel = index;
if (lastIter == null) {
lastIter = cachedIters[index] = levels[index].iterator();
@ -289,18 +299,25 @@ public class PriorityLinkedListImpl<E> implements PriorityLinkedList<E> {
/** Delegates to {@link #removeLastElement()} and discards the element it returns. */
@Override
public void remove() {
removeLastElement();
}
@Override
public E removeLastElement() {
if (lastIter == null) {
throw new NoSuchElementException();
}
lastIter.remove();
E returningElement = lastIter.removeLastElement();
// If the last message in the current priority is removed then find the next highest
for (int i = index; i >= 0 && levels[i].size() == 0; i--) {
highestPriority = i;
}
exclusiveIncrementSize(-1);
removed(lastLevel, returningElement);
return returningElement;
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.HashMap;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
@ -35,9 +36,26 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
// This is where the messages are stored by server id...
HashMap<String, LongObjectHashMap<LinkedListImpl.Node<MessageReference>>> lists;
String name;
String lruListID;
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> lruMap;
/** Renders the store name followed by the identity hash code of this instance in hexadecimal. */
@Override
public String toString() {
   final String identity = Integer.toHexString(System.identityHashCode(this));
   return "ReferenceNodeStore{name='" + name + "'}@" + identity;
}
/**
 * Stores the given name; it is reported by {@link #getName()} and {@link #toString()}.
 *
 * @param name the name to assign to this store
 * @return this instance
 */
@Override
public NodeStore<MessageReference> setName(String name) {
this.name = name;
return this;
}
/**
 * @return the name previously given through {@link #setName(String)}, or {@code null} if none was set
 */
@Override
public String getName() {
return name;
}
@Override
public void storeNode(MessageReference element, LinkedListImpl.Node<MessageReference> node) {
@ -50,7 +68,10 @@ public class ReferenceNodeStore implements NodeStore<MessageReference> {
LongObjectHashMap<LinkedListImpl.Node<MessageReference>> nodesMap = getMap(serverID);
if (nodesMap != null) {
synchronized (nodesMap) {
nodesMap.put(id, node);
LinkedListImpl.Node<MessageReference> previousNode = nodesMap.put(id, node);
if (previousNode != null) {
ActiveMQAMQPProtocolLogger.LOGGER.duplicateNodeStoreID(name, serverID, id, new Exception("trace"));
}
}
}
}

View File

@ -64,4 +64,7 @@ public interface ActiveMQAMQPProtocolLogger {
@LogMessage(id = 111009, value = "The AckManager was interrupt. timeout = {} milliseconds", level = LogMessage.Level.WARN)
void interruptedAckManager(Exception e);
// Logged when a node is stored under a server ID / record ID pair that already had a node registered.
// The trace exception is supplied by the caller so the log shows where the duplicate store happened.
@LogMessage(id = 111010, value = "Duplicate AckManager node detected. Queue={}, ServerID={}, recordID={}", level = LogMessage.Level.WARN)
void duplicateNodeStoreID(String queue, String serverId, long recordID, Exception trace);
}

View File

@ -1452,6 +1452,11 @@ public final class PageSubscriptionImpl implements PageSubscription {
/** Delegates to {@link #removeLastElement()} and discards the reference it returns. */
@Override
public void remove() {
removeLastElement();
}
@Override
public PagedReference removeLastElement() {
PagedReference delivery = currentDelivery;
if (delivery != null) {
PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPagedMessage().getPageNumber());
@ -1459,6 +1464,7 @@ public final class PageSubscriptionImpl implements PageSubscription {
info.remove(delivery.getPagedMessage().getMessageNumber());
}
}
return delivery;
}
@Override

View File

@ -315,8 +315,6 @@ public interface Queue extends Bindable,CriticalComponent {
MessageReference removeReferenceWithID(long id) throws Exception;
MessageReference getReference(long id) throws ActiveMQException;
int deleteAllReferences() throws Exception;
int deleteAllReferences(int flushLimit) throws Exception;

View File

@ -114,7 +114,6 @@ import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.SizeAwareMetric;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.NodeStoreFactory;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedList;
@ -218,12 +217,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
// This is where messages are stored
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator());
private NodeStore<MessageReference> nodeStore;
private NodeStoreFactory<MessageReference> nodeStoreFactory;
private void checkIDSupplier(NodeStoreFactory<MessageReference> nodeStoreFactory) {
if (this.nodeStore == null) {
this.nodeStore = nodeStoreFactory.newNodeStore();
messageReferences.setNodeStore(nodeStore);
if (this.nodeStoreFactory == null) {
this.nodeStoreFactory = nodeStoreFactory;
messageReferences.setNodeStore( () -> nodeStoreFactory.newNodeStore().setName(String.valueOf(name)));
}
}
@ -1816,22 +1815,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
@Override
public synchronized MessageReference getReference(final long id1) throws ActiveMQException {
try (LinkedListIterator<MessageReference> iterator = iterator()) {
while (iterator.hasNext()) {
MessageReference ref = iterator.next();
if (ref.getMessage().getMessageID() == id1) {
return ref;
}
}
return null;
}
}
@Override
public long getMessageCount() {
if (pageSubscription != null) {
@ -3070,6 +3053,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
private int getPriority(MessageReference ref) {
if (isInternalQueue()) {
// if it's an internal queue we need to send the events on their original ordering
// for example an ACK arriving before the send on a Mirror..
return 4;
}
try {
return ref.getMessage().getPriority();
} catch (Throwable e) {
@ -4539,6 +4527,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
/**
 * Removes through the wrapped iterator while holding the monitor of the enclosing QueueImpl.
 *
 * @return whatever the wrapped iterator's {@code removeLastElement()} returns
 */
@Override
public MessageReference removeLastElement() {
synchronized (QueueImpl.this) {
return iter.removeLastElement();
}
}
@Override
public void remove() {
synchronized (QueueImpl.this) {
@ -4561,7 +4556,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return pagingIterator;
}
Iterator<? extends MessageReference> lastIterator = null;
LinkedListIterator<? extends MessageReference> lastIterator = null;
MessageReference cachedNext = null;
HashSet<PagePosition> previouslyBrowsed = new HashSet<>();
@ -4663,6 +4658,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
/**
 * Removes through the iterator that was used last.
 *
 * @return the element reported by that iterator, or {@code null} when no iterator has been used yet
 */
@Override
public MessageReference removeLastElement() {
   if (lastIterator == null) {
      return null;
   }
   return lastIterator.removeLastElement();
}
@Override
public void repeat() {
}

View File

@ -18,23 +18,33 @@ package org.apache.activemq.artemis.core.list;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import io.netty.util.collection.LongObjectHashMap;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.collections.NodeStore;
import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public final class PriorityLinkedListTest {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected Wibble a;
protected Wibble b;
@ -89,8 +99,18 @@ public final class PriorityLinkedListTest {
private PriorityLinkedListImpl<Wibble> list;
protected PriorityLinkedListImpl<Wibble> getList() {
return new PriorityLinkedListImpl<>(10);
int lastRemovedLevel;
Wibble lastRemovedWibble;
/**
 * Builds the list under test: a PriorityLinkedListImpl with 10 priorities whose
 * {@code removed} hook records the last removed element and its level, so tests
 * can assert which level an element was actually removed from.
 */
private PriorityLinkedListImpl<Wibble> getList() {
return new PriorityLinkedListImpl<>(10) {
@Override
protected void removed(int level, Wibble element) {
// keep the default size accounting before recording the removal
super.removed(level, element);
lastRemovedWibble = element;
lastRemovedLevel = level;
}
};
}
@BeforeEach
@ -916,36 +936,7 @@ public final class PriorityLinkedListTest {
list.addHead(new Wibble("" + i, i), i % 10);
}
class WibbleNodeStore implements NodeStore<Wibble> {
LongObjectHashMap<LinkedListImpl.Node<Wibble>> list = new LongObjectHashMap<>();
@Override
public void storeNode(Wibble element, LinkedListImpl.Node<Wibble> node) {
list.put(element.id, node);
}
@Override
public LinkedListImpl.Node<Wibble> getNode(String listID, long id) {
return list.get(id);
}
@Override
public void removeNode(Wibble element, LinkedListImpl.Node<Wibble> node) {
list.remove(element.id);
}
@Override
public void clear() {
list.clear();
}
@Override
public int size() {
return list.size();
}
}
list.setNodeStore(new WibbleNodeStore());
list.setNodeStore(WibbleNodeStore::new);
// remove every 3rd
for (int i = 3; i <= 3000; i += 3) {
@ -975,19 +966,65 @@ public final class PriorityLinkedListTest {
}
/**
 * Adds elements at random priority levels, removes them by ID in random order and verifies
 * that each one is returned and is reported as removed from the level it was added to.
 */
@Test
public void testRemoveWithRandomOrderID() {
   list.setNodeStore(WibbleNodeStore::new);

   final int elements = 50;
   ArrayList<Integer> pendingIds = new ArrayList<>();
   for (int id = 1; id <= elements; id++) {
      int level = RandomUtil.randomInterval(0, 2);
      list.addTail(new Wibble("" + id, id, level), level);
      pendingIds.add(id);
   }

   HashMap<Integer, Wibble> removedById = new HashMap<>(elements);
   while (!pendingIds.isEmpty()) {
      int position = RandomUtil.randomInterval(0, pendingIds.size() - 1);
      Integer idToRemove = pendingIds.remove(position);

      Wibble removed = list.removeWithID("", idToRemove);
      assertNotNull(removed);
      assertEquals(idToRemove.intValue(), removed.id);
      assertSame(removed, lastRemovedWibble);
      // removing from the wrong level could create a mess in the linked list nodes
      assertEquals(removed.level, lastRemovedLevel);

      removedById.put(idToRemove, removed);
   }

   // every ID must have been removed exactly once
   assertEquals(elements, removedById.size());
   for (int id = 1; id <= elements; id++) {
      Wibble removed = removedById.remove(id);
      assertNotNull(removed);
      assertEquals(id, removed.id);
   }
   assertEquals(0, removedById.size());
}
static class Wibble {
String s1;
long id;
int level;
Wibble(final String s, long id) {
this(s, id, 4);
}
Wibble(final String s, long id, int level) {
this.s1 = s;
this.id = id;
this.level = level;
}
@Override
public String toString() {
return s1;
return "s = " + s1 + ", id = " + id + ", level = " + level;
}
@Override
@ -1006,4 +1043,34 @@ public final class PriorityLinkedListTest {
}
}
/**
 * Minimal NodeStore used by these tests: nodes are indexed only by {@code Wibble.id};
 * the listID passed to {@link #getNode(String, long)} is ignored.
 */
class WibbleNodeStore implements NodeStore<Wibble> {
// node lookup keyed by Wibble.id
LongObjectHashMap<LinkedListImpl.Node<Wibble>> list = new LongObjectHashMap<>();
@Override
public void storeNode(Wibble element, LinkedListImpl.Node<Wibble> node) {
list.put(element.id, node);
}
@Override
public LinkedListImpl.Node<Wibble> getNode(String listID, long id) {
return list.get(id);
}
@Override
public void removeNode(Wibble element, LinkedListImpl.Node<Wibble> node) {
list.remove(element.id);
}
@Override
public void clear() {
list.clear();
}
@Override
public int size() {
return list.size();
}
}
}

View File

@ -621,11 +621,6 @@ public class RoutingContextTest {
return null;
}
@Override
public MessageReference getReference(long id) throws ActiveMQException {
return null;
}
@Override
public int deleteAllReferences() throws Exception {
return 0;

View File

@ -1387,11 +1387,6 @@ public class ScheduledDeliveryHandlerTest {
return null;
}
@Override
public MessageReference getReference(long id) {
return null;
}
@Override
public int deleteAllReferences() throws Exception {
return 0;

View File

@ -709,12 +709,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
return id;
}
@Override
public MessageReference getReference(final long id1) {
// no-op
return null;
}
@Override
public int getScheduledCount() {
// no-op

View File

@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.RandomUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
@ -174,6 +175,8 @@ public class AccumulatedInPageSoakTest extends SoakTestBase {
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < numberOfMessages; i++) {
// This is to reproduce ARTEMIS-5038
producer.setPriority(RandomUtil.randomInterval(0, 9));
producer.send(session.createTextMessage(body));
if (i > 0 && i % commitInterval == 0) {
logger.info("Sent {}", i);
@ -189,6 +192,8 @@ public class AccumulatedInPageSoakTest extends SoakTestBase {
startDC2();
Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, SNF_QUEUE), 240_000, 500);
Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME), 60_000, 500);
LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE_A));
LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE_A));
}

View File

@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import java.io.File;
import org.apache.activemq.artemis.utils.FileUtil;
/** Assertions over the log files written by the servers started in the mirror soak tests. */
public class LogAssert {

   /**
    * Fails if {@code log/artemis.log} under the given server location contains a line mentioning
    * {@code NullPointerException} or the log code {@code AMQ111010}.
    *
    * @param serverLocation the root directory of the server whose log is inspected
    * @throws AssertionError if an offending line is found in the log
    * @throws Exception if the log file cannot be read
    */
   public static void assertServerLogsForMirror(File serverLocation) throws Exception {
      File log = new File(serverLocation, "log/artemis.log");
      // the result of the search must be checked, otherwise this method can never fail
      boolean found = FileUtil.find(log, l -> l.contains("NullPointerException") || l.contains("AMQ111010"));
      if (found) {
         throw new AssertionError("Unexpected NullPointerException or AMQ111010 entry found in " + log);
      }
   }
}

View File

@ -1218,32 +1218,6 @@ public class QueueImplTest extends ActiveMQTestBase {
assertEquals(getMessagesAdded(queue), 3);
}
@Test
public void testGetReference() throws Exception {
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
queue.addHead(messageReference, false);
queue.addHead(messageReference2, false);
queue.addHead(messageReference3, false);
assertEquals(queue.getReference(2), messageReference2);
}
@Test
public void testGetNonExistentReference() throws Exception {
QueueImpl queue = getTemporaryQueue();
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
queue.addHead(messageReference, false);
queue.addHead(messageReference2, false);
queue.addHead(messageReference3, false);
assertNull(queue.getReference(5));
}
/**
* Test the paused and resumed states with async deliveries.
*