From 6792dcdf2745ca9c2d735b56511a01f1168da01b Mon Sep 17 00:00:00 2001 From: Howard Gao Date: Tue, 19 Dec 2017 21:15:05 +0800 Subject: [PATCH] ARTEMIS-1566 Openwire client can't receive compressed messages When openwire client uses compressed option to send messages (jms.useCompression=true) openwire client failed to receive them. The reason is in OpenwireMessageConverter.toAMQMessage(): 1. message.setContent() should be called after setting properties (It will cause the compressed content to decompressed before delivering to clients) 2. message.onSend() should not be called here (it should be used by producers. If used here it changes the internal flags of the message and cause receive to fail). --- .../openwire/OpenWireMessageConverter.java | 17 ++-- .../openwire/SimpleOpenWireTest.java | 94 +++++++++++++++++++ 2 files changed, 100 insertions(+), 11 deletions(-) diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java index a5bb0f98c2..6af9997bd7 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java @@ -489,9 +489,9 @@ public class OpenWireMessageConverter implements MessageConverter T sendAndReceive(T m, MessageProducer producer, MessageConsumer consumer) throws JMSException { + m.setStringProperty(testProp, propValue); + producer.send(m); + T receivedMessage = (T) consumer.receive(1000); + String receivedProp = receivedMessage.getStringProperty(testProp); + assertEquals(propValue, receivedProp); + return receivedMessage; + } + @Test public void testSimpleQueue() throws Exception { connection.start(); @@ -1523,6 +1616,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest { //close first connection, let temp queue die connection1.close(); + waitForBindings(this.server, tempQueue.getQueueName(), true, 0, 0, 5000); //send again try { producer.send(m);