ARTEMIS-4084 Fixing addSorted with large transactions

when cancelling a large number of messages, the addSorted could be holding a lock for too long causing the server to crash under CriticalAnalyzer

co-authored: AntonRoskvist <anton.roskvist@volvo.com> (discovering the issue and providing the test ClientCrashMassiveRollbackTest.java)
This commit is contained in:
Clebert Suconic 2022-11-08 09:42:16 -05:00 committed by clebertsuconic
parent dce4ba3c1a
commit 03b82142eb
5 changed files with 281 additions and 26 deletions

View File

@ -16,12 +16,16 @@
*/
package org.apache.activemq.artemis.utils.collections;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Array;
import java.util.Comparator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A linked list implementation which allows multiple iterators to exist at the same time on the queue, and which see any
* elements added or removed from the queue either directly or via iterators.
@ -30,6 +34,8 @@ import java.util.function.Consumer;
*/
public class LinkedListImpl<E> implements LinkedList<E> {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int INITIAL_ITERATOR_ARRAY_SIZE = 10;
private final Node<E> head = new NodeHolder<>(null);
@ -42,6 +48,8 @@ public class LinkedListImpl<E> implements LinkedList<E> {
private int nextIndex;
private NodeStore<E> nodeStore;
private volatile Node<E> lastAdd;
public LinkedListImpl() {
this(null, null);
}
@ -155,12 +163,18 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
private void itemAdded(Node<E> node, E item) {
assert node.val() == item;
lastAdd = node;
if (logger.isTraceEnabled()) {
logger.trace("Setting lastAdd as {}, e={}", lastAdd, lastAdd.val());
}
if (nodeStore != null) {
putID(item, node);
}
}
private void itemRemoved(Node<E> node) {
lastAdd = null;
if (nodeStore != null) {
nodeStore.removeNode(node.val(), node);
}
@ -186,13 +200,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
public void addSorted(E e) {
final Node<E> localLastAdd = lastAdd;
logger.trace("**** addSorted element {}", e);
if (comparator == null) {
throw new NullPointerException("comparator=null");
}
if (size == 0) {
logger.trace("adding head as there are no elements {}", e);
addHead(e);
} else {
if (comparator.compare(head.next.val(), e) < 0) {
if (logger.isTraceEnabled()) {
logger.trace("addHead as e={} and head={}", e, head.next.val());
}
addHead(e);
return;
}
@ -203,18 +226,30 @@ public class LinkedListImpl<E> implements LinkedList<E> {
// This would be an optimization for our usage.
// avoiding scanning the entire List just to add at the end, so we compare the end first.
if (comparator.compare(tail.val(), e) >= 0) {
logger.trace("addTail as e={} and tail={}", e, tail.val());
addTail(e);
return;
}
Node<E> fetching = head.next;
while (fetching.next != null) {
int compareNext = comparator.compare(fetching.next.val(), e);
if (compareNext <= 0) {
addAfter(fetching, e);
return;
if (localLastAdd != null) { // as an optimization we check against the last add rather than always scan.
if (localLastAdd.prev != null && localLastAdd.prev.val() != null) {
if (comparator.compare(localLastAdd.prev.val(), e) > 0 && comparator.compare(localLastAdd.val(), e) < 0) {
logger.trace("Adding {} before most recent added element {}", e, localLastAdd.val());
addAfter(localLastAdd.prev, e);
return;
}
}
fetching = fetching.next;
if (localLastAdd.next != null && localLastAdd.next.val() != null) {
if (comparator.compare(localLastAdd.val(), e) > 0 && comparator.compare(localLastAdd.next.val(), e) < 0) {
logger.trace("Adding {} after most recent added element {}", e, localLastAdd.val());
addAfter(localLastAdd, e);
return;
}
}
}
if (addSortedScan(e)) {
return;
}
// this shouldn't happen as the tail was compared before iterating
@ -229,6 +264,22 @@ public class LinkedListImpl<E> implements LinkedList<E> {
}
}
protected boolean addSortedScan(E e) {
logger.trace("addSortedScan {}...", e);
Node<E> fetching = head.next;
while (fetching.next != null) {
int compareNext = comparator.compare(fetching.next.val(), e);
if (compareNext <= 0) {
addAfter(fetching, e);
logger.trace("... addSortedScan done, returning true");
return true;
}
fetching = fetching.next;
}
logger.trace("... addSortedScan done, could not find a spot, returning false");
return false;
}
private void addAfter(Node<E> node, E e) {
Node<E> newNode = Node.with(e);
Node<E> nextNode = node.next;
@ -236,7 +287,7 @@ public class LinkedListImpl<E> implements LinkedList<E> {
newNode.prev = node;
newNode.next = nextNode;
nextNode.prev = newNode;
itemAdded(node, e);
itemAdded(newNode, e);
size++;
}

View File

@ -1100,8 +1100,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addHead(final MessageReference ref, boolean scheduling) {
if (logger.isDebugEnabled()) {
logger.debug("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
if (logger.isTraceEnabled()) {
logger.trace("AddHead, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
@ -1125,11 +1125,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final MessageReference ref, boolean scheduling) {
if (logger.isDebugEnabled()) {
logger.debug("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
if (logger.isTraceEnabled()) {
logger.trace("addSorted, size = {}, intermediate size = {}, references size = {}\nreference={}", queueMemorySize, intermediateMessageReferences.size(), messageReferences.size(), ref);
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
synchronized (QueueImpl.this) {
if (ringSize != -1) {
enforceRing(ref, false, true);
}
@ -1165,6 +1165,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
/* Called when a message is cancelled back into the queue */
@Override
public void addSorted(final List<MessageReference> refs, boolean scheduling) {
if (refs.size() > MAX_DELIVERIES_IN_LOOP) {
logger.debug("Switching addSorted call to addSortedLargeTX on queue {}", name);
addSortedLargeTX(refs, scheduling);
return;
}
try (ArtemisCloseable metric = measureCritical(CRITICAL_PATH_ADD_HEAD)) {
synchronized (this) {
for (MessageReference ref : refs) {
@ -1178,6 +1183,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
// Perhaps we could just replace addSorted by addSortedLargeTX
// However I am not 100% confident we could always resetAllIterators
// we certainly can in the case of a rollback in a huge TX.
// so I am just playing safe and keeping the original semantic for small transactions.
private void addSortedLargeTX(final List<MessageReference> refs, boolean scheduling) {
for (MessageReference ref : refs) {
// When dealing with large transactions, we are not holding a synchronization lock here.
// addSorted will lock for each individual adds
addSorted(ref, scheduling);
}
if (logger.isDebugEnabled()) {
logger.debug("addSortedHugeLoad finished on queue {}", name);
}
synchronized (this) {
resetAllIterators();
deliverAsync();
}
}
@Override
public synchronized void reload(final MessageReference ref) {
queueMemorySize.addSize(ref.getMessageMemoryEstimate());
@ -2983,8 +3011,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
* are no more matching or available messages.
*/
private boolean deliver() {
if (logger.isDebugEnabled()) {
logger.debug("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
if (logger.isTraceEnabled()) {
logger.trace("Queue {} doing deliver. messageReferences={} with consumers={}", name, messageReferences.size(), getConsumerCount());
}
scheduledRunners.decrementAndGet();

View File

@ -127,9 +127,7 @@ public class RefsOperation extends TransactionOperationAbstract {
QueueImpl queue = entry.getKey();
synchronized (queue) {
queue.postRollback(refs);
}
queue.postRollback(refs);
}
if (!ackedRefs.isEmpty()) {

View File

@ -0,0 +1,121 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.client;
import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.client.ClientProducer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.critical.CriticalAnalyzerPolicy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ClientCrashMassiveRollbackTest extends ActiveMQTestBase {
protected ActiveMQServer server;
protected ClientSession session;
protected ClientSessionFactory sf;
protected ServerLocator locator;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
Configuration config = createDefaultNettyConfig();
config.setCriticalAnalyzer(true);
config.setCriticalAnalyzerTimeout(10000);
config.setCriticalAnalyzerCheckPeriod(5000);
config.setConnectionTTLOverride(5000);
config.setCriticalAnalyzerPolicy(CriticalAnalyzerPolicy.LOG);
server = createServer(false, config);
server.start();
}
@Test
public void clientCrashMassiveRollbackTest() throws Exception {
final String queueName = "queueName";
final int messageCount = 1000000;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("(tcp://localhost:61616)");
factory.setConsumerWindowSize(-1);
factory.setConfirmationWindowSize(10240000);
Connection connection = factory.createConnection();
connection.start();
Thread thread = new Thread(() -> {
try {
Session consumerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = consumerSession.createQueue(queueName);
MessageConsumer consumer = consumerSession.createConsumer(destination);
for (;;) {
consumer.receive();
}
} catch (Exception e) {
}
});
locator = createNettyNonHALocator();
locator.setConfirmationWindowSize(10240000);
sf = createSessionFactory(locator);
session = addClientSession(sf.createSession(false, true, true));
SendAcknowledgementHandler sendHandler = message -> {
};
session.setSendAcknowledgementHandler(sendHandler);
session.createQueue(new QueueConfiguration(queueName).setAddress(queueName).setRoutingType(RoutingType.ANYCAST));
ClientProducer producer = session.createProducer(queueName);
QueueControl queueControl = (QueueControl)server.getManagementService().getResource(ResourceNames.QUEUE + queueName);
thread.start();
for (int i = 0; i < messageCount; i++) {
producer.send(session.createMessage(true));
}
producer.close();
while (queueControl.getDeliveringCount() < messageCount) {
Thread.sleep(1000);
}
thread.interrupt();
Assert.assertEquals(messageCount, queueControl.getMessageCount());
Assert.assertEquals(ActiveMQServer.SERVER_STATE.STARTED, server.getState());
server.stop();
Wait.assertEquals(ActiveMQServer.SERVER_STATE.STOPPED, server::getState, 5000, 100);
}
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.tests.unit.util;
import java.lang.invoke.MethodHandles;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
@ -38,27 +39,42 @@ import org.apache.activemq.artemis.utils.collections.LinkedListIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class LinkedListTest extends ActiveMQTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private int scans = 0;
private LinkedListImpl<Integer> list;
@Override
@Before
public void setUp() throws Exception {
super.setUp();
list = new LinkedListImpl<>(integerComparator);
list = new LinkedListImpl<>(integerComparator) {
@Override
protected boolean addSortedScan(Integer e) {
scans++;
return super.addSortedScan(e);
}
};
}
Comparator<Integer> integerComparator = new Comparator<Integer>() {
@Override
public int compare(Integer o1, Integer o2) {
logger.trace("Compare {} and {}", o1, o2);
if (o1.intValue() == o2.intValue()) {
logger.trace("Return 0");
return 0;
}
if (o2.intValue() > o1.intValue()) {
logger.trace("o2 is greater than, returning 1");
return 1;
} else {
logger.trace("o2 is lower than, returning -1");
return -1;
}
}
@ -66,27 +82,68 @@ public class LinkedListTest extends ActiveMQTestBase {
@Test
public void addSorted() {
Assert.assertEquals(0, scans); // sanity check
list.addSorted(1);
list.addSorted(3);
list.addSorted(2);
list.addSorted(0);
Assert.assertEquals(0, scans); // all adds were somewhat ordered, it shouldn't be doing any scans
validateOrder(null);
Assert.assertEquals(4, list.size());
}
@Test
public void addSortedCachedLast() {
Assert.assertEquals(0, scans); // just a sanity check
list.addSorted(5);
list.addSorted(1);
list.addSorted(3);
list.addSorted(4);
Assert.assertEquals(0, scans); // no scans made until now
list.addSorted(2); // this should need a scan
Assert.assertEquals(1, scans);
list.addSorted(10);
list.addSorted(20);
list.addSorted(7); // this will need a scan as it's totally random
Assert.assertEquals(2, scans);
printDebug();
validateOrder(null);
}
private void printDebug() {
if (logger.isDebugEnabled()) {
logger.debug("**** list output:");
LinkedListIterator<Integer> integerIterator = list.iterator();
while (integerIterator.hasNext()) {
logger.debug("list {}", integerIterator.next());
}
integerIterator.close();
}
}
@Test
public void randomSorted() {
HashSet<Integer> values = new HashSet<>();
for (int i = 0; i < 1000; i++) {
int elements = 10_000;
int value = RandomUtil.randomInt();
if (!values.contains(value)) {
values.add(value);
list.addSorted(value);
HashSet<Integer> values = new HashSet<>();
for (int i = 0; i < elements; i++) {
for (;;) { // a retry loop, if a random give me the same value twice, I would retry
int value = RandomUtil.randomInt();
if (!values.contains(value)) { // validating if the random is repeated or not, and retrying otherwise
if (logger.isDebugEnabled()) {
logger.debug("Adding {}", value);
}
values.add(value);
list.addSorted(value);
break;
}
}
}
@ -102,8 +159,8 @@ public class LinkedListTest extends ActiveMQTestBase {
Integer previous = null;
LinkedListIterator<Integer> integerIterator = list.iterator();
while (integerIterator.hasNext()) {
Integer value = integerIterator.next();
logger.debug("Reading {}", value);
if (previous != null) {
Assert.assertTrue(value + " should be > " + previous, integerComparator.compare(previous, value) > 0);
Assert.assertTrue(value + " should be > " + previous, value.intValue() > previous.intValue());