ARTEMIS-1566 Openwire client can't receive compressed messages

When openwire client uses compressed option to send messages
(jms.useCompression=true) openwire client failed to receive them.
The reason is in OpenwireMessageConverter.toAMQMessage():

1. message.setContent() should be called after setting properties
 (It will cause the compressed content to decompressed before delivering to clients)
2. message.onSend() should not be called here (it should be used
by producers. If used here it changes the internal flags of the
message and cause receive to fail).
This commit is contained in:
Howard Gao 2017-12-19 21:15:05 +08:00 committed by Clebert Suconic
parent bd8ec582b1
commit 6792dcdf27
2 changed files with 100 additions and 11 deletions

View File

@ -489,9 +489,9 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
boolean isCompressed = compressProp == null ? false : compressProp.booleanValue();
amqMsg.setCompressed(isCompressed);
byte[] bytes = null;
if (buffer != null) {
buffer.resetReaderIndex();
byte[] bytes = null;
synchronized (buffer) {
if (coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
SimpleString text = buffer.readNullableSimpleString();
@ -642,11 +642,6 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
buffer.resetReaderIndex();// this is important for topics as the buffer
// may be read multiple times
}
if (bytes != null) {
ByteSequence content = new ByteSequence(bytes);
amqMsg.setContent(content);
}
}
//we need check null because messages may come from other clients
@ -805,11 +800,11 @@ public class OpenWireMessageConverter implements MessageConverter<OpenwireMessag
}
}
}
try {
amqMsg.onSend();
amqMsg.setCompressed(isCompressed);
} catch (JMSException e) {
throw new IOException("Failed to covert to Openwire message", e);
amqMsg.setCompressed(isCompressed);
if (bytes != null) {
ByteSequence content = new ByteSequence(bytes);
amqMsg.setContent(content);
}
return amqMsg;
}

View File

@ -76,6 +76,10 @@ import org.junit.Test;
public class SimpleOpenWireTest extends BasicOpenWireTest {
private final String testString = "simple test string";
private final String testProp = "BASE_DATE";
private final String propValue = "2017-11-01";
@Override
@Before
public void setUp() throws Exception {
@ -330,6 +334,95 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
connection.close();
}
@Test
public void testCompression() throws Exception {
Connection cconnection = null;
Connection connection = null;
try {
ActiveMQConnectionFactory cfactory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "");
cconnection = cfactory.createConnection();
cconnection.start();
Session csession = cconnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue cQueue = csession.createQueue(queueName);
MessageConsumer consumer = csession.createConsumer(cQueue);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://" + OWHOST + ":" + OWPORT + "?jms.useCompression=true");
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
//text
TextMessage textMessage = session.createTextMessage();
textMessage.setText(testString);
TextMessage receivedMessage = sendAndReceive(textMessage, producer, consumer);
String receivedText = receivedMessage.getText();
assertEquals(testString, receivedText);
//MapMessage
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString(testProp, propValue);
MapMessage receivedMapMessage = sendAndReceive(mapMessage, producer, consumer);
String value = receivedMapMessage.getString(testProp);
assertEquals(propValue, value);
//Object
ObjectMessage objMessage = session.createObjectMessage();
objMessage.setObject(testString);
ObjectMessage receivedObjMessage = sendAndReceive(objMessage, producer, consumer);
String receivedObj = (String) receivedObjMessage.getObject();
assertEquals(testString, receivedObj);
//Stream
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString(testString);
StreamMessage receivedStreamMessage = sendAndReceive(streamMessage, producer, consumer);
String streamValue = receivedStreamMessage.readString();
assertEquals(testString, streamValue);
//byte
BytesMessage byteMessage = session.createBytesMessage();
byte[] bytes = testString.getBytes();
byteMessage.writeBytes(bytes);
BytesMessage receivedByteMessage = sendAndReceive(byteMessage, producer, consumer);
long receivedBodylength = receivedByteMessage.getBodyLength();
assertEquals("bodylength Correct", bytes.length, receivedBodylength);
byte[] receivedBytes = new byte[(int) receivedBodylength];
receivedByteMessage.readBytes(receivedBytes);
String receivedString = new String(receivedBytes);
assertEquals(testString, receivedString);
//Message
Message m = session.createMessage();
sendAndReceive(m, producer, consumer);
} finally {
if (cconnection != null) {
connection.close();
}
if (connection != null) {
cconnection.close();
}
}
}
private <T extends Message> T sendAndReceive(T m, MessageProducer producer, MessageConsumer consumer) throws JMSException {
m.setStringProperty(testProp, propValue);
producer.send(m);
T receivedMessage = (T) consumer.receive(1000);
String receivedProp = receivedMessage.getStringProperty(testProp);
assertEquals(propValue, receivedProp);
return receivedMessage;
}
@Test
public void testSimpleQueue() throws Exception {
connection.start();
@ -1523,6 +1616,7 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
//close first connection, let temp queue die
connection1.close();
waitForBindings(this.server, tempQueue.getQueueName(), true, 0, 0, 5000);
//send again
try {
producer.send(m);