diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index ffc378338f..d076b21974 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.Date; import java.util.HashMap; @@ -37,6 +38,7 @@ import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.protocol.amqp.util.TLSEncode; import org.apache.activemq.artemis.reader.MessageUtil; import org.apache.activemq.artemis.utils.DataConstants; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; @@ -60,8 +62,8 @@ import io.netty.buffer.Unpooled; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { - private static final int DEFAULT_MESSAGE_PRIORITY = 4; - private static final int MAX_MESSAGE_PRIORITY = 9; + public static final int DEFAULT_MESSAGE_PRIORITY = 4; + public static final int MAX_MESSAGE_PRIORITY = 9; final long messageFormat; ByteBuf data; @@ -91,21 +93,18 @@ public class AMQPMessage extends RefCountMessage { this.messageFormat = messageFormat; this.bufferValid = true; parseHeaders(); - } /** for persistence reload */ public AMQPMessage(long messageFormat) { this.messageFormat = messageFormat; this.bufferValid = false; - } public AMQPMessage(long messageFormat, Message message) { this.messageFormat = messageFormat; this.protonMessage = (MessageImpl) message; this.bufferValid = false; - } public AMQPMessage(Message message) { @@ -171,7 +170,6 @@ public class AMQPMessage extends RefCountMessage { } this.appLocation = -1; TLSEncode.getDecoder().setByteBuffer(null); - } return applicationProperties; @@ -238,7 +236,6 @@ public class AMQPMessage extends RefCountMessage { return null; } - private void setSymbol(String symbol, Object value) { setSymbol(Symbol.getSymbol(symbol), value); } @@ -331,7 +328,6 @@ public class AMQPMessage extends RefCountMessage { @Override public synchronized boolean acceptsConsumer(long consumer) { - if (rejectedConsumers == null) { return true; } else { @@ -348,7 +344,6 @@ public class AMQPMessage extends RefCountMessage { rejectedConsumers.add(consumer); } - private synchronized void partialDecode(ByteBuffer buffer) { DecoderImpl decoder = TLSEncode.getDecoder(); decoder.setByteBuffer(buffer); @@ -516,10 +511,11 @@ public class AMQPMessage extends RefCountMessage { @Override public Object getUserID() { Properties properties = getProperties(); - if (properties != null && properties.getMessageId() != null) { - return properties.getMessageId(); + if (properties != null && properties.getUserId() != null) { + Binary binary = properties.getUserId(); + return new String(binary.getArray(), binary.getArrayOffset(), binary.getLength(), StandardCharsets.UTF_8); } else { - return this; + return null; } } @@ -585,8 +581,8 @@ public class AMQPMessage extends RefCountMessage { @Override public long getTimestamp() { - if (getHeader() != null && getHeader().getTtl() != null) { - return getHeader().getTtl().longValue(); + if (getProperties() != null && getProperties().getCreationTime() != null) { + return getProperties().getCreationTime().getTime(); } else { return 0L; } @@ -594,7 +590,7 @@ public class AMQPMessage extends RefCountMessage { @Override public org.apache.activemq.artemis.api.core.Message setTimestamp(long timestamp) { - getHeader().setTtl(UnsignedInteger.valueOf(timestamp)); + getProperties().setCreationTime(new Date(timestamp)); return this; } @@ -868,7 +864,6 @@ public class AMQPMessage extends RefCountMessage { return this; } - @Override public void reencode() { if (_deliveryAnnotations != null) getProtonMessage().setDeliveryAnnotations(_deliveryAnnotations); @@ -879,8 +874,6 @@ public class AMQPMessage extends RefCountMessage { checkBuffer(); } - - @Override public SimpleString getSimpleStringProperty(String key) throws ActiveMQPropertyConversionException { return SimpleString.toSimpleString((String) getApplicationPropertiesMap().get(key)); @@ -995,7 +988,6 @@ public class AMQPMessage extends RefCountMessage { } else { return null; } - } @Override diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java index db40a8e82c..496454b2c9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/message/AMQPMessageTest.java @@ -17,8 +17,14 @@ package org.apache.activemq.artemis.protocol.amqp.message; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.Date; + import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage; import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.commons.collections.map.HashedMap; @@ -28,9 +34,11 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.impl.MessageImpl; -import org.junit.Assert; import org.junit.Test; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + public class AMQPMessageTest { @Test @@ -44,20 +52,168 @@ public class AMQPMessageTest { protonMessage.getHeader().setDurable(Boolean.TRUE); protonMessage.setApplicationProperties(new ApplicationProperties(new HashedMap())); + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(7, decoded.getHeader().getDeliveryCount().intValue()); + assertEquals(true, decoded.getHeader().getDurable()); + assertEquals("someNiceLocal", decoded.getAddress()); + } + + @Test + public void testGetAddressFromMessage() { + final String ADDRESS = "myQueue"; + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setAddress(ADDRESS); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(ADDRESS, decoded.getAddress()); + } + + @Test + public void testGetAddressSimpleStringFromMessage() { + final String ADDRESS = "myQueue"; + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setAddress(ADDRESS); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(ADDRESS, decoded.getAddressSimpleString().toString()); + } + + @Test + public void testGetAddressFromMessageWithNoValueSet() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertNull(decoded.getAddress()); + assertNull(decoded.getAddressSimpleString()); + } + + @Test + public void testIsDurableFromMessage() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setDurable(true); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertTrue(decoded.isDurable()); + } + + @Test + public void testIsDurableFromMessageWithNoValueSet() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertFalse(decoded.isDurable()); + } + + @Test + public void testGetGroupIDFromMessage() { + final String GROUP_ID = "group-1"; + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setGroupId(GROUP_ID); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(GROUP_ID, decoded.getGroupID().toString()); + } + + @Test + public void testGetGroupIDFromMessageWithNoGroupId() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertNull(decoded.getUserID()); + } + + @Test + public void testGetUserIDFromMessage() { + final String USER_NAME = "foo"; + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setUserId(USER_NAME.getBytes(StandardCharsets.UTF_8)); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(USER_NAME, decoded.getUserID()); + } + + @Test + public void testGetUserIDFromMessageWithNoUserID() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertNull(decoded.getUserID()); + } + + @Test + public void testGetPriorityFromMessage() { + final short PRIORITY = 7; + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader(new Header()); + protonMessage.setPriority(PRIORITY); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(PRIORITY, decoded.getPriority()); + } + + @Test + public void testGetPriorityFromMessageWithNoPrioritySet() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(AMQPMessage.DEFAULT_MESSAGE_PRIORITY, decoded.getPriority()); + } + + @Test + public void testGetTimestampFromMessage() { + Date timestamp = new Date(System.currentTimeMillis()); + + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader( new Header()); + Properties properties = new Properties(); + properties.setCreationTime(timestamp); + + protonMessage.setProperties(properties); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(timestamp.getTime(), decoded.getTimestamp()); + } + + @Test + public void testGetTimestampFromMessageWithNoCreateTimeSet() { + MessageImpl protonMessage = (MessageImpl) Message.Factory.create(); + protonMessage.setHeader( new Header()); + + AMQPMessage decoded = encodeAndDecodeMessage(protonMessage); + + assertEquals(0L, decoded.getTimestamp()); + } + + private AMQPMessage encodeAndDecodeMessage(MessageImpl message) { ByteBuf nettyBuffer = Unpooled.buffer(1500); - protonMessage.encode(new NettyWritable(nettyBuffer)); - + message.encode(new NettyWritable(nettyBuffer)); byte[] bytes = new byte[nettyBuffer.writerIndex()]; - nettyBuffer.readBytes(bytes); - AMQPMessage encode = new AMQPMessage(0, bytes); - - Assert.assertEquals(7, encode.getHeader().getDeliveryCount().intValue()); - Assert.assertEquals(true, encode.getHeader().getDurable()); - Assert.assertEquals("someNiceLocal", encode.getAddress()); - - + return new AMQPMessage(0, bytes); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java index d1467b1e3b..2b573546cf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpMessagePriorityTest.java @@ -67,9 +67,8 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { connection.close(); } - @Test(timeout = 60000) - public void testRestartServer() throws Exception { + public void testMessagePriorityPreservedAfterServerRestart() throws Exception { AmqpClient client = createAmqpClient(); AmqpConnection connection = addConnection(client.connect()); AmqpSession session = connection.createSession(); @@ -81,7 +80,6 @@ public class AmqpMessagePriorityTest extends AmqpClientTestSupport { message.setMessageId("MessageID:1"); message.setPriority((short) 7); - sender.send(message); sender.close(); connection.close();