From 45c59f05dcd4df6eb0b27be92faf7da903bb4798 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Thu, 6 Apr 2017 16:18:53 -0400 Subject: [PATCH] ARTEMIS-1097 Respect the message priority value in the AMQP message Ensure that the header value for priority is read and returned in a form that is scaled such that it won't cause an IndexOutOfBoundsException from the QueueImpl priority array. Adds some additional testing for message priority support. --- .../protocol/amqp/broker/AMQPMessage.java | 21 +- .../amqp/AmqpMessagePriorityTest.java | 200 ++++++++++++++++++ .../integration/amqp/AmqpSendReceiveTest.java | 90 -------- 3 files changed, 217 insertions(+), 94 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index affe49ad47..cc0d1d8914 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -60,6 +60,9 @@ import io.netty.buffer.Unpooled; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { + private static final int DEFAULT_MESSAGE_PRIORITY = 4; + private static final int MAX_MESSAGE_PRIORITY = 9; + final long messageFormat; ByteBuf data; boolean bufferValid; @@ -582,22 +585,32 @@ public class AMQPMessage extends RefCountMessage { @Override public long getTimestamp() { - return 0; + if (getHeader() != null && getHeader().getTtl() != null) { + return getHeader().getTtl().longValue(); + } else { + return 0L; + } } @Override public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) { - return null; + getHeader().setTtl(UnsignedInteger.valueOf(timestamp)); + return this; } @Override public byte getPriority() { - return 0; + if (getHeader() != null && getHeader().getPriority() != null) { + return (byte) Math.min(getHeader().getPriority().intValue(), MAX_MESSAGE_PRIORITY); + } else { + return DEFAULT_MESSAGE_PRIORITY; + } } @Override public org.apache.activemq.artemis.api.core.Message setPriority(byte priority) { - return null; + getHeader().setPriority(UnsignedByte.valueOf(priority)); + return this; } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java new file mode 100644 index 0000000000..39f6eac130 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.amqp; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpMessage; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSender; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tests for handling of the AMQP message priority header. + */ +public class AmqpMessagePriorityTest extends AmqpClientTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(AmqpMessagePriorityTest.class); + + @Test(timeout = 60000) + public void testMessageDefaultPriority() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:1"); + message.setPriority((short) 4); + + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNonDefaultPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:1"); + message.setPriority((short) 0); + + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 0, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageWithVeryHighPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:1"); + message.setPriority((short) 99); + + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals(99, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessageNoPriority() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:1"); + + sender.send(message); + sender.close(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 4, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + + @Test(timeout = 60000) + public void testMessagePriorityOrdering() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + for (short i = 0; i <= 9; ++i) { + AmqpMessage message = new AmqpMessage(); + message.setMessageId("MessageID:" + i); + message.setPriority(i); + sender.send(message); + } + + sender.close(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(10, queueView.getMessageCount()); + + receiver.flow(10); + for (int i = 9; i >= 0; --i) { + AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(received); + assertEquals((short) i, received.getPriority()); + received.accept(); + } + receiver.close(); + + assertEquals(0, queueView.getMessageCount()); + + connection.close(); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java index 47576591ab..54b361ca53 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java @@ -359,78 +359,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } - @Test(timeout = 60000) - public void testMessageDefaultPriority() throws Exception { - sendMessages(getTestName(), 1, (short) 4); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertEquals((short) 4, receive.getPriority()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testMessageNonDefaultPriority() throws Exception { - sendMessages(getTestName(), 1, (short) 0); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertEquals((short) 0, receive.getPriority()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - - @Test(timeout = 60000) - public void testMessageNoPriority() throws Exception { - sendMessages(getTestName(), 1); - - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - AmqpSession session = connection.createSession(); - - AmqpReceiver receiver = session.createReceiver(getTestName()); - - Queue queueView = getProxyToQueue(getTestName()); - assertEquals(1, queueView.getMessageCount()); - - receiver.flow(1); - AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); - assertNotNull(receive); - assertEquals((short) 4, receive.getPriority()); - receiver.close(); - - assertEquals(1, queueView.getMessageCount()); - - connection.close(); - } - @Test(timeout = 60000) public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception { int MSG_COUNT = 4; @@ -1213,22 +1141,4 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport { connection.close(); } } - - public void sendMessages(String destinationName, int count, short priority) throws Exception { - AmqpClient client = createAmqpClient(); - AmqpConnection connection = addConnection(client.connect()); - try { - AmqpSession session = connection.createSession(); - AmqpSender sender = session.createSender(destinationName); - - for (int i = 0; i < count; ++i) { - AmqpMessage message = new AmqpMessage(); - message.setMessageId("MessageID:" + i); - message.setPriority(priority); - sender.send(message); - } - } finally { - connection.close(); - } - } }