ARTEMIS-4517 - cancel message references in queue sequence order

This commit is contained in:
Gary Tully 2023-11-28 15:01:33 +00:00 committed by clebertsuconic
parent ff3c006374
commit fed01276ba
14 changed files with 332 additions and 10 deletions

View File

@ -1996,7 +1996,11 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
if (sub != null) {
sub.reloadPreparedACK(tx, encoding.position);
referencesToAck.add(new QueryPagedReferenceImpl(encoding.position, null, sub));
QueryPagedReferenceImpl reference = new QueryPagedReferenceImpl(encoding.position, null, sub);
referencesToAck.add(reference);
if (sub.getQueue() != null) {
sub.getQueue().reloadSequence(reference);
}
} else {
ActiveMQServerLogger.LOGGER.journalCannotFindQueueReloadingACK(encoding.queueID);
}

View File

@ -142,4 +142,7 @@ public interface MessageReference {
*/
long getPersistentSize() throws ActiveMQException;
long getSequence();
void setSequence(long nextSequence);
}

View File

@ -199,6 +199,8 @@ public interface Queue extends Bindable,CriticalComponent {
void reload(MessageReference ref);
void reloadSequence(MessageReference ref);
default void flushOnIntermediate(Runnable runnable) {
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.utils.collections.LinkedListImpl;
public abstract class AbstractProtocolReference extends LinkedListImpl.Node<MessageReferenceImpl> implements MessageReference {
private HashMap<Class, Object> protocolDataMap;
protected volatile long sequence = 0;
@Override
public <T> T getProtocolData(Class<T> classType) {
@ -46,4 +47,15 @@ public abstract class AbstractProtocolReference extends LinkedListImpl.Node<Mess
protocolDataMap.put(classType, protocolData);
}
@Override
public long getSequence() {
return sequence;
}
@Override
public void setSequence(long nextSequence) {
this.sequence = nextSequence;
}
}

View File

@ -218,4 +218,14 @@ public class GroupFirstMessageReference implements MessageReference {
return messageReference.getPersistentSize();
}
@Override
public long getSequence() {
return messageReference.getSequence();
}
@Override
public void setSequence(long nextSequence) {
messageReference.setSequence(nextSequence);
}
}

View File

@ -33,17 +33,21 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
*/
public class MessageReferenceImpl extends AbstractProtocolReference implements MessageReference, Runnable {
private static final MessageReferenceComparatorByID idComparator = new MessageReferenceComparatorByID();
private static final MessageReferenceComparatorSequence sequenceComparator = new MessageReferenceComparatorSequence();
public static Comparator<MessageReference> getIDComparator() {
return idComparator;
public static Comparator<MessageReference> getSequenceComparator() {
return sequenceComparator;
}
private static class MessageReferenceComparatorByID implements Comparator<MessageReference> {
private static class MessageReferenceComparatorSequence implements Comparator<MessageReference> {
@Override
public int compare(MessageReference o1, MessageReference o2) {
long value = o2.getMessage().getMessageID() - o1.getMessage().getMessageID();
// ID is assigned to the producer on send via the message store and order can depend on producer interleaving
// and transaction completion order.
// ID gives a relative producer view order but not a total queue order.
// each queue needs to track its own sequence to be able to preserve queue order wit rollback, redelivery and cancel
long value = o2.getSequence() - o1.getSequence();
if (value > 0) {
return 1;
} else if (value < 0) {
@ -100,6 +104,7 @@ public class MessageReferenceImpl extends AbstractProtocolReference implements M
this.queue = queue;
sequence = other.sequence;
}
public MessageReferenceImpl(final Message message, final Queue queue) {

View File

@ -216,7 +216,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final MpscUnboundedArrayQueue<MessageReference> intermediateMessageReferences = new MpscUnboundedArrayQueue<>(8192);
// This is where messages are stored
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getIDComparator());
protected final PriorityLinkedList<MessageReference> messageReferences = new PriorityLinkedListImpl<>(QueueImpl.NUM_PRIORITIES, MessageReferenceImpl.getSequenceComparator());
private NodeStore<MessageReference> nodeStore;
@ -321,6 +321,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
private final AtomicLong queueSequence = new AtomicLong(0);
private ScheduledFuture slowConsumerReaperFuture;
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
@ -1243,6 +1245,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
@Override
public void reloadSequence(final MessageReference ref) {
ref.setSequence(queueSequence.incrementAndGet());
}
@Override
public void addTail(final MessageReference ref) {
addTail(ref, false);
@ -2984,6 +2991,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
private synchronized void internalAddTail(final MessageReference ref) {
refAdded(ref);
ref.setSequence(queueSequence.incrementAndGet());
messageReferences.addTail(ref, getPriority(ref));
pendingMetrics.incrementMetrics(ref);
enforceRing(false);
@ -3979,6 +3987,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
reference.setInDelivery(true);
proceedDeliver(consumer, reference);
consumers.reset();
reference.setSequence(queueSequence.incrementAndGet());
return true;
}

View File

@ -391,6 +391,10 @@ public class RoutingContextTest {
}
@Override
public void reloadSequence(MessageReference ref) {
}
@Override
public void addTail(MessageReference ref) {

View File

@ -837,6 +837,10 @@ public class ScheduledDeliveryHandlerTest extends Assert {
}
@Override
public void reloadSequence(MessageReference ref) {
}
@Override
public boolean isEnabled() {
return false;

View File

@ -195,6 +195,93 @@ public class JMSOrderTest extends JMSTestBase {
}
protected void sendToAmqQueueOutOfOrder(int totalCount) throws Exception {
Connection activemqConnection = protocolCF.createConnection();
Session amqSession = activemqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue amqTestQueue = amqSession.createQueue(name.getMethodName());
for (int i = 1; i <= totalCount; i += 2) {
Session sessionA = activemqConnection.createSession(true, Session.SESSION_TRANSACTED);
Session sessionB = activemqConnection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer pA = sessionA.createProducer(amqTestQueue);
MessageProducer pB = sessionB.createProducer(amqTestQueue);
TextMessage message = sessionA.createTextMessage();
// this is sent/committed second
message.setText("TextMessage: " + i + 1);
message.setIntProperty("nr", i + 1);
pA.send(message);
message = sessionB.createTextMessage();
message.setText("TextMessage: " + i);
message.setIntProperty("nr", i);
pB.send(message);
// commit in reverse to get sequenceId out of order on the queue.
sessionB.commit();
sessionA.commit();
sessionA.close();
sessionB.close();
}
activemqConnection.close();
}
@Test(timeout = 30000)
public void testReceiveOutOfOrderProducers() throws Exception {
Connection connection = protocolCF.createConnection();
try {
connection.start();
int totalCount = 4;
int consumeBeforeRollback = 2;
sendToAmqQueueOutOfOrder(totalCount);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = session.createQueue(name.getMethodName());
MessageConsumer consumer = session.createConsumer(queue);
List<Integer> messageNumbers = new ArrayList<>();
for (int i = 1; i <= consumeBeforeRollback; i++) {
Message message = consumer.receive(3000);
assertNotNull(message);
assertEquals("Unexpected message number", i, message.getIntProperty("nr"));
messageNumbers.add(message.getIntProperty("nr"));
}
for (int i = 0; i < messageNumbers.size(); i++) {
assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
}
session.close();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
queue = session.createQueue(name.getMethodName());
consumer = session.createConsumer(queue);
// Consume again.. the previously consumed messages should get delivered
// again after the rollback and then the remainder should follow
messageNumbers = new ArrayList<>();
for (int i = 1; i <= totalCount; i++) {
Message message = consumer.receive(3000);
assertNotNull("Failed to receive message: " + i, message);
int msgNum = message.getIntProperty("nr");
messageNumbers.add(msgNum);
}
session.commit();
assertEquals("Unexpected size of list", totalCount, messageNumbers.size());
for (int i = 0; i < messageNumbers.size(); i++) {
assertEquals("Unexpected order of messages: " + messageNumbers, Integer.valueOf(i + 1), messageNumbers.get(i));
}
} finally {
connection.close();
}
}
@Test
public void testMultipleConsumersRollback() throws Exception {
internalMultipleConsumers(true);

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collection;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.tests.util.JMSTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.activemq.artemis.tests.util.CFUtil.createConnectionFactory;
@RunWith(value = Parameterized.class)
public class XAJMSOrderTest extends JMSTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
String protocol;
boolean exclusive;
ConnectionFactory protocolCF;
@Override
protected boolean usePersistence() {
return true;
}
public XAJMSOrderTest(String protocol, boolean exclusive) {
this.protocol = protocol;
this.exclusive = exclusive;
}
@Before
public void setupCF() {
protocolCF = createConnectionFactory(protocol, "tcp://localhost:61616");
}
@Parameterized.Parameters(name = "protocol={0}&exclusive={1}")
public static Collection getParameters() {
return Arrays.asList(new Object[][]{{"CORE", true}, {"CORE", false}});
}
@Override
protected void extraServerConfig(ActiveMQServer server) {
if (exclusive) {
server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAutoCreateQueues(true).setAutoCreateAddresses(true).setDeadLetterAddress(new SimpleString("ActiveMQ.DLQ")).setDefaultExclusiveQueue(true));
}
}
@Test
public void testPreparedRollbackACKWithRestart() throws Exception {
org.apache.activemq.artemis.core.server.Queue serverQueue = server.createQueue(new QueueConfiguration(getName()).setRoutingType(RoutingType.ANYCAST).setDurable(true));
final int NUMBER_OF_MESSAGES = 30;
ConnectionFactory cf = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
try (Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
Queue queue = session.createQueue(getName());
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage("hello " + i);
message.setIntProperty("i", i);
producer.send(message);
}
}
Wait.assertEquals(NUMBER_OF_MESSAGES, serverQueue::getMessageCount, 2000);
Xid xid = newXID(); // prepared TX with ACK
try (XAConnection connection = ((XAConnectionFactory) cf).createXAConnection(); XASession session = connection.createXASession()) {
Queue queue = session.createQueue(getName());
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
session.getXAResource().start(xid, XAResource.TMNOFLAGS);
for (int i = 0; i < 5; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
logger.debug("message {} received", message.getText());
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
session.getXAResource().end(xid, XAResource.TMSUCCESS);
session.getXAResource().prepare(xid);
}
server.stop();
server.start();
serverQueue = server.locateQueue(getName());
try (Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
Queue queue = session.createQueue(getName());
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 5; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
logger.debug("message {} received", message.getText());
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
session.rollback();
}
try (XAConnection connection = ((XAConnectionFactory) cf).createXAConnection(); XASession session = connection.createXASession()) {
session.getXAResource().rollback(xid);
}
try (Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
Queue queue = session.createQueue(getName());
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
logger.debug("message {} received", message.getText());
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}
session.commit();
}
Wait.assertEquals(0, serverQueue::getMessageCount, 2000);
}
}

View File

@ -239,6 +239,7 @@ public class PendingTXCounterTest extends ActiveMQTestBase {
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
Message message = session.createTextMessage("hello " + i);
message.setIntProperty("i", i);
message.setStringProperty("text", "hello " + i);
producer.send(message);
}
}
@ -332,10 +333,12 @@ public class PendingTXCounterTest extends ActiveMQTestBase {
connection.start();
int start = rollback ? 5 : 10;
logger.debug("start is at {}, since rollback={}", start, rollback);
for (int i = start; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
logger.debug("Received message {}", message.getText());
Assert.assertEquals("hello " + i, message.getText());
Assert.assertEquals(i, message.getIntProperty("i"));
}

View File

@ -110,7 +110,7 @@ export TEST_PGDB_COMMIT_INTERVAL=50
export TEST_CLIENT_FAILURE_TEST_ENABLED=true
export TEST_CLIENT_FAILURE_PROTOCOL_LIST=AMQP,CORE,OPENWIRE
export TEST_CLIENT_FAILURE_AMQP_USE_LARGE_MESSAGE=FALSE
export TEST_CLIENT_FAILURE_AMQP_USE_LARGE_MESSAGE=TRUE
export TEST_CLIENT_FAILURE_AMQP_THREADS_PER_VM=20
export TEST_CLIENT_FAILURE_AMQP_CLIENT_CONSUMERS_PER_THREAD=20
export TEST_CLIENT_FAILURE_AMQP_TEST_REPEATS=1
@ -119,7 +119,7 @@ export TEST_CLIENT_FAILURE_AMQP_NUMBER_OF_VMS=5
export TEST_CLIENT_FAILURE_AMQP_NUMBER_OF_MESSAGES=20000
export TEST_CLIENT_FAILURE_AMQP_MEMORY_CLIENT=-Xmx256m
export TEST_CLIENT_FAILURE_CORE_USE_LARGE_MESSAGE=FALSE
export TEST_CLIENT_FAILURE_CORE_USE_LARGE_MESSAGE=TRUE
export TEST_CLIENT_FAILURE_CORE_THREADS_PER_VM=20
export TEST_CLIENT_FAILURE_CORE_CLIENT_CONSUMERS_PER_THREAD=20
export TEST_CLIENT_FAILURE_CORE_TEST_REPEATS=1
@ -128,7 +128,7 @@ export TEST_CLIENT_FAILURE_CORE_NUMBER_OF_VMS=5
export TEST_CLIENT_FAILURE_CORE_NUMBER_OF_MESSAGES=20000
export TEST_CLIENT_FAILURE_CORE_MEMORY_CLIENT=-Xmx256m
export TEST_CLIENT_FAILURE_OPENWIRE_USE_LARGE_MESSAGE=FALSE
export TEST_CLIENT_FAILURE_OPENWIRE_USE_LARGE_MESSAGE=TRUE
export TEST_CLIENT_FAILURE_OPENWIRE_THREADS_PER_VM=20
export TEST_CLIENT_FAILURE_OPENWIRE_CLIENT_CONSUMERS_PER_THREAD=20
export TEST_CLIENT_FAILURE_OPENWIRE_TEST_REPEATS=1

View File

@ -52,6 +52,10 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
}
@Override
public void reloadSequence(MessageReference ref) {
}
@Override
public boolean isEnabled() {
return false;