ARTEMIS-1101 Read the correct values for timestamp and user-id

Fix the getUserID and getTimestamp methods in AMQPMessage to read and
return the correct values.  Adds some tests to cover these cases and
cleans up some others.
This commit is contained in:
Timothy Bish 2017-04-07 16:12:58 -04:00
parent a41a1316d5
commit 3b45261f19
3 changed files with 181 additions and 35 deletions

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
@ -37,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
@ -60,8 +62,8 @@ import io.netty.buffer.Unpooled;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage {
private static final int DEFAULT_MESSAGE_PRIORITY = 4;
private static final int MAX_MESSAGE_PRIORITY = 9;
public static final int DEFAULT_MESSAGE_PRIORITY = 4;
public static final int MAX_MESSAGE_PRIORITY = 9;
final long messageFormat;
ByteBuf data;
@ -91,21 +93,18 @@ public class AMQPMessage extends RefCountMessage {
this.messageFormat = messageFormat;
this.bufferValid = true;
parseHeaders();
}
/** for persistence reload */
public AMQPMessage(long messageFormat) {
this.messageFormat = messageFormat;
this.bufferValid = false;
}
public AMQPMessage(long messageFormat, Message message) {
this.messageFormat = messageFormat;
this.protonMessage = (MessageImpl) message;
this.bufferValid = false;
}
public AMQPMessage(Message message) {
@ -171,7 +170,6 @@ public class AMQPMessage extends RefCountMessage {
}
this.appLocation = -1;
TLSEncode.getDecoder().setByteBuffer(null);
}
return applicationProperties;
@ -238,7 +236,6 @@ public class AMQPMessage extends RefCountMessage {
return null;
}
private void setSymbol(String symbol, Object value) {
setSymbol(Symbol.getSymbol(symbol), value);
}
@ -331,7 +328,6 @@ public class AMQPMessage extends RefCountMessage {
@Override
public synchronized boolean acceptsConsumer(long consumer) {
if (rejectedConsumers == null) {
return true;
} else {
@ -348,7 +344,6 @@ public class AMQPMessage extends RefCountMessage {
rejectedConsumers.add(consumer);
}
private synchronized void partialDecode(ByteBuffer buffer) {
DecoderImpl decoder = TLSEncode.getDecoder();
decoder.setByteBuffer(buffer);
@ -516,10 +511,11 @@ public class AMQPMessage extends RefCountMessage {
@Override
public Object getUserID() {
Properties properties = getProperties();
if (properties != null && properties.getMessageId() != null) {
return properties.getMessageId();
if (properties != null && properties.getUserId() != null) {
Binary binary = properties.getUserId();
return new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8);
} else {
return this;
return null;
}
}
@ -585,8 +581,8 @@ public class AMQPMessage extends RefCountMessage {
@Override
public long getTimestamp() {
if (getHeader() != null && getHeader().getTtl() != null) {
return getHeader().getTtl().longValue();
if (getProperties() != null && getProperties().getCreationTime() != null) {
return getProperties().getCreationTime().getTime();
} else {
return 0L;
}
@ -594,7 +590,7 @@ public class AMQPMessage extends RefCountMessage {
@Override
public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) {
getHeader().setTtl(UnsignedInteger.valueOf(timestamp));
getProperties().setCreationTime(new Date(timestamp));
return this;
}
@ -868,7 +864,6 @@ public class AMQPMessage extends RefCountMessage {
return this;
}
@Override
public void reencode() {
if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations);
@ -879,8 +874,6 @@ public class AMQPMessage extends RefCountMessage {
checkBuffer();
}
@Override
public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException {
return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key));
@ -995,7 +988,6 @@ public class AMQPMessage extends RefCountMessage {
} else {
return null;
}
}
@Override

View File

@ -17,8 +17,14 @@
package org.apache.activemq.artemis.protocol.amqp.message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable;
import org.apache.commons.collections.map.HashedMap;
@ -28,9 +34,11 @@ import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Assert;
import org.junit.Test;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
public class AMQPMessageTest {
@Test
@ -44,20 +52,168 @@ public class AMQPMessageTest {
protonMessage.getHeader().setDurable(Boolean.TRUE);
protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap()));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(7, decoded.getHeader().getDeliveryCount().intValue());
assertEquals(true, decoded.getHeader().getDurable());
assertEquals("someNiceLocal", decoded.getAddress());
}
@Test
public void testGetAddressFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddress());
}
@Test
public void testGetAddressSimpleStringFromMessage() {
final String ADDRESS = "myQueue";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setAddress(ADDRESS);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(ADDRESS, decoded.getAddressSimpleString().toString());
}
@Test
public void testGetAddressFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getAddress());
assertNull(decoded.getAddressSimpleString());
}
@Test
public void testIsDurableFromMessage() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setDurable(true);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertTrue(decoded.isDurable());
}
@Test
public void testIsDurableFromMessageWithNoValueSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertFalse(decoded.isDurable());
}
@Test
public void testGetGroupIDFromMessage() {
final String GROUP_ID = "group-1";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setGroupId(GROUP_ID);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(GROUP_ID, decoded.getGroupID().toString());
}
@Test
public void testGetGroupIDFromMessageWithNoGroupId() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetUserIDFromMessage() {
final String USER_NAME = "foo";
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8));
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(USER_NAME, decoded.getUserID());
}
@Test
public void testGetUserIDFromMessageWithNoUserID() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertNull(decoded.getUserID());
}
@Test
public void testGetPriorityFromMessage() {
final short PRIORITY = 7;
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader(new Header());
protonMessage.setPriority(PRIORITY);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(PRIORITY, decoded.getPriority());
}
@Test
public void testGetPriorityFromMessageWithNoPrioritySet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority());
}
@Test
public void testGetTimestampFromMessage() {
Date timestamp = new Date(System.currentTimeMillis());
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
Properties properties = new Properties();
properties.setCreationTime(timestamp);
protonMessage.setProperties(properties);
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(timestamp.getTime(), decoded.getTimestamp());
}
@Test
public void testGetTimestampFromMessageWithNoCreateTimeSet() {
MessageImpl protonMessage = (MessageImpl) Message.Factory.create();
protonMessage.setHeader( new Header());
AMQPMessage decoded = encodeAndDecodeMessage(protonMessage);
assertEquals(0L, decoded.getTimestamp());
}
private AMQPMessage encodeAndDecodeMessage(MessageImpl message) {
ByteBuf nettyBuffer = Unpooled.buffer(1500);
protonMessage.encode(new NettyWritable(nettyBuffer));
message.encode(new NettyWritable(nettyBuffer));
byte[] bytes = new byte[nettyBuffer.writerIndex()];
nettyBuffer.readBytes(bytes);
AMQPMessage encode = new AMQPMessage(0, bytes);
Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue());
Assert.assertEquals(true, encode.getHeader().getDurable());
Assert.assertEquals("someNiceLocal", encode.getAddress());
return new AMQPMessage(0, bytes);
}
}

View File

@ -67,9 +67,8 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
connection.close();
}
@Test(timeout = 60000)
public void testRestartServer() throws Exception {
public void testMessagePriorityPreservedAfterServerRestart() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = addConnection(client.connect());
AmqpSession session = connection.createSession();
@ -81,7 +80,6 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport {
message.setMessageId("MessageID:1");
message.setPriority((short) 7);
sender.send(message);
sender.close();
connection.close();