ARTEMIS-5032 Ensure AMQP message priority is honored after restart

Ensure that on server restart the original priority value assigned to an
AMQP message is used when dispatching durable messages from the store.
The AMQP Header section is scanned if present and the priority value
is recovered in an efficient manner.
This commit is contained in:
Timothy Bish 2024-09-04 18:14:36 -04:00 committed by clebertsuconic
parent 50c20554d8
commit ec8026e4d6
6 changed files with 259 additions and 25 deletions

View File

@ -66,6 +66,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
} }
standardMessage.setMessageAnnotations(messageAnnotations); standardMessage.setMessageAnnotations(messageAnnotations);
standardMessage.setMessageID(messageID); standardMessage.setMessageID(messageID);
standardMessage.setPriority(getPriority());
return standardMessage.toCore(); return standardMessage.toCore();
} catch (Exception e) { } catch (Exception e) {
logger.warn(e.getMessage(), e); logger.warn(e.getMessage(), e);
@ -199,7 +201,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
} }
private void saveEncoding(ByteBuf buf) { private void saveEncoding(ByteBuf buf) {
WritableBuffer oldBuffer = TLSEncode.getEncoder().getBuffer(); WritableBuffer oldBuffer = TLSEncode.getEncoder().getBuffer();
TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf));
@ -241,6 +242,11 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
encodedHeaderSize = buf.readInt(); encodedHeaderSize = buf.readInt();
header = (Header)TLSEncode.getDecoder().readObject(); header = (Header)TLSEncode.getDecoder().readObject();
// Recover message priority from saved encoding as we store that separately
if (header != null && header.getPriority() != null) {
priority = (byte) Math.min(header.getPriority().byteValue(), MAX_MESSAGE_PRIORITY);
}
deliveryAnnotationsPosition = buf.readInt(); deliveryAnnotationsPosition = buf.readInt();
encodedDeliveryAnnotationsSize = buf.readInt(); encodedDeliveryAnnotationsSize = buf.readInt();
@ -264,8 +270,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
expiration = System.currentTimeMillis() + header.getTtl().intValue(); expiration = System.currentTimeMillis() + header.getTtl().intValue();
} }
} }
} finally { } finally {
TLSEncode.getDecoder().setBuffer(oldBuffer); TLSEncode.getDecoder().setBuffer(oldBuffer);
} }
@ -322,7 +326,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
} }
public void parseHeader(ReadableBuffer buffer) { public void parseHeader(ReadableBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder(); DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(buffer); decoder.setBuffer(buffer);
@ -335,6 +338,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
expiration = System.currentTimeMillis() + header.getTtl().intValue(); expiration = System.currentTimeMillis() + header.getTtl().intValue();
} }
} }
if (header.getPriority() != null) {
priority = (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
}
} }
} finally { } finally {
decoder.setBuffer(null); decoder.setBuffer(null);
@ -489,6 +495,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
newMessage.setParentRef(this); newMessage.setParentRef(this);
newMessage.setFileDurable(this.isDurable()); newMessage.setFileDurable(this.isDurable());
newMessage.reloadExpiration(this.expiration); newMessage.reloadExpiration(this.expiration);
if (priority != AMQPMessage.DEFAULT_MESSAGE_PRIORITY) {
newMessage.setPriority(priority);
}
return newMessage; return newMessage;
} }
@ -502,6 +511,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage
try { try {
AMQPLargeMessage copy = new AMQPLargeMessage(newID, messageFormat, null, coreMessageObjectPools, storageManager); AMQPLargeMessage copy = new AMQPLargeMessage(newID, messageFormat, null, coreMessageObjectPools, storageManager);
copy.setDurable(this.isDurable()); copy.setDurable(this.isDurable());
if (priority != AMQPMessage.DEFAULT_MESSAGE_PRIORITY) {
copy.setPriority(priority);
}
final AtomicInteger place = new AtomicInteger(0); final AtomicInteger place = new AtomicInteger(0);
ByteBuf bufferNewHeader = null; ByteBuf bufferNewHeader = null;

View File

@ -208,6 +208,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
protected long expiration; protected long expiration;
protected boolean expirationReload = false; protected boolean expirationReload = false;
protected long scheduledTime = -1; protected long scheduledTime = -1;
protected byte priority = DEFAULT_MESSAGE_PRIORITY;
protected boolean isPaged; protected boolean isPaged;
@ -678,6 +679,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
if (!expirationReload) { if (!expirationReload) {
expiration = 0; expiration = 0;
} }
priority = DEFAULT_MESSAGE_PRIORITY;
encodedHeaderSize = 0; encodedHeaderSize = 0;
memoryEstimate = -1; memoryEstimate = -1;
originalEstimate = -1; originalEstimate = -1;
@ -713,6 +715,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
expiration = System.currentTimeMillis() + header.getTtl().longValue(); expiration = System.currentTimeMillis() + header.getTtl().longValue();
} }
} }
if (header.getPriority() != null) {
priority = (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
}
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
deliveryAnnotationsPosition = constructorPos; deliveryAnnotationsPosition = constructorPos;
this.deliveryAnnotations = (DeliveryAnnotations) constructor.readValue(); this.deliveryAnnotations = (DeliveryAnnotations) constructor.readValue();
@ -1297,19 +1302,21 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache.
@Override @Override
public final byte getPriority() { public final byte getPriority() {
if (header != null && header.getPriority() != null) { return priority;
return (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY);
} else {
return DEFAULT_MESSAGE_PRIORITY;
}
} }
@Override @Override
public final org.apache.activemq.artemis.api.core.Message setPriority(byte priority) { public final org.apache.activemq.artemis.api.core.Message setPriority(byte priority) {
// Internally we can only deal with a limited range, but the AMQP value is allowed
// to span the full range of the unsigned byte so we store what was actually set in
// the AMQP Header section.
this.priority = (byte) Math.min(priority & 0xff, MAX_MESSAGE_PRIORITY);
if (header == null) { if (header == null) {
header = new Header(); header = new Header();
} }
header.setPriority(UnsignedByte.valueOf(priority)); header.setPriority(UnsignedByte.valueOf(priority));
return this; return this;
} }

View File

@ -39,8 +39,12 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.amqp.messaging.Section; import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.DecodeException;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl; import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.ReadableBuffer; import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.TypeConstructor;
import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.codec.WritableBuffer;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
@ -235,10 +239,87 @@ public class AMQPStandardMessage extends AMQPMessage {
// Message state is now that the underlying buffer is loaded, but the contents not yet scanned // Message state is now that the underlying buffer is loaded, but the contents not yet scanned
resetMessageData(); resetMessageData();
recoverHeaderDataFromEncoding();
modified = false; modified = false;
messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code; messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code;
} }
private void recoverHeaderDataFromEncoding() {
final DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setBuffer(data);
try {
// At one point the broker could write the header and delivery annotations out of order
// which means a full scan is required for maximum compatibility with that older data
// where delivery annotations could be found ahead of the Header in the encoding.
//
// We manually extract the priority from the Header encoding if present to ensure we do
// not create any unneeded GC overhead during load from storage. We don't directly store
// other values from the header except for a value that is computed based on TTL and or
// absolute expiration time in the Properties section, but that value is stored in the
// data of the persisted message.
for (int section = 0; section < 2 && data.hasRemaining(); section++) {
final TypeConstructor<?> constructor = decoder.readConstructor();
if (Header.class.equals(constructor.getTypeClass())) {
final byte typeCode = data.get();
@SuppressWarnings("unused")
int size = 0;
int count = 0;
switch (typeCode) {
case EncodingCodes.LIST0:
break;
case EncodingCodes.LIST8:
size = data.get() & 0xff;
count = data.get() & 0xff;
break;
case EncodingCodes.LIST32:
size = data.getInt();
count = data.getInt();
break;
default:
throw new DecodeException("Incorrect type found in Header encoding: " + typeCode);
}
// Priority is stored in the second slot of the Header list encoding if present
if (count >= 2) {
decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere.
final byte encodingCode = data.get();
final int priority;
switch (encodingCode) {
case EncodingCodes.UBYTE:
priority = data.get() & 0xff;
break;
case EncodingCodes.NULL:
priority = DEFAULT_MESSAGE_PRIORITY;
break;
default:
throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode));
}
// Scaled here so do not call setPriority as that will store the set value in the AMQP header
// and we don't want to create that Header instance at this stage.
this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY);
}
return;
} else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) {
constructor.skipValue();
} else {
return;
}
}
} finally {
decoder.setBuffer(null);
data.rewind(); // Ensure next scan start at the beginning.
}
}
@Override @Override
public long getPersistentSize() throws ActiveMQException { public long getPersistentSize() throws ActiveMQException {
return getEncodeSize(); return getEncodeSize();

View File

@ -45,7 +45,11 @@ import org.junit.jupiter.api.Test;
public class AMQPPersisterTest { public class AMQPPersisterTest {
protected Message createMessage(SimpleString address, int msgId, byte[] content) { protected Message createMessage(SimpleString address, int msgId, byte[] content) {
final MessageImpl protonMessage = createProtonMessage(address.toString(), content); return createMessage(address, (byte) AMQPMessage.MAX_MESSAGE_PRIORITY, msgId, content);
}
protected Message createMessage(SimpleString address, byte priority, int msgId, byte[] content) {
final MessageImpl protonMessage = createProtonMessage(address.toString(), priority, content);
final AMQPStandardMessage msg = encodeAndDecodeMessage(protonMessage, content.length); final AMQPStandardMessage msg = encodeAndDecodeMessage(protonMessage, content.length);
msg.setAddress(address); msg.setAddress(address);
msg.setMessageID(msgId); msg.setMessageID(msgId);
@ -62,12 +66,12 @@ public class AMQPPersisterTest {
return new AMQPStandardMessage(0, bytes, null); return new AMQPStandardMessage(0, bytes, null);
} }
private MessageImpl createProtonMessage(String address, byte[] content) { private MessageImpl createProtonMessage(String address, byte priority, byte[] content) {
MessageImpl message = (MessageImpl) Proton.message(); MessageImpl message = (MessageImpl) Proton.message();
Header header = new Header(); Header header = new Header();
header.setDurable(true); header.setDurable(true);
header.setPriority(UnsignedByte.valueOf((byte) 9)); header.setPriority(UnsignedByte.valueOf(priority));
Properties properties = new Properties(); Properties properties = new Properties();
properties.setCreationTime(new Date(System.currentTimeMillis())); properties.setCreationTime(new Date(System.currentTimeMillis()));
@ -88,10 +92,8 @@ public class AMQPPersisterTest {
return message; return message;
} }
@Test @Test
public void testEncodeSize() throws Exception { public void testEncodeSize() throws Exception {
Message message = createMessage(SimpleString.of("Test"), 1, new byte[10]); Message message = createMessage(SimpleString.of("Test"), 1, new byte[10]);
MessagePersister persister = AMQPMessagePersisterV3.getInstance(); MessagePersister persister = AMQPMessagePersisterV3.getInstance();
@ -100,7 +102,36 @@ public class AMQPPersisterTest {
persister.encode(buffer, message); persister.encode(buffer, message);
assertEquals(persister.getEncodeSize(message), buffer.writerIndex()); assertEquals(persister.getEncodeSize(message), buffer.writerIndex());
}
@Test
public void testV1PersisterRecoversPriority() {
doTestPersisterRecoversPriority(AMQPMessagePersister.getInstance());
}
@Test
public void testV2PersisterRecoversPriority() {
doTestPersisterRecoversPriority(AMQPMessagePersisterV2.getInstance());
}
@Test
public void testV3PersisterRecoversPriority() {
doTestPersisterRecoversPriority(AMQPMessagePersisterV3.getInstance());
}
private void doTestPersisterRecoversPriority(MessagePersister persister) {
for (byte priority = 0; priority <= AMQPMessage.MAX_MESSAGE_PRIORITY; ++priority) {
final Message message = createMessage(SimpleString.of("Test"), priority, 1, new byte[10]);
final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024);
persister.encode(buffer, message);
assertEquals(persister.getID(), buffer.readByte());
final Message decoded = persister.decode(buffer, message, null);
assertEquals(priority, decoded.getPriority());
}
} }
} }

View File

@ -491,7 +491,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
@Override @Override
public void processFlowUpdates(AmqpConnection connection) throws IOException { public void processFlowUpdates(AmqpConnection connection) throws IOException {
logger.trace("Sender {} flow update, credit = {}", getEndpoint().getCredit()); logger.trace("Sender {} flow update, credit = {}", senderId, getEndpoint().getCredit());
doCreditInspection(); doCreditInspection();
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
@ -42,6 +43,13 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final int MIN_LARGE_MESSAGE_SIZE = 16384;
@Override
protected void configureAMQPAcceptorParameters(Map<String, Object> params) {
params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE);
}
@Test @Test
@Timeout(60) @Timeout(60)
public void testMessageDefaultPriority() throws Exception { public void testMessageDefaultPriority() throws Exception {
@ -215,37 +223,132 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
@Test @Test
@Timeout(60) @Timeout(60)
public void testMessagePriorityOrdering() throws Exception { public void testMessagePriorityOrdering() throws Exception {
AmqpClient client = createAmqpClient(); doTestMessagePriorityOrdering(false);
AmqpConnection connection = addConnection(client.connect()); }
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getQueueName()); @Test
@Timeout(60)
public void testMessagePriorityOrderingForLargeMessages() throws Exception {
doTestMessagePriorityOrdering(true);
}
private void doTestMessagePriorityOrdering(boolean largeMessages) throws Exception {
final AmqpClient client = createAmqpClient();
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final AmqpSender sender = session.createSender(getQueueName());
final int priorityLevels = 10;
final int bodySize = largeMessages ? MIN_LARGE_MESSAGE_SIZE + 10 : 10;
final String body = "#".repeat(bodySize);
for (short i = 0; i < priorityLevels; ++i) {
final AmqpMessage message = new AmqpMessage();
for (short i = 0; i <= 9; ++i) {
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + i); message.setMessageId("MessageID:" + i);
message.setPriority(i); message.setPriority(i);
message.setText(body);
sender.send(message); sender.send(message);
} }
sender.close(); sender.close();
Queue queueView = getProxyToQueue(getQueueName()); final Queue queueView = getProxyToQueue(getQueueName());
Wait.assertEquals(10L, queueView::getMessageCount, 5000, 10); Wait.assertEquals(10L, queueView::getMessageCount, 5000, 10);
AmqpReceiver receiver = session.createReceiver(getQueueName()); final AmqpReceiver receiver = session.createReceiver(getQueueName());
receiver.flow(10); receiver.flow(10);
for (int i = 9; i >= 0; --i) {
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); for (int i = priorityLevels - 1; i >= 0; --i) {
final AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received); assertNotNull(received);
assertEquals((short) i, received.getPriority()); assertEquals((short) i, received.getPriority());
assertEquals(body, received.getText());
received.accept(); received.accept();
} }
receiver.close(); receiver.close();
Wait.assertEquals(0L, queueView::getMessageCount, 5000, 10); Wait.assertEquals(0L, queueView::getMessageCount, 5000, 10);
connection.close(); connection.close();
} }
@Test
@Timeout(30)
public void testMessagePriorityAppliedAfterServerRestart() throws Exception {
doTestMessagePriorityAppliedAfterServerRestart(false);
}
@Test
@Timeout(30)
public void testLargeMessagePriorityAppliedAfterServerRestart() throws Exception {
doTestMessagePriorityAppliedAfterServerRestart(true);
}
public void doTestMessagePriorityAppliedAfterServerRestart(boolean largeMessages) throws Exception {
final AmqpClient client = createAmqpClient();
final int priorityLevels = 10;
final int bodySize = largeMessages ? MIN_LARGE_MESSAGE_SIZE + 10 : 10;
final String body = "#".repeat(bodySize);
{
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final AmqpSender sender = session.createSender(getQueueName());
final Queue queueView = getProxyToQueue(getQueueName());
for (int priority = 0; priority < priorityLevels; ++priority) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setMessageId("MessageID:" + priority);
message.setPriority((short) priority);
message.setText(body);
sender.send(message);
}
assertEquals(priorityLevels, queueView.getMessageCount());
sender.close();
connection.close();
}
server.stop();
server.start();
{
final Queue queueView = getProxyToQueue(getQueueName());
final AmqpConnection connection = addConnection(client.connect());
final AmqpSession session = connection.createSession();
final AmqpReceiver receiver = session.createReceiver(getQueueName());
Wait.assertEquals((long)priorityLevels, () -> queueView.getMessageCount(), 2_000, 100);
receiver.flow(priorityLevels);
for (int priority = priorityLevels - 1; priority >= 0; --priority) {
final AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(message);
assertEquals(priority, message.getPriority());
assertEquals("MessageID:" + priority, message.getMessageId());
logger.info("Read message with priority = {}", message.getPriority());
message.accept();
}
receiver.close();
connection.close();
assertEquals(0, queueView.getMessageCount());
}
}
} }