From 01362bbb1d423c3859442664508533bdf1755772 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Fri, 7 Apr 2017 09:13:43 -0400 Subject: [PATCH] ARTEMIS-1100 Store Header on AMQP message --- .../protocol/amqp/broker/AMQPMessage.java | 4 +- .../amqp/AmqpMessagePriorityTest.java | 42 +++++++++++++++++++ 2 files changed, 44 insertions(+), 2 deletions(-) diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index cc0d1d8914..ffc378338f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -1013,14 +1013,14 @@ public class AMQPMessage extends RefCountMessage { } private int internalPersistSize() { - return data.array().length - sendFrom; + return data.array().length; } @Override public void persist(ActiveMQBuffer targetRecord) { checkBuffer(); targetRecord.writeInt(internalPersistSize()); - targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom); + targetRecord.writeBytes(data.array(), 0, data.array().length ); } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java index 39f6eac130..d1467b1e3b 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java @@ -67,6 +67,48 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { connection.close(); } + + @Test(timeout = 60000) + public void testRestartServer() throws Exception { + AmqpClient client = createAmqpClient(); + AmqpConnection connection = addConnection(client.connect()); + AmqpSession session = connection.createSession(); + + AmqpSender sender = session.createSender(getTestName()); + + AmqpMessage message = new AmqpMessage(); + message.setDurable(true); + message.setMessageId("MessageID:1"); + message.setPriority((short) 7); + + + sender.send(message); + sender.close(); + connection.close(); + + server.stop(); + server.start(); + + client = createAmqpClient(); + connection = addConnection(client.connect()); + session = connection.createSession(); + + AmqpReceiver receiver = session.createReceiver(getTestName()); + + Queue queueView = getProxyToQueue(getTestName()); + assertEquals(1, queueView.getMessageCount()); + + receiver.flow(1); + AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS); + assertNotNull(receive); + assertEquals((short) 7, receive.getPriority()); + receiver.close(); + + assertEquals(1, queueView.getMessageCount()); + + connection.close(); + } + @Test(timeout = 60000) public void testMessageNonDefaultPriority() throws Exception {