This commit is contained in:
Justin Bertram 2017-04-07 15:36:23 -05:00
commit bb6a418374
3 changed files with 181 additions and 35 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -37,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@ -60,8 +62,8 @@ import io.netty.buffer.Unpooled;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
private static final int DEFAULT_MESSAGE_PRIORITY = 4;
private static final int MAX_MESSAGE_PRIORITY = 9;
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
public static final int MAX_MESSAGE_PRIORITY = 9;
final long messageFormat;
ByteBuf data;
@ -91,21 +93,18 @@ public class AMQPMessage extends RefCountMessage {
this.messageFormat = messageFormat;
this.bufferValid = true;
parseHeaders();
}
/** for persistence reload */
public AMQPMessage(long messageFormat) {
this.messageFormat = messageFormat;
this.bufferValid = false;
}
public AMQPMessage(long messageFormat, Message message) {
this.messageFormat = messageFormat;
this.protonMessage = (MessageImpl) message;
this.bufferValid = false;
}
public AMQPMessage(Message message) {
@ -171,7 +170,6 @@ public class AMQPMessage extends RefCountMessage {
}
this.appLocation = -1;
TLSEncode.getDecoder().setByteBuffer(null);
}
return applicationProperties;
@ -238,7 +236,6 @@ public class AMQPMessage extends RefCountMessage {
return null;
}
private void setSymbol(String symbol, Object value) {
setSymbol(Symbol.getSymbol(symbol), value);
}
@ -331,7 +328,6 @@ public class AMQPMessage extends RefCountMessage {
@Override
public synchronized boolean acceptsConsumer(long consumer) {
if (rejectedConsumers == null) {
return true;
} else {
@ -348,7 +344,6 @@ public class AMQPMessage extends RefCountMessage {
rejectedConsumers.add(consumer);
}
private synchronized void partialDecode(ByteBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer);
@ -516,10 +511,11 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object getUserID() {
Properties properties = getProperties();
if (properties != null && properties.getMessageId() != null) {
return properties.getMessageId();
if (properties != null && properties.getUserId() != null) {
Binary binary = properties.getUserId();
return new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
} else {
return this;
return null;
}
}
@ -585,8 +581,8 @@ public class AMQPMessage extends RefCountMessage {
@Override
public long getTimestamp() {
if (getHeader() != null && getHeader().getTtl() != null) {
return getHeader().getTtl().longValue();
if (getProperties() != null && getProperties().getCreationTime() != null) {
return getProperties().getCreationTime().getTime();
} else {
return 0L;
}
@ -594,7 +590,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
getHeader().setTtl(UnsignedInteger.valueOf(timestamp));
getProperties().setCreationTime(new Date(timestamp));
return this;
}
@ -868,7 +864,6 @@ public class AMQPMessage extends RefCountMessage {
return this;
}
@Override
public void reencode() {
if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
@ -879,8 +874,6 @@ public class AMQPMessage extends RefCountMessage {
checkBuffer();
}
@Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
@ -995,7 +988,6 @@ public class AMQPMessage extends RefCountMessage {
} else {
return null;
}
}
@Override

View File

@ -17,8 +17,14 @@
package org.apache.activemq.artemis.protocol.amqp.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.commons.collections.map.HashedMap;
@ -28,9 +34,11 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class AMQPMessageTest {
@Test
@ -44,20 +52,168 @@ public class AMQPMessageTest {
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap()));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, decoded.getHeader().getDurable());
assertEquals("someNiceLocal", decoded.getAddress());
}
@Test
public void testGetAddressFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
}
@Test
public void testGetAddressSimpleStringFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
}
@Test
public void testGetAddressFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAddress());
assertNull(decoded.getAddressSimpleString());
}
@Test
public void testIsDurableFromMessage() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setDurable(true);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.isDurable());
}
@Test
public void testIsDurableFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
}
@Test
public void testGetGroupIDFromMessage() {
final String GROUP_ID = "group-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setGroupId(GROUP_ID);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(GROUP_ID, decoded.getGroupID().toString());
}
@Test
public void testGetGroupIDFromMessageWithNoGroupId() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetUserIDFromMessage() {
final String USER_NAME = "foo";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(USER_NAME, decoded.getUserID());
}
@Test
public void testGetUserIDFromMessageWithNoUserID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetPriorityFromMessage() {
final short PRIORITY = 7;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setPriority(PRIORITY);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(PRIORITY, decoded.getPriority());
}
@Test
public void testGetPriorityFromMessageWithNoPrioritySet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
}
@Test
public void testGetTimestampFromMessage() {
Date timestamp = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setCreationTime(timestamp);
protonMessage.setProperties(properties);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(timestamp.getTime(), decoded.getTimestamp());
}
@Test
public void testGetTimestampFromMessageWithNoCreateTimeSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
}
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
protonMessage.encode(new NettyWritable(nettyBuffer));
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
AMQPMessage encode = new AMQPMessage(0, bytes);
Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
Assert.assertEquals(true, encode.getHeader().getDurable());
Assert.assertEquals("someNiceLocal", encode.getAddress());
return new AMQPMessage(0, bytes);
}
}

View File

@ -67,9 +67,8 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testRestartServer() throws Exception {
public void testMessagePriorityPreservedAfterServerRestart() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
@ -81,7 +80,6 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
message.setMessageId("MessageID:1");
message.setPriority((short) 7);
sender.send(message);
sender.close();
connection.close();