This commit is contained in:
Clebert Suconic 2017-12-09 10:10:40 -05:00
commit aca2ae25f5
6 changed files with 35 additions and 11 deletions

View File

@ -183,6 +183,10 @@ public interface Message {
return null; return null;
} }
default Message setLastValueProperty(SimpleString lastValueName) {
return this;
}
/** /**
* @deprecated do not use this, use through ICoreMessage or ClientMessage * @deprecated do not use this, use through ICoreMessage or ClientMessage
*/ */

View File

@ -598,6 +598,11 @@ public class CoreMessage extends RefCountMessage implements ICoreMessage {
return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME); return getSimpleStringProperty(Message.HDR_LAST_VALUE_NAME);
} }
@Override
public Message setLastValueProperty(SimpleString lastValueName) {
return putStringProperty(Message.HDR_LAST_VALUE_NAME, lastValueName);
}
@Override @Override
public int getEncodeSize() { public int getEncodeSize() {
checkEncode(); checkEncode();

View File

@ -62,7 +62,6 @@ import org.apache.qpid.proton.message.impl.MessageImpl;
// see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format // see https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#section-message-format
public class AMQPMessage extends RefCountMessage { public class AMQPMessage extends RefCountMessage {
public static final String HDR_LAST_VALUE_NAME = org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString();
public static final int DEFAULT_MESSAGE_PRIORITY = 4; public static final int DEFAULT_MESSAGE_PRIORITY = 4;
public static final int MAX_MESSAGE_PRIORITY = 9; public static final int MAX_MESSAGE_PRIORITY = 9;
@ -1091,7 +1090,12 @@ public class AMQPMessage extends RefCountMessage {
@Override @Override
public SimpleString getLastValueProperty() { public SimpleString getLastValueProperty() {
return getSimpleStringProperty(HDR_LAST_VALUE_NAME); return getSimpleStringProperty(HDR_LAST_VALUE_NAME.toString());
}
@Override
public org.apache.activemq.artemis.api.core.Message setLastValueProperty(SimpleString lastValueName) {
return putStringProperty(HDR_LAST_VALUE_NAME, lastValueName);
} }
@Override @Override

View File

@ -771,6 +771,15 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
} }
} }
SimpleString lastValueProperty = coreMessage.getLastValueProperty();
if (lastValueProperty != null) {
try {
amqMsg.setStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_LAST_VALUE_NAME.toString(), lastValueProperty.toString());
} catch (JMSException e) {
throw new IOException("failure to set lvq property " + dlqCause, e);
}
}
Set<SimpleString> props = coreMessage.getPropertyNames(); Set<SimpleString> props = coreMessage.getPropertyNames();
if (props != null) { if (props != null) {
for (SimpleString s : props) { for (SimpleString s : props) {

View File

@ -234,7 +234,7 @@ public abstract class JMSClientTestSupport extends AmqpClientTestSupport {
} }
protected Connection createOpenWireConnection() throws JMSException { protected Connection createOpenWireConnection() throws JMSException {
return createCoreConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true); return createOpenWireConnection(getBrokerOpenWireJMSConnectionString(), null, null, null, true);
} }
private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException { private Connection createOpenWireConnection(String connectionString, String username, String password, String clientId, boolean start) throws JMSException {

View File

@ -17,13 +17,13 @@
package org.apache.activemq.artemis.tests.integration.amqp; package org.apache.activemq.artemis.tests.integration.amqp;
import javax.jms.Connection; import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue; import javax.jms.Queue;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
@ -124,23 +124,25 @@ public class JMSLVQTest extends JMSClientTestSupport {
MessageProducer p = producerSession.createProducer(null); MessageProducer p = producerSession.createProducer(null);
TextMessage message1 = producerSession.createTextMessage(); TextMessage message1 = producerSession.createTextMessage();
message1.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); message1.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "KEY");
message1.setText("hello"); message1.setText("hello");
p.send(queue1, message1); p.send(queue1, message1);
TextMessage message2 = producerSession.createTextMessage(); TextMessage message2 = producerSession.createTextMessage();
message2.setStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME, "KEY"); message2.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "KEY");
message2.setText("how are you"); message2.setText("how are you");
p.send(queue1, message2); p.send(queue1, message2);
Session consumerSession = consumerConnection.createSession(); //Simulate a small pause, else both messages could be consumed if consumer is fast enough
Thread.sleep(10);
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME); Queue consumerQueue = consumerSession.createQueue(LVQ_QUEUE_NAME);
MessageConsumer consumer = consumerSession.createConsumer(consumerQueue); MessageConsumer consumer = consumerSession.createConsumer(consumerQueue);
Message msg = consumer.receive(1000); TextMessage msg = (TextMessage) consumer.receive(1000);
assertNotNull(msg); assertNotNull(msg);
assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME)); assertEquals("KEY", msg.getStringProperty(AMQPMessage.HDR_LAST_VALUE_NAME.toString()));
assertTrue(msg instanceof TextMessage); assertEquals("how are you", msg.getText());
assertEquals("how are you", ((TextMessage)msg).getText());
consumer.close(); consumer.close();
} finally { } finally {
producerConnection.close(); producerConnection.close();