diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FileUtil.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FileUtil.java index c7fd65c213..68333f24fd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FileUtil.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/FileUtil.java @@ -16,8 +16,11 @@ */ package org.apache.activemq.artemis.utils; +import java.io.BufferedReader; import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.lang.invoke.MethodHandles; import java.nio.file.FileVisitResult; import java.nio.file.Files; @@ -27,6 +30,10 @@ import java.nio.file.StandardCopyOption; import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; import java.util.HashSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.activemq.artemis.logs.ActiveMQUtilLogger; import org.slf4j.Logger; @@ -126,4 +133,21 @@ public class FileUtil { } } + public static String readFile(InputStream inputStream) throws Exception { + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); + String fileOutput = bufferedReader.lines().collect(Collectors.joining(System.lineSeparator())); + return fileOutput; + } + + public static boolean find(File file, Predicate search) throws Exception { + AtomicBoolean found = new AtomicBoolean(false); + try (Stream lines = Files.lines(file.toPath())) { + lines.filter(search::test).findFirst().ifPresent(line -> { + logger.info("pattern found at {}", line); + found.set(true); + }); + } + return found.get(); + } + } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java index 616e8f71ca..4501c68910 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/EmptyList.java @@ -61,6 +61,11 @@ public class EmptyList implements LinkedList { public void repeat() { } + @Override + public E removeLastElement() { + return null; + } + @Override public void close() { } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 121b5ba924..0ea52199f8 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -599,15 +599,22 @@ public class LinkedListImpl implements LinkedList { @Override public void remove() { + removeLastElement(); + } + + @Override + public E removeLastElement() { synchronized (LinkedListImpl.this) { if (last == null) { throw new NoSuchElementException(); } if (current == null) { - return; + return null; } + E returningElement = current.val(); + Node prev = current.prev; if (prev != null) { @@ -615,6 +622,8 @@ public class LinkedListImpl implements LinkedList { last = null; } + + return returningElement; } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java index 62ab097adc..5f0c2482b5 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListIterator.java @@ -27,6 +27,9 @@ public interface LinkedListIterator extends Iterator, AutoCloseable { void repeat(); + /** This method is doing exactly what {@link Iterator#remove()} would do, however it will return the removed element being removed. */ + E removeLastElement(); + @Override void close(); } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStore.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStore.java index 6cb689e66a..b07ce23e1a 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStore.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NodeStore.java @@ -30,6 +30,14 @@ public interface NodeStore { void removeNode(E element, LinkedListImpl.Node node); + default NodeStore setName(String name) { + return this; + } + + default String getName() { + return null; + } + /** this is meant to be a quick help to Garbage Collection. * Whenever the IDSupplier list is being cleared, you should first call the clear method and * empty every list before you let the instance go. */ diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java index 7d2a3748e5..03a616b5f8 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedList.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.artemis.utils.collections; +import java.util.function.Supplier; + /** * A type of linked list which maintains items according to a priority * and allows adding and removing of elements at both ends, and peeking.
@@ -40,7 +42,7 @@ public interface PriorityLinkedList { * @see LinkedList#setNodeStore(NodeStore) * @param supplier */ - void setNodeStore(NodeStore supplier); + void setNodeStore(Supplier> supplier); E removeWithID(String listID, long id); diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java index e545b9e9f3..52ef8343dd 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/PriorityLinkedListImpl.java @@ -20,6 +20,7 @@ import java.lang.reflect.Array; import java.util.Comparator; import java.util.NoSuchElementException; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Supplier; /** * A priority linked list implementation @@ -40,6 +41,10 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { private int lastPriority = -1; + protected void removed(final int level, final E element) { + exclusiveIncrementSize(-1); + } + public PriorityLinkedListImpl(final int priorities) { this(priorities, null); } @@ -96,9 +101,10 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { } @Override - public void setNodeStore(NodeStore supplier) { + public void setNodeStore(Supplier> supplier) { for (LinkedList list : levels) { - list.setNodeStore(supplier); + NodeStore nodeStore = supplier.get(); + list.setNodeStore(nodeStore); } } @@ -109,7 +115,7 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { for (int l = 4; l < levels.length; l++) { E removed = levels[l].removeWithID(listID, id); if (removed != null) { - exclusiveIncrementSize(-1); + removed(l, removed); return removed; } } @@ -118,7 +124,7 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { for (int l = Math.min(3, levels.length); l >= 0; l--) { E removed = levels[l].removeWithID(listID, id); if (removed != null) { - exclusiveIncrementSize(-1); + removed(l, removed); return removed; } } @@ -155,7 +161,7 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { e = ll.poll(); if (e != null) { - exclusiveIncrementSize(-1); + removed(i, e); if (ll.size() == 0) { if (highestPriority == i) { @@ -211,6 +217,8 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { private LinkedListIterator lastIter; + private int lastLevel = -1; + private int resetCount = lastReset; volatile boolean closed = false; @@ -233,6 +241,7 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { if (!closed) { closed = true; lastIter = null; + lastLevel = -1; for (LinkedListIterator iter : cachedIters) { if (iter != null) { @@ -256,6 +265,7 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { while (index >= 0) { lastIter = cachedIters[index]; + lastLevel = index; if (lastIter == null) { lastIter = cachedIters[index] = levels[index].iterator(); @@ -289,18 +299,25 @@ public class PriorityLinkedListImpl implements PriorityLinkedList { @Override public void remove() { + removeLastElement(); + } + + @Override + public E removeLastElement() { if (lastIter == null) { throw new NoSuchElementException(); } - lastIter.remove(); + E returningElement = lastIter.removeLastElement(); // If the last message in the current priority is removed then find the next highest for (int i = index; i >= 0 && levels[i].size() == 0; i--) { highestPriority = i; } - exclusiveIncrementSize(-1); + removed(lastLevel, returningElement); + + return returningElement; } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java index 485051e8aa..18e8278ff4 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/connect/mirror/ReferenceNodeStore.java @@ -21,6 +21,7 @@ import java.util.HashMap; import io.netty.util.collection.LongObjectHashMap; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.core.server.MessageReference; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolLogger; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; @@ -35,9 +36,26 @@ public class ReferenceNodeStore implements NodeStore { // This is where the messages are stored by server id... HashMap>> lists; + String name; + String lruListID; LongObjectHashMap> lruMap; + @Override + public String toString() { + return "ReferenceNodeStore{" + "name='" + name + "'}" + "@" + Integer.toHexString(System.identityHashCode(ReferenceNodeStore.this)); + } + + @Override + public NodeStore setName(String name) { + this.name = name; + return this; + } + + @Override + public String getName() { + return name; + } @Override public void storeNode(MessageReference element, LinkedListImpl.Node node) { @@ -50,7 +68,10 @@ public class ReferenceNodeStore implements NodeStore { LongObjectHashMap> nodesMap = getMap(serverID); if (nodesMap != null) { synchronized (nodesMap) { - nodesMap.put(id, node); + LinkedListImpl.Node previousNode = nodesMap.put(id, node); + if (previousNode != null) { + ActiveMQAMQPProtocolLogger.LOGGER.duplicateNodeStoreID(name, serverID, id, new Exception("trace")); + } } } } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java index 531c9e9ef9..dc7dbda0ac 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolLogger.java @@ -64,4 +64,7 @@ public interface ActiveMQAMQPProtocolLogger { @LogMessage(id = 111009, value = "The AckManager was interrupt. timeout = {} milliseconds", level = LogMessage.Level.WARN) void interruptedAckManager(Exception e); + + @LogMessage(id = 111010, value = "Duplicate AckManager node detected. Queue={}, ServerID={}, recordID={}", level = LogMessage.Level.WARN) + void duplicateNodeStoreID(String queue, String serverId, long recordID, Exception trace); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java index 769ea1407c..49640a6762 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/impl/PageSubscriptionImpl.java @@ -1452,6 +1452,11 @@ public final class PageSubscriptionImpl implements PageSubscription { @Override public void remove() { + removeLastElement(); + } + + @Override + public PagedReference removeLastElement() { PagedReference delivery = currentDelivery; if (delivery != null) { PageCursorInfo info = PageSubscriptionImpl.this.getPageInfo(delivery.getPagedMessage().getPageNumber()); @@ -1459,6 +1464,7 @@ public final class PageSubscriptionImpl implements PageSubscription { info.remove(delivery.getPagedMessage().getMessageNumber()); } } + return delivery; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java index 4a6b8a280c..a88235cbfa 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java @@ -315,8 +315,6 @@ public interface Queue extends Bindable,CriticalComponent { MessageReference removeReferenceWithID(long id) throws Exception; - MessageReference getReference(long id) throws ActiveMQException; - int deleteAllReferences() throws Exception; int deleteAllReferences(int flushLimit) throws Exception; diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 4d1243d022..ccaeb50a31 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -114,7 +114,6 @@ import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.SizeAwareMetric; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.collections.ConcurrentHashSet; -import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.NodeStoreFactory; import org.apache.activemq.artemis.utils.collections.PriorityLinkedList; @@ -218,12 +217,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { // This is where messages are stored protected final PriorityLinkedList messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator()); - private NodeStore nodeStore; + private NodeStoreFactory nodeStoreFactory; private void checkIDSupplier(NodeStoreFactory nodeStoreFactory) { - if (this.nodeStore == null) { - this.nodeStore = nodeStoreFactory.newNodeStore(); - messageReferences.setNodeStore(nodeStore); + if (this.nodeStoreFactory == null) { + this.nodeStoreFactory = nodeStoreFactory; + messageReferences.setNodeStore( () -> nodeStoreFactory.newNodeStore().setName(String.valueOf(name))); } } @@ -1816,22 +1815,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } - @Override - public synchronized MessageReference getReference(final long id1) throws ActiveMQException { - try (LinkedListIterator iterator = iterator()) { - - while (iterator.hasNext()) { - MessageReference ref = iterator.next(); - - if (ref.getMessage().getMessageID() == id1) { - return ref; - } - } - - return null; - } - } - @Override public long getMessageCount() { if (pageSubscription != null) { @@ -3070,6 +3053,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } private int getPriority(MessageReference ref) { + if (isInternalQueue()) { + // if it's an internal queue we need to send the events on their original ordering + // for example an ACK arriving before the send on a Mirror.. + return 4; + } try { return ref.getMessage().getPriority(); } catch (Throwable e) { @@ -4539,6 +4527,13 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + @Override + public MessageReference removeLastElement() { + synchronized (QueueImpl.this) { + return iter.removeLastElement(); + } + } + @Override public void remove() { synchronized (QueueImpl.this) { @@ -4561,7 +4556,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { return pagingIterator; } - Iterator lastIterator = null; + LinkedListIterator lastIterator = null; MessageReference cachedNext = null; HashSet previouslyBrowsed = new HashSet<>(); @@ -4663,6 +4658,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + @Override + public MessageReference removeLastElement() { + if (lastIterator != null) { + return lastIterator.removeLastElement(); + } else { + return null; + } + } + @Override public void repeat() { } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java index a7c524ecd7..d4aed372cc 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/list/PriorityLinkedListTest.java @@ -18,23 +18,33 @@ package org.apache.activemq.artemis.core.list; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; +import java.lang.invoke.MethodHandles; +import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Objects; import io.netty.util.collection.LongObjectHashMap; +import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.collections.NodeStore; import org.apache.activemq.artemis.utils.collections.LinkedListImpl; import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.apache.activemq.artemis.utils.collections.PriorityLinkedListImpl; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class PriorityLinkedListTest { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + protected Wibble a; protected Wibble b; @@ -89,8 +99,18 @@ public final class PriorityLinkedListTest { private PriorityLinkedListImpl list; - protected PriorityLinkedListImpl getList() { - return new PriorityLinkedListImpl<>(10); + int lastRemovedLevel; + Wibble lastRemovedWibble; + + private PriorityLinkedListImpl getList() { + return new PriorityLinkedListImpl<>(10) { + @Override + protected void removed(int level, Wibble element) { + super.removed(level, element); + lastRemovedWibble = element; + lastRemovedLevel = level; + } + }; } @BeforeEach @@ -916,36 +936,7 @@ public final class PriorityLinkedListTest { list.addHead(new Wibble("" + i, i), i % 10); } - class WibbleNodeStore implements NodeStore { - LongObjectHashMap> list = new LongObjectHashMap<>(); - - @Override - public void storeNode(Wibble element, LinkedListImpl.Node node) { - list.put(element.id, node); - } - - @Override - public LinkedListImpl.Node getNode(String listID, long id) { - return list.get(id); - } - - @Override - public void removeNode(Wibble element, LinkedListImpl.Node node) { - list.remove(element.id); - } - - @Override - public void clear() { - list.clear(); - } - - @Override - public int size() { - return list.size(); - } - } - - list.setNodeStore(new WibbleNodeStore()); + list.setNodeStore(WibbleNodeStore::new); // remove every 3rd for (int i = 3; i <= 3000; i += 3) { @@ -975,19 +966,65 @@ public final class PriorityLinkedListTest { } + @Test + public void testRemoveWithRandomOrderID() { + + list.setNodeStore(WibbleNodeStore::new); + + ArrayList usedIds = new ArrayList<>(); + + int elements = 50; + + for (int i = 1; i <= elements; i++) { + int level = RandomUtil.randomInterval(0, 2); + list.addTail(new Wibble("" + i, i, level), level); + usedIds.add(i); + } + + HashMap hashMapOutput = new HashMap<>(elements); + + while (usedIds.size() > 0) { + Integer idToRemove = usedIds.remove(RandomUtil.randomInterval(0, usedIds.size() - 1)); + Wibble wibble = list.removeWithID("", idToRemove); + assertNotNull(wibble); + assertEquals(idToRemove.intValue(), wibble.id); + assertSame(wibble, lastRemovedWibble); + // removing from the wrong could create a mess in the linked list nodes + assertEquals(wibble.level, lastRemovedLevel); + hashMapOutput.put(idToRemove, wibble); + } + + assertEquals(elements, hashMapOutput.size()); + + for (int i = 1; i <= elements; i++) { + Wibble wibble = hashMapOutput.remove(i); + assertNotNull(wibble); + assertEquals(i, wibble.id); + } + + assertEquals(0, hashMapOutput.size()); + } + static class Wibble { String s1; long id; + int level; + Wibble(final String s, long id) { + this(s, id, 4); + } + + Wibble(final String s, long id, int level) { this.s1 = s; this.id = id; + this.level = level; } @Override public String toString() { - return s1; + return "s = " + s1 + ", id = " + id + ", level = " + level; } @Override @@ -1006,4 +1043,34 @@ public final class PriorityLinkedListTest { } } + class WibbleNodeStore implements NodeStore { + LongObjectHashMap> list = new LongObjectHashMap<>(); + + @Override + public void storeNode(Wibble element, LinkedListImpl.Node node) { + list.put(element.id, node); + } + + @Override + public LinkedListImpl.Node getNode(String listID, long id) { + return list.get(id); + } + + @Override + public void removeNode(Wibble element, LinkedListImpl.Node node) { + list.remove(element.id); + } + + @Override + public void clear() { + list.clear(); + } + + @Override + public int size() { + return list.size(); + } + } + + } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java index 6f145b46b3..8a42425963 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/RoutingContextTest.java @@ -621,11 +621,6 @@ public class RoutingContextTest { return null; } - @Override - public MessageReference getReference(long id) throws ActiveMQException { - return null; - } - @Override public int deleteAllReferences() throws Exception { return 0; diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java index 95de62e1a2..2dc71e587a 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java @@ -1387,11 +1387,6 @@ public class ScheduledDeliveryHandlerTest { return null; } - @Override - public MessageReference getReference(long id) { - return null; - } - @Override public int deleteAllReferences() throws Exception { return 0; diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java index 7d1ffd5e29..8363ccb0f1 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/fakes/FakeQueue.java @@ -709,12 +709,6 @@ public class FakeQueue extends CriticalComponentImpl implements Queue { return id; } - @Override - public MessageReference getReference(final long id1) { - // no-op - return null; - } - @Override public int getScheduledCount() { // no-op diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java index f856a043fe..0f133ae98f 100644 --- a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/AccumulatedInPageSoakTest.java @@ -37,6 +37,7 @@ import org.apache.activemq.artemis.api.core.management.SimpleManagement; import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType; import org.apache.activemq.artemis.tests.soak.SoakTestBase; import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.tests.util.RandomUtil; import org.apache.activemq.artemis.util.ServerUtil; import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.cli.helper.HelperCreate; @@ -174,6 +175,8 @@ public class AccumulatedInPageSoakTest extends SoakTestBase { MessageProducer producer = session.createProducer(queue); for (int i = 0; i < numberOfMessages; i++) { + // This is to reproduce ARTEMIS-5038 + producer.setPriority(RandomUtil.randomInterval(0, 9)); producer.send(session.createTextMessage(body)); if (i > 0 && i % commitInterval == 0) { logger.info("Sent {}", i); @@ -189,6 +192,8 @@ public class AccumulatedInPageSoakTest extends SoakTestBase { startDC2(); Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC1A, SNF_QUEUE), 240_000, 500); Wait.assertEquals(0L, () -> getMessageCount(simpleManagementDC2A, QUEUE_NAME), 60_000, 500); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC2_NODE_A)); + LogAssert.assertServerLogsForMirror(getFileServerLocation(DC1_NODE_A)); } diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LogAssert.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LogAssert.java new file mode 100644 index 0000000000..6688aa22eb --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/brokerConnection/mirror/LogAssert.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror; + +import java.io.File; + +import org.apache.activemq.artemis.utils.FileUtil; + +public class LogAssert { + + public static void assertServerLogsForMirror(File serverLocation) throws Exception { + File log = new File(serverLocation, "log/artemis.log"); + FileUtil.find(log, l -> l.contains("NullPointerException") || l.contains("AMQ111010")); + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java index a83fb81ac6..9525099059 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java @@ -1218,32 +1218,6 @@ public class QueueImplTest extends ActiveMQTestBase { assertEquals(getMessagesAdded(queue), 3); } - @Test - public void testGetReference() throws Exception { - QueueImpl queue = getTemporaryQueue(); - MessageReference messageReference = generateReference(queue, 1); - MessageReference messageReference2 = generateReference(queue, 2); - MessageReference messageReference3 = generateReference(queue, 3); - queue.addHead(messageReference, false); - queue.addHead(messageReference2, false); - queue.addHead(messageReference3, false); - assertEquals(queue.getReference(2), messageReference2); - - } - - @Test - public void testGetNonExistentReference() throws Exception { - QueueImpl queue = getTemporaryQueue(); - MessageReference messageReference = generateReference(queue, 1); - MessageReference messageReference2 = generateReference(queue, 2); - MessageReference messageReference3 = generateReference(queue, 3); - queue.addHead(messageReference, false); - queue.addHead(messageReference2, false); - queue.addHead(messageReference3, false); - assertNull(queue.getReference(5)); - - } - /** * Test the paused and resumed states with async deliveries. *