From 66da97a3b151cfbf0f81fb32abda953b361b7170 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 8 Nov 2022 09:42:16 -0500 Subject: [PATCH] ARTEMIS-4084 Fixing addSorted with large transactions when cancelling a large number of messages, the addSorted could be holding a lock for too long causing the server to crash under CriticalAnalyzer co-authored: AntonRoskvist (discovering the issue and providing the test ClientCrashMassiveRollbackTest.java) (cherry picked from commit 03b82142eb0844b9de02ca3d7ed365d849e3ac02) --- .../utils/collections/LinkedListImpl.java | 67 ++++++++-- .../artemis/core/server/impl/QueueImpl.java | 42 +++++- .../core/server/impl/RefsOperation.java | 4 +- .../ClientCrashMassiveRollbackTest.java | 121 ++++++++++++++++++ .../tests/unit/util/LinkedListTest.java | 73 +++++++++-- 5 files changed, 281 insertions(+), 26 deletions(-) create mode 100644 tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java index 76ee69499e..8d1c98eada 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/LinkedListImpl.java @@ -16,12 +16,16 @@ */ package org.apache.activemq.artemis.utils.collections; +import java.lang.invoke.MethodHandles; import java.lang.reflect.Array; import java.util.Comparator; import java.util.NoSuchElementException; import java.util.Objects; import java.util.function.Consumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any * elements added or removed from the queue either directly or via iterators. @@ -30,6 +34,8 @@ import java.util.function.Consumer; */ public class LinkedListImpl implements LinkedList { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10; private final Node head = new NodeHolder<>(null); @@ -42,6 +48,8 @@ public class LinkedListImpl implements LinkedList { private int nextIndex; private NodeStore nodeStore; + private volatile Node lastAdd; + public LinkedListImpl() { this(null, null); } @@ -155,12 +163,18 @@ public class LinkedListImpl implements LinkedList { } private void itemAdded(Node node, E item) { + assert node.val() == item; + lastAdd = node; + if (logger.isTraceEnabled()) { + logger.trace("Setting lastAdd as {}, e={}", lastAdd, lastAdd.val()); + } if (nodeStore != null) { putID(item, node); } } private void itemRemoved(Node node) { + lastAdd = null; if (nodeStore != null) { nodeStore.removeNode(node.val(), node); } @@ -186,13 +200,22 @@ public class LinkedListImpl implements LinkedList { } public void addSorted(E e) { + final Node localLastAdd = lastAdd; + + logger.trace("**** addSorted element {}", e); + if (comparator == null) { throw new NullPointerException("comparator=null"); } + if (size == 0) { + logger.trace("adding head as there are no elements {}", e); addHead(e); } else { if (comparator.compare(head.next.val(), e) < 0) { + if (logger.isTraceEnabled()) { + logger.trace("addHead as e={} and head={}", e, head.next.val()); + } addHead(e); return; } @@ -203,18 +226,30 @@ public class LinkedListImpl implements LinkedList { // This would be an optimization for our usage. // avoiding scanning the entire List just to add at the end, so we compare the end first. if (comparator.compare(tail.val(), e) >= 0) { + logger.trace("addTail as e={} and tail={}", e, tail.val()); addTail(e); return; } - Node fetching = head.next; - while (fetching.next != null) { - int compareNext = comparator.compare(fetching.next.val(), e); - if (compareNext <= 0) { - addAfter(fetching, e); - return; + if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan. + if (localLastAdd.prev != null && localLastAdd.prev.val() != null) { + if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) { + logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val()); + addAfter(localLastAdd.prev, e); + return; + } } - fetching = fetching.next; + if (localLastAdd.next != null && localLastAdd.next.val() != null) { + if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) { + logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val()); + addAfter(localLastAdd, e); + return; + } + } + } + + if (addSortedScan(e)) { + return; } // this shouldn't happen as the tail was compared before iterating @@ -229,6 +264,22 @@ public class LinkedListImpl implements LinkedList { } } + protected boolean addSortedScan(E e) { + logger.trace("addSortedScan {}...", e); + Node fetching = head.next; + while (fetching.next != null) { + int compareNext = comparator.compare(fetching.next.val(), e); + if (compareNext <= 0) { + addAfter(fetching, e); + logger.trace("... addSortedScan done, returning true"); + return true; + } + fetching = fetching.next; + } + logger.trace("... addSortedScan done, could not find a spot, returning false"); + return false; + } + private void addAfter(Node node, E e) { Node newNode = Node.with(e); Node nextNode = node.next; @@ -236,7 +287,7 @@ public class LinkedListImpl implements LinkedList { newNode.prev = node; newNode.next = nextNode; nextNode.prev = newNode; - itemAdded(node, e); + itemAdded(newNode, e); size++; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 908bdca23e..d222d192fd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1100,8 +1100,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addHead(final MessageReference ref, boolean scheduling) { - if (logger.isDebugEnabled()) { - logger.debug("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref); + if (logger.isTraceEnabled()) { + logger.trace("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref); } try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { synchronized (this) { @@ -1125,11 +1125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addSorted(final MessageReference ref, boolean scheduling) { - if (logger.isDebugEnabled()) { - logger.debug("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref); + if (logger.isTraceEnabled()) { + logger.trace("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref); } try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { - synchronized (this) { + synchronized (QueueImpl.this) { if (ringSize != -1) { enforceRing(ref, false, true); } @@ -1165,6 +1165,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { /* Called when a message is cancelled back into the queue */ @Override public void addSorted(final List refs, boolean scheduling) { + if (refs.size() > MAX_DELIVERIES_IN_LOOP) { + logger.debug("Switching addSorted call to addSortedLargeTX on queue {}", name); + addSortedLargeTX(refs, scheduling); + return; + } try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) { synchronized (this) { for (MessageReference ref : refs) { @@ -1178,6 +1183,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { } } + // Perhaps we could just replace addSorted by addSortedLargeTX + // However I am not 100% confident we could always resetAllIterators + // we certainly can in the case of a rollback in a huge TX. + // so I am just playing safe and keeping the original semantic for small transactions. + private void addSortedLargeTX(final List refs, boolean scheduling) { + for (MessageReference ref : refs) { + // When dealing with large transactions, we are not holding a synchronization lock here. + // addSorted will lock for each individual adds + addSorted(ref, scheduling); + } + + if (logger.isDebugEnabled()) { + logger.debug("addSortedHugeLoad finished on queue {}", name); + } + + synchronized (this) { + + resetAllIterators(); + + deliverAsync(); + } + } + @Override public synchronized void reload(final MessageReference ref) { queueMemorySize.addSize(ref.getMessageMemoryEstimate()); @@ -2983,8 +3011,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue { * are no more matching or available messages. */ private boolean deliver() { - if (logger.isDebugEnabled()) { - logger.debug("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount()); + if (logger.isTraceEnabled()) { + logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount()); } scheduledRunners.decrementAndGet(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java index 65f39783c9..6930047c4f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/RefsOperation.java @@ -127,9 +127,7 @@ public class RefsOperation extends TransactionOperationAbstract { QueueImpl queue = entry.getKey(); - synchronized (queue) { - queue.postRollback(refs); - } + queue.postRollback(refs); } if (!ackedRefs.isEmpty()) { diff --git a/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java new file mode 100644 index 0000000000..9e5bbbef65 --- /dev/null +++ b/tests/soak-tests/src/test/java/org/apache/activemq/artemis/tests/soak/client/ClientCrashMassiveRollbackTest.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.soak.client; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.client.ClientProducer; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler; +import org.apache.activemq.artemis.api.core.client.ServerLocator; +import org.apache.activemq.artemis.api.core.management.QueueControl; +import org.apache.activemq.artemis.api.core.management.ResourceNames; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase { + protected ActiveMQServer server; + protected ClientSession session; + protected ClientSessionFactory sf; + protected ServerLocator locator; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + Configuration config = createDefaultNettyConfig(); + config.setCriticalAnalyzer(true); + config.setCriticalAnalyzerTimeout(10000); + config.setCriticalAnalyzerCheckPeriod(5000); + config.setConnectionTTLOverride(5000); + config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG); + server = createServer(false, config); + server.start(); + } + + @Test + public void clientCrashMassiveRollbackTest() throws Exception { + final String queueName = "queueName"; + final int messageCount = 1000000; + + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("(tcp://localhost:61616)"); + factory.setConsumerWindowSize(-1); + factory.setConfirmationWindowSize(10240000); + Connection connection = factory.createConnection(); + connection.start(); + + Thread thread = new Thread(() -> { + try { + Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED); + Queue destination = consumerSession.createQueue(queueName); + MessageConsumer consumer = consumerSession.createConsumer(destination); + for (;;) { + consumer.receive(); + } + } catch (Exception e) { + } + }); + + locator = createNettyNonHALocator(); + locator.setConfirmationWindowSize(10240000); + sf = createSessionFactory(locator); + session = addClientSession(sf.createSession(false, true, true)); + SendAcknowledgementHandler sendHandler = message -> { + }; + session.setSendAcknowledgementHandler(sendHandler); + session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST)); + ClientProducer producer = session.createProducer(queueName); + QueueControl queueControl = (QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + queueName); + + thread.start(); + + for (int i = 0; i < messageCount; i++) { + producer.send(session.createMessage(true)); + } + producer.close(); + + while (queueControl.getDeliveringCount() < messageCount) { + Thread.sleep(1000); + } + + thread.interrupt(); + + Assert.assertEquals(messageCount, queueControl.getMessageCount()); + Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTED, server.getState()); + + server.stop(); + + Wait.assertEquals(ActiveMQServer.SERVER_STATE.STOPPED, server::getState, 5000, 100); + + + } + +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java index 2245e8b02c..f95e91fc2c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/util/LinkedListTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.artemis.tests.unit.util; +import java.lang.invoke.MethodHandles; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -38,27 +39,42 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class LinkedListTest extends ActiveMQTestBase { + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private int scans = 0; private LinkedListImpl list; @Override @Before public void setUp() throws Exception { super.setUp(); - list = new LinkedListImpl<>(integerComparator); + list = new LinkedListImpl<>(integerComparator) { + @Override + protected boolean addSortedScan(Integer e) { + scans++; + return super.addSortedScan(e); + } + }; } Comparator integerComparator = new Comparator() { @Override public int compare(Integer o1, Integer o2) { + logger.trace("Compare {} and {}", o1, o2); if (o1.intValue() == o2.intValue()) { + logger.trace("Return 0"); return 0; } if (o2.intValue() > o1.intValue()) { + logger.trace("o2 is greater than, returning 1"); return 1; } else { + logger.trace("o2 is lower than, returning -1"); return -1; } } @@ -66,27 +82,68 @@ public class LinkedListTest extends ActiveMQTestBase { @Test public void addSorted() { + Assert.assertEquals(0, scans); // sanity check list.addSorted(1); list.addSorted(3); list.addSorted(2); list.addSorted(0); + + Assert.assertEquals(0, scans); // all adds were somewhat ordered, it shouldn't be doing any scans + validateOrder(null); Assert.assertEquals(4, list.size()); } + @Test + public void addSortedCachedLast() { + Assert.assertEquals(0, scans); // just a sanity check + list.addSorted(5); + list.addSorted(1); + list.addSorted(3); + list.addSorted(4); + Assert.assertEquals(0, scans); // no scans made until now + list.addSorted(2); // this should need a scan + Assert.assertEquals(1, scans); + list.addSorted(10); + list.addSorted(20); + list.addSorted(7); // this will need a scan as it's totally random + Assert.assertEquals(2, scans); + printDebug(); + + validateOrder(null); + + } + + private void printDebug() { + if (logger.isDebugEnabled()) { + logger.debug("**** list output:"); + LinkedListIterator integerIterator = list.iterator(); + while (integerIterator.hasNext()) { + logger.debug("list {}", integerIterator.next()); + } + integerIterator.close(); + } + } @Test public void randomSorted() { - HashSet values = new HashSet<>(); - for (int i = 0; i < 1000; i++) { + int elements = 10_000; - int value = RandomUtil.randomInt(); - if (!values.contains(value)) { - values.add(value); - list.addSorted(value); + HashSet values = new HashSet<>(); + for (int i = 0; i < elements; i++) { + for (;;) { // a retry loop, if a random give me the same value twice, I would retry + int value = RandomUtil.randomInt(); + if (!values.contains(value)) { // validating if the random is repeated or not, and retrying otherwise + if (logger.isDebugEnabled()) { + logger.debug("Adding {}", value); + } + values.add(value); + list.addSorted(value); + break; + } } } @@ -102,8 +159,8 @@ public class LinkedListTest extends ActiveMQTestBase { Integer previous = null; LinkedListIterator integerIterator = list.iterator(); while (integerIterator.hasNext()) { - Integer value = integerIterator.next(); + logger.debug("Reading {}", value); if (previous != null) { Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0); Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());