ARTEMIS-464 Sending null textMessage shouldn't break the wire
This commit is contained in:
parent
3560415bcb
commit
91bdeb3728
|
@ -129,9 +129,13 @@ public class OpenWireMessageConverter implements MessageConverter {
|
|||
byte coreType = toCoreType(messageSend.getDataStructureType());
|
||||
coreMessage.setType(coreType);
|
||||
|
||||
ActiveMQBuffer body = coreMessage.getBodyBuffer();
|
||||
|
||||
ByteSequence contents = messageSend.getContent();
|
||||
if (contents != null) {
|
||||
ActiveMQBuffer body = coreMessage.getBodyBuffer();
|
||||
if (contents == null && coreType == org.apache.activemq.artemis.api.core.Message.TEXT_TYPE) {
|
||||
body.writeNullableString(null);
|
||||
}
|
||||
else if (contents != null) {
|
||||
boolean messageCompressed = messageSend.isCompressed();
|
||||
if (messageCompressed) {
|
||||
coreMessage.putBooleanProperty(AMQ_MSG_COMPRESSED, messageCompressed);
|
||||
|
|
|
@ -193,9 +193,11 @@ public class AMQConsumer {
|
|||
return size;
|
||||
}
|
||||
catch (IOException e) {
|
||||
e.printStackTrace();
|
||||
return 0;
|
||||
}
|
||||
catch (Throwable t) {
|
||||
t.printStackTrace();
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,19 +78,24 @@ public class TopicRedeliverTest extends TestSupport {
|
|||
|
||||
TextMessage sent1 = producerSession.createTextMessage();
|
||||
sent1.setText("msg1");
|
||||
sent1.setStringProperty("str", "1");
|
||||
producer.send(sent1);
|
||||
|
||||
TextMessage sent2 = producerSession.createTextMessage();
|
||||
sent1.setText("msg2");
|
||||
sent2.setText("msg2");
|
||||
sent2.setStringProperty("str", "2");
|
||||
producer.send(sent2);
|
||||
|
||||
TextMessage sent3 = producerSession.createTextMessage();
|
||||
sent1.setText("msg3");
|
||||
sent2.setText("msg3");
|
||||
sent2.setStringProperty("str", "3");
|
||||
producer.send(sent3);
|
||||
|
||||
consumer.receive(RECEIVE_TIMEOUT);
|
||||
Message rec2 = consumer.receive(RECEIVE_TIMEOUT);
|
||||
consumer.receive(RECEIVE_TIMEOUT);
|
||||
TextMessage msgTest = (TextMessage)consumer.receive(RECEIVE_TIMEOUT);
|
||||
System.out.println("msgTest::" + msgTest + " // " + msgTest.getText());
|
||||
TextMessage rec2 = (TextMessage)consumer.receive(RECEIVE_TIMEOUT);
|
||||
System.out.println("msgTest::" + rec2 + " // " + rec2.getText());
|
||||
assertNull(consumer.receiveNoWait());
|
||||
|
||||
// ack rec2
|
||||
rec2.acknowledge();
|
||||
|
@ -99,10 +104,10 @@ public class TopicRedeliverTest extends TestSupport {
|
|||
sent4.setText("msg4");
|
||||
producer.send(sent4);
|
||||
|
||||
Message rec4 = consumer.receive(RECEIVE_TIMEOUT);
|
||||
TextMessage rec4 = (TextMessage)consumer.receive(RECEIVE_TIMEOUT);
|
||||
assertTrue(rec4.equals(sent4));
|
||||
consumerSession.recover();
|
||||
rec4 = consumer.receive(RECEIVE_TIMEOUT);
|
||||
rec4 = (TextMessage)consumer.receive(RECEIVE_TIMEOUT);
|
||||
assertTrue(rec4.equals(sent4));
|
||||
assertTrue(rec4.getJMSRedelivered());
|
||||
rec4.acknowledge();
|
||||
|
|
|
@ -92,6 +92,28 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendEmpty() throws Exception {
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Queue queue = session.createQueue(queueName);
|
||||
System.out.println("Queue:" + queue);
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
producer.send(session.createTextMessage());
|
||||
|
||||
Assert.assertNull(consumer.receive(100));
|
||||
connection.start();
|
||||
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
|
||||
Assert.assertNotNull(message);
|
||||
|
||||
message.acknowledge();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testXASimple() throws Exception {
|
||||
XAConnection connection = xaFactory.createXAConnection();
|
||||
|
|
Loading…
Reference in New Issue