From ec8026e4d605c2a8f98a05118c731e1d26a5c502 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 4 Sep 2024 18:14:36 -0400 Subject: [PATCH] ARTEMIS-5032 Ensure AMQP message priority is honored after restart Ensure that on server restart the original priority value assigned to an AMQP message is used when dispatching durable messages from the store. The AMQP Header section is scanned if present and the priority value is recovered in an efficient manner. --- .../amqp/broker/AMQPLargeMessage.java | 20 ++- .../protocol/amqp/broker/AMQPMessage.java | 17 ++- .../amqp/broker/AMQPStandardMessage.java | 81 ++++++++++++ .../amqp/broker/AMQPPersisterTest.java | 41 +++++- .../transport/amqp/client/AmqpSender.java | 2 +- .../amqp/AmqpMessagePriorityTest.java | 123 ++++++++++++++++-- 6 files changed, 259 insertions(+), 25 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java index 777f3752ec..71f708873b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPLargeMessage.java @@ -66,6 +66,8 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } standardMessage.setMessageAnnotations(messageAnnotations); standardMessage.setMessageID(messageID); + standardMessage.setPriority(getPriority()); + return standardMessage.toCore(); } catch (Exception e) { logger.warn(e.getMessage(), e); @@ -199,7 +201,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } private void saveEncoding(ByteBuf buf) { - WritableBuffer oldBuffer = TLSEncode.getEncoder().getBuffer(); TLSEncode.getEncoder().setByteBuffer(new NettyWritable(buf)); @@ -241,6 +242,11 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage encodedHeaderSize = buf.readInt(); header = (Header)TLSEncode.getDecoder().readObject(); + // Recover message priority from saved encoding as we store that separately + if (header != null && header.getPriority() != null) { + priority = (byte) Math.min(header.getPriority().byteValue(), MAX_MESSAGE_PRIORITY); + } + deliveryAnnotationsPosition = buf.readInt(); encodedDeliveryAnnotationsSize = buf.readInt(); @@ -264,8 +270,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage expiration = System.currentTimeMillis() + header.getTtl().intValue(); } } - - } finally { TLSEncode.getDecoder().setBuffer(oldBuffer); } @@ -322,7 +326,6 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage } public void parseHeader(ReadableBuffer buffer) { - DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setBuffer(buffer); @@ -335,6 +338,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage expiration = System.currentTimeMillis() + header.getTtl().intValue(); } } + if (header.getPriority() != null) { + priority = (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY); + } } } finally { decoder.setBuffer(null); @@ -489,6 +495,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage newMessage.setParentRef(this); newMessage.setFileDurable(this.isDurable()); newMessage.reloadExpiration(this.expiration); + if (priority != AMQPMessage.DEFAULT_MESSAGE_PRIORITY) { + newMessage.setPriority(priority); + } return newMessage; } @@ -502,6 +511,9 @@ public class AMQPLargeMessage extends AMQPMessage implements LargeServerMessage try { AMQPLargeMessage copy = new AMQPLargeMessage(newID, messageFormat, null, coreMessageObjectPools, storageManager); copy.setDurable(this.isDurable()); + if (priority != AMQPMessage.DEFAULT_MESSAGE_PRIORITY) { + copy.setPriority(priority); + } final AtomicInteger place = new AtomicInteger(0); ByteBuf bufferNewHeader = null; diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 724f71fc56..dc14a503fe 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -208,6 +208,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. protected long expiration; protected boolean expirationReload = false; protected long scheduledTime = -1; + protected byte priority = DEFAULT_MESSAGE_PRIORITY; protected boolean isPaged; @@ -678,6 +679,7 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. if (!expirationReload) { expiration = 0; } + priority = DEFAULT_MESSAGE_PRIORITY; encodedHeaderSize = 0; memoryEstimate = -1; originalEstimate = -1; @@ -713,6 +715,9 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. expiration = System.currentTimeMillis() + header.getTtl().longValue(); } } + if (header.getPriority() != null) { + priority = (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY); + } } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { deliveryAnnotationsPosition = constructorPos; this.deliveryAnnotations = (DeliveryAnnotations) constructor.readValue(); @@ -1297,19 +1302,21 @@ public abstract class AMQPMessage extends RefCountMessage implements org.apache. @Override public final byte getPriority() { - if (header != null && header.getPriority() != null) { - return (byte) Math.min(header.getPriority().intValue(), MAX_MESSAGE_PRIORITY); - } else { - return DEFAULT_MESSAGE_PRIORITY; - } + return priority; } @Override public final org.apache.activemq.artemis.api.core.Message setPriority(byte priority) { + // Internally we can only deal with a limited range, but the AMQP value is allowed + // to span the full range of the unsigned byte so we store what was actually set in + // the AMQP Header section. + this.priority = (byte) Math.min(priority & 0xff, MAX_MESSAGE_PRIORITY); + if (header == null) { header = new Header(); } header.setPriority(UnsignedByte.valueOf(priority)); + return this; } diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java index 47d94f7032..2ed0deecca 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPStandardMessage.java @@ -39,8 +39,12 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.codec.DecodeException; +import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.codec.EncodingCodes; import org.apache.qpid.proton.codec.ReadableBuffer; +import org.apache.qpid.proton.codec.TypeConstructor; import org.apache.qpid.proton.codec.WritableBuffer; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format @@ -235,10 +239,87 @@ public class AMQPStandardMessage extends AMQPMessage { // Message state is now that the underlying buffer is loaded, but the contents not yet scanned resetMessageData(); + recoverHeaderDataFromEncoding(); + modified = false; messageDataScanned = MessageDataScanningStatus.RELOAD_PERSISTENCE.code; } + private void recoverHeaderDataFromEncoding() { + final DecoderImpl decoder = TLSEncode.getDecoder(); + decoder.setBuffer(data); + + try { + // At one point the broker could write the header and delivery annotations out of order + // which means a full scan is required for maximum compatibility with that older data + // where delivery annotations could be found ahead of the Header in the encoding. + // + // We manually extract the priority from the Header encoding if present to ensure we do + // not create any unneeded GC overhead during load from storage. We don't directly store + // other values from the header except for a value that is computed based on TTL and or + // absolute expiration time in the Properties section, but that value is stored in the + // data of the persisted message. + for (int section = 0; section < 2 && data.hasRemaining(); section++) { + final TypeConstructor constructor = decoder.readConstructor(); + + if (Header.class.equals(constructor.getTypeClass())) { + final byte typeCode = data.get(); + + @SuppressWarnings("unused") + int size = 0; + int count = 0; + + switch (typeCode) { + case EncodingCodes.LIST0: + break; + case EncodingCodes.LIST8: + size = data.get() & 0xff; + count = data.get() & 0xff; + break; + case EncodingCodes.LIST32: + size = data.getInt(); + count = data.getInt(); + break; + default: + throw new DecodeException("Incorrect type found in Header encoding: " + typeCode); + } + + // Priority is stored in the second slot of the Header list encoding if present + if (count >= 2) { + decoder.readBoolean(false); // Discard durable for now, it is computed elsewhere. + + final byte encodingCode = data.get(); + final int priority; + + switch (encodingCode) { + case EncodingCodes.UBYTE: + priority = data.get() & 0xff; + break; + case EncodingCodes.NULL: + priority = DEFAULT_MESSAGE_PRIORITY; + break; + default: + throw new DecodeException("Expected UnsignedByte type but found encoding: " + EncodingCodes.toString(encodingCode)); + } + + // Scaled here so do not call setPriority as that will store the set value in the AMQP header + // and we don't want to create that Header instance at this stage. + this.priority = (byte) Math.min(priority, MAX_MESSAGE_PRIORITY); + } + + return; + } else if (DeliveryAnnotations.class.equals(constructor.getTypeClass())) { + constructor.skipValue(); + } else { + return; + } + } + } finally { + decoder.setBuffer(null); + data.rewind(); // Ensure next scan start at the beginning. + } + } + @Override public long getPersistentSize() throws ActiveMQException { return getEncodeSize(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java index 4563e6bf4e..864886d88b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPPersisterTest.java @@ -45,7 +45,11 @@ import org.junit.jupiter.api.Test; public class AMQPPersisterTest { protected Message createMessage(SimpleString address, int msgId, byte[] content) { - final MessageImpl protonMessage = createProtonMessage(address.toString(), content); + return createMessage(address, (byte) AMQPMessage.MAX_MESSAGE_PRIORITY, msgId, content); + } + + protected Message createMessage(SimpleString address, byte priority, int msgId, byte[] content) { + final MessageImpl protonMessage = createProtonMessage(address.toString(), priority, content); final AMQPStandardMessage msg = encodeAndDecodeMessage(protonMessage, content.length); msg.setAddress(address); msg.setMessageID(msgId); @@ -62,12 +66,12 @@ public class AMQPPersisterTest { return new AMQPStandardMessage(0, bytes, null); } - private MessageImpl createProtonMessage(String address, byte[] content) { + private MessageImpl createProtonMessage(String address, byte priority, byte[] content) { MessageImpl message = (MessageImpl) Proton.message(); Header header = new Header(); header.setDurable(true); - header.setPriority(UnsignedByte.valueOf((byte) 9)); + header.setPriority(UnsignedByte.valueOf(priority)); Properties properties = new Properties(); properties.setCreationTime(new Date(System.currentTimeMillis())); @@ -88,10 +92,8 @@ public class AMQPPersisterTest { return message; } - @Test public void testEncodeSize() throws Exception { - Message message = createMessage(SimpleString.of("Test"), 1, new byte[10]); MessagePersister persister = AMQPMessagePersisterV3.getInstance(); @@ -100,7 +102,36 @@ public class AMQPPersisterTest { persister.encode(buffer, message); assertEquals(persister.getEncodeSize(message), buffer.writerIndex()); + } + @Test + public void testV1PersisterRecoversPriority() { + doTestPersisterRecoversPriority(AMQPMessagePersister.getInstance()); + } + @Test + public void testV2PersisterRecoversPriority() { + doTestPersisterRecoversPriority(AMQPMessagePersisterV2.getInstance()); + } + + @Test + public void testV3PersisterRecoversPriority() { + doTestPersisterRecoversPriority(AMQPMessagePersisterV3.getInstance()); + } + + private void doTestPersisterRecoversPriority(MessagePersister persister) { + for (byte priority = 0; priority <= AMQPMessage.MAX_MESSAGE_PRIORITY; ++priority) { + final Message message = createMessage(SimpleString.of("Test"), priority, 1, new byte[10]); + + final ActiveMQBuffer buffer = ActiveMQBuffers.dynamicBuffer(1024); + + persister.encode(buffer, message); + + assertEquals(persister.getID(), buffer.readByte()); + + final Message decoded = persister.decode(buffer, message, null); + + assertEquals(priority, decoded.getPriority()); + } } } diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java index 17f85749fb..fad795a0d5 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSender.java @@ -491,7 +491,7 @@ public class AmqpSender extends AmqpAbstractResource { @Override public void processFlowUpdates(AmqpConnection connection) throws IOException { - logger.trace("Sender {} flow update, credit = {}", getEndpoint().getCredit()); + logger.trace("Sender {} flow update, credit = {}", senderId, getEndpoint().getCredit()); doCreditInspection(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java index bf7b4b9f9e..997a53c1cd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.amqp; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.activemq.artemis.core.server.Queue; @@ -42,6 +43,13 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { protected static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final int MIN_LARGE_MESSAGE_SIZE = 16384; + + @Override + protected void configureAMQPAcceptorParameters(Map params) { + params.put("amqpMinLargeMessageSize", MIN_LARGE_MESSAGE_SIZE); + } + @Test @Timeout(60) public void testMessageDefaultPriority() throws Exception { @@ -215,37 +223,132 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { @Test @Timeout(60) public void testMessagePriorityOrdering() throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); + doTestMessagePriorityOrdering(false); + } - AmqpSender sender = session.createSender(getQueueName()); + @Test + @Timeout(60) + public void testMessagePriorityOrderingForLargeMessages() throws Exception { + doTestMessagePriorityOrdering(true); + } + + private void doTestMessagePriorityOrdering(boolean largeMessages) throws Exception { + final AmqpClient client = createAmqpClient(); + final AmqpConnection connection = addConnection(client.connect()); + final AmqpSession session = connection.createSession(); + final AmqpSender sender = session.createSender(getQueueName()); + + final int priorityLevels = 10; + final int bodySize = largeMessages ? MIN_LARGE_MESSAGE_SIZE + 10 : 10; + final String body = "#".repeat(bodySize); + + for (short i = 0; i < priorityLevels; ++i) { + final AmqpMessage message = new AmqpMessage(); - for (short i = 0; i <= 9; ++i) { - AmqpMessage message = new AmqpMessage(); message.setMessageId("MessageID:" + i); message.setPriority(i); + message.setText(body); + sender.send(message); } sender.close(); - Queue queueView = getProxyToQueue(getQueueName()); + final Queue queueView = getProxyToQueue(getQueueName()); + Wait.assertEquals(10L, queueView::getMessageCount, 5000, 10); - AmqpReceiver receiver = session.createReceiver(getQueueName()); + final AmqpReceiver receiver = session.createReceiver(getQueueName()); receiver.flow(10); - for (int i = 9; i >= 0; --i) { - AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + + for (int i = priorityLevels - 1; i >= 0; --i) { + final AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); assertEquals((short) i, received.getPriority()); + assertEquals(body, received.getText()); + received.accept(); } + receiver.close(); Wait.assertEquals(0L, queueView::getMessageCount, 5000, 10); connection.close(); } + + @Test + @Timeout(30) + public void testMessagePriorityAppliedAfterServerRestart() throws Exception { + doTestMessagePriorityAppliedAfterServerRestart(false); + } + + @Test + @Timeout(30) + public void testLargeMessagePriorityAppliedAfterServerRestart() throws Exception { + doTestMessagePriorityAppliedAfterServerRestart(true); + } + + public void doTestMessagePriorityAppliedAfterServerRestart(boolean largeMessages) throws Exception { + final AmqpClient client = createAmqpClient(); + + final int priorityLevels = 10; + final int bodySize = largeMessages ? MIN_LARGE_MESSAGE_SIZE + 10 : 10; + final String body = "#".repeat(bodySize); + + { + final AmqpConnection connection = addConnection(client.connect()); + final AmqpSession session = connection.createSession(); + final AmqpSender sender = session.createSender(getQueueName()); + final Queue queueView = getProxyToQueue(getQueueName()); + + for (int priority = 0; priority < priorityLevels; ++priority) { + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + message.setMessageId("MessageID:" + priority); + message.setPriority((short) priority); + message.setText(body); + + sender.send(message); + } + + assertEquals(priorityLevels, queueView.getMessageCount()); + + sender.close(); + connection.close(); + } + + server.stop(); + server.start(); + + { + final Queue queueView = getProxyToQueue(getQueueName()); + final AmqpConnection connection = addConnection(client.connect()); + final AmqpSession session = connection.createSession(); + final AmqpReceiver receiver = session.createReceiver(getQueueName()); + + Wait.assertEquals((long)priorityLevels, () -> queueView.getMessageCount(), 2_000, 100); + + receiver.flow(priorityLevels); + + for (int priority = priorityLevels - 1; priority >= 0; --priority) { + final AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS); + + assertNotNull(message); + assertEquals(priority, message.getPriority()); + assertEquals("MessageID:" + priority, message.getMessageId()); + + logger.info("Read message with priority = {}", message.getPriority()); + + message.accept(); + } + + receiver.close(); + connection.close(); + + assertEquals(0, queueView.getMessageCount()); + } + } }