ARTEMIS-1100 Store Header on AMQP message
This commit is contained in:
parent
7304416d42
commit
01362bbb1d
|
@ -1013,14 +1013,14 @@ public class AMQPMessage extends RefCountMessage {
|
||||||
}
|
}
|
||||||
|
|
||||||
private int internalPersistSize() {
|
private int internalPersistSize() {
|
||||||
return data.array().length - sendFrom;
|
return data.array().length;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void persist(ActiveMQBuffer targetRecord) {
|
public void persist(ActiveMQBuffer targetRecord) {
|
||||||
checkBuffer();
|
checkBuffer();
|
||||||
targetRecord.writeInt(internalPersistSize());
|
targetRecord.writeInt(internalPersistSize());
|
||||||
targetRecord.writeBytes(data.array(), sendFrom, data.array().length - sendFrom);
|
targetRecord.writeBytes(data.array(), 0, data.array().length );
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -67,6 +67,48 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
|
||||||
connection.close();
|
connection.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRestartServer() throws Exception {
|
||||||
|
AmqpClient client = createAmqpClient();
|
||||||
|
AmqpConnection connection = addConnection(client.connect());
|
||||||
|
AmqpSession session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpSender sender = session.createSender(getTestName());
|
||||||
|
|
||||||
|
AmqpMessage message = new AmqpMessage();
|
||||||
|
message.setDurable(true);
|
||||||
|
message.setMessageId("MessageID:1");
|
||||||
|
message.setPriority((short) 7);
|
||||||
|
|
||||||
|
|
||||||
|
sender.send(message);
|
||||||
|
sender.close();
|
||||||
|
connection.close();
|
||||||
|
|
||||||
|
server.stop();
|
||||||
|
server.start();
|
||||||
|
|
||||||
|
client = createAmqpClient();
|
||||||
|
connection = addConnection(client.connect());
|
||||||
|
session = connection.createSession();
|
||||||
|
|
||||||
|
AmqpReceiver receiver = session.createReceiver(getTestName());
|
||||||
|
|
||||||
|
Queue queueView = getProxyToQueue(getTestName());
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
receiver.flow(1);
|
||||||
|
AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
|
||||||
|
assertNotNull(receive);
|
||||||
|
assertEquals((short) 7, receive.getPriority());
|
||||||
|
receiver.close();
|
||||||
|
|
||||||
|
assertEquals(1, queueView.getMessageCount());
|
||||||
|
|
||||||
|
connection.close();
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60000)
|
@Test(timeout = 60000)
|
||||||
public void testMessageNonDefaultPriority() throws Exception {
|
public void testMessageNonDefaultPriority() throws Exception {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue