diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java index d7666b56ba..5e4dc749f3 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/Message.java @@ -183,6 +183,10 @@ public interface Message { return null; } + default Message setLastValueProperty(SimpleString lastValueName) { + return this; + } + /** * @deprecated do not use this, use through ICoreMessage or ClientMessage */ diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java index 516addc3cb..25001424c8 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/message/impl/CoreMessage.java @@ -598,6 +598,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage { return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); } + @Override + public Message setLastValueProperty(SimpleString lastValueName) { + return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName); + } + @Override public int getEncodeSize() { checkEncode(); diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java index 06a589442f..2e06d2ec59 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPMessage.java @@ -62,7 +62,6 @@ import org.apache.qpid.proton.message.impl.MessageImpl; // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format public class AMQPMessage extends RefCountMessage { - public static final String HDR_LAST_VALUE_NAME = org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(); public static final int DEFAULT_MESSAGE_PRIORITY = 4; public static final int MAX_MESSAGE_PRIORITY = 9; @@ -1091,7 +1090,12 @@ public class AMQPMessage extends RefCountMessage { @Override public SimpleString getLastValueProperty() { - return getSimpleStringProperty(HDR_LAST_VALUE_NAME); + return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString()); + } + + @Override + public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) { + return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName); } @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index 88f90ee831..2f9fee4ef5 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -771,6 +771,15 @@ public class OpenWireMessageConverter implements MessageConverter props = coreMessage.getPropertyNames(); if (props != null) { for (SimpleString s : props) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java index 190dd786d4..9e0d41a422 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSClientTestSupport.java @@ -234,7 +234,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport { } protected Connection createOpenWireConnection() throws JMSException { - return createCoreConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); + return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); } private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java index 38af6bf593..9d7dd193d0 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSLVQTest.java @@ -17,13 +17,13 @@ package org.apache.activemq.artemis.tests.integration.amqp; import javax.jms.Connection; -import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; +import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQServer; @@ -124,23 +124,25 @@ public class JMSLVQTest extends JMSClientTestSupport { MessageProducer p = producerSession.createProducer(null); TextMessage message1 = producerSession.createTextMessage(); - message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); + message1.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "KEY"); message1.setText("hello"); p.send(queue1, message1); TextMessage message2 = producerSession.createTextMessage(); - message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); + message2.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "KEY"); message2.setText("how are you"); p.send(queue1, message2); - Session consumerSession = consumerConnection.createSession(); + //Simulate a small pause, else both messages could be consumed if consumer is fast enough + Thread.sleep(10); + + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); - Message msg = consumer.receive(1000); + TextMessage msg = (TextMessage) consumer.receive(1000); assertNotNull(msg); - assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME)); - assertTrue(msg instanceof TextMessage); - assertEquals("how are you", ((TextMessage)msg).getText()); + assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME.toString())); + assertEquals("how are you", msg.getText()); consumer.close(); } finally { producerConnection.close();