ARTEMIS-1464 Fix Core to AMQP conversion BytesMessage corrupts bytes

Extend test cases in MessageTypesTest to cover Core to AMQP combinations for all JMSTypeTests
Fix ServerJMSBytesMessage to correctly return bodyLength
Remove unused/dead code
This commit is contained in:
Michael André Pearce 2017-10-17 09:20:34 +01:00 committed by Martyn Taylor
parent 89ccee6533
commit c1fcadb706
3 changed files with 166 additions and 61 deletions

View File

@ -404,16 +404,6 @@ public class CoreAmqpConverter {
return new Binary(data); return new Binary(data);
} }
private static Binary getBinaryFromMessageBody(ServerJMSTextMessage message) throws JMSException {
Binary result = null;
String text = message.getText();
if (text != null) {
result = new Binary(text.getBytes(StandardCharsets.UTF_8));
}
return result;
}
private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException { private static Binary getBinaryFromMessageBody(ServerJMSObjectMessage message) throws JMSException {
message.getInnerMessage().getBodyBuffer().resetReaderIndex(); message.getInnerMessage().getBodyBuffer().resetReaderIndex();
int size = message.getInnerMessage().getBodyBuffer().readInt(); int size = message.getInnerMessage().getBodyBuffer().readInt();

View File

@ -20,7 +20,6 @@ import javax.jms.BytesMessage;
import javax.jms.JMSException; import javax.jms.JMSException;
import org.apache.activemq.artemis.api.core.ICoreMessage; import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesMessageReset;
import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean; import static org.apache.activemq.artemis.reader.BytesMessageUtil.bytesReadBoolean;
@ -55,7 +54,7 @@ public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMess
@Override @Override
public long getBodyLength() throws JMSException { public long getBodyLength() throws JMSException {
return message.getEndOfBodyPosition() - CoreMessage.BODY_OFFSET; return message.getReadOnlyBodyBuffer().readableBytes();
} }
@Override @Override

View File

@ -49,6 +49,11 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
final int NUM_MESSAGES = 10; final int NUM_MESSAGES = 10;
@Override
protected String getConfiguredProtocols() {
return "AMQP,OPENWIRE,CORE";
}
@Test(timeout = 60000) @Test(timeout = 60000)
public void testAddressControlSendMessage() throws Exception { public void testAddressControlSendMessage() throws Exception {
SimpleString address = RandomUtil.randomSimpleString(); SimpleString address = RandomUtil.randomSimpleString();
@ -67,7 +72,7 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(address.toString()); javax.jms.Queue queue = session.createQueue(address.toString());
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(500); Message message = consumer.receive(5000);
assertNotNull(message); assertNotNull(message);
byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()]; byte[] buffer = new byte[(int)((BytesMessage)message).getBodyLength()];
((BytesMessage)message).readBytes(buffer); ((BytesMessage)message).readBytes(buffer);
@ -112,12 +117,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
} }
@Test(timeout = 60000) private void testBytesMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
public void testBytesMessageSendReceive() throws Throwable {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
Connection connection = createConnection(); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
byte[] bytes = new byte[0xf + 1]; byte[] bytes = new byte[0xf + 1];
@ -135,8 +138,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
} }
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue); Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
BytesMessage m = (BytesMessage) consumer.receive(5000); BytesMessage m = (BytesMessage) consumer.receive(5000);
@ -158,11 +162,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMessageSendReceive() throws Throwable { public void testBytesMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testBytesMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testBytesMessageSendReceiveFromCoreToAMQP() throws Throwable {
testBytesMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testBytesMessageSendReceiveFromAMQPToCore() throws Throwable {
testBytesMessageSendReceive(createConnection(), createCoreConnection());
}
private void testMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
Connection connection = createConnection(); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
byte[] bytes = new byte[0xf + 1]; byte[] bytes = new byte[0xf + 1];
@ -179,8 +196,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
} }
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue); Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
Message m = consumer.receive(5000); Message m = consumer.receive(5000);
@ -192,11 +210,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testMapMessageSendReceive() throws Throwable { public void testMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMessageSendReceiveFromCoreToAMQP() throws Throwable {
testMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMessageSendReceiveFromAMQPToCore() throws Throwable {
testMessageSendReceive(createConnection(), createCoreConnection());
}
private void testMapMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
Connection connection = createConnection(); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -209,8 +240,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
} }
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue); Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
MapMessage m = (MapMessage) consumer.receive(5000); MapMessage m = (MapMessage) consumer.receive(5000);
@ -225,11 +257,24 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testTextMessageSendReceive() throws Throwable { public void testMapMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testMapMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMapMessageSendReceiveFromCoreToAMQP() throws Throwable {
testMapMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testMapMessageSendReceiveFromAMQPToCore() throws Throwable {
testMapMessageSendReceive(createConnection(), createCoreConnection());
}
private void testTextMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
Connection connection = createConnection(); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -240,8 +285,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
} }
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue); Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
TextMessage m = (TextMessage) consumer.receive(5000); TextMessage m = (TextMessage) consumer.receive(5000);
@ -254,9 +300,22 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testStreamMessageSendReceive() throws Throwable { public void testTextMessageSendReceiveFromAMQPToAMQP() throws Throwable {
Connection connection = createConnection(); testTextMessageSendReceive(createConnection(), createConnection());
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); }
@Test(timeout = 60000)
public void testTextMessageSendReceiveFromCoreToAMQP() throws Throwable {
testTextMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testTextMessageSendReceiveFromAMQPToCore() throws Throwable {
testTextMessageSendReceive(createConnection(), createCoreConnection());
}
private void testStreamMessageSendReceive(Connection producerConnection, Connection consumerConnection) throws Throwable {
Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -268,8 +327,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
} }
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue); Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
StreamMessage m = (StreamMessage) consumer.receive(5000); StreamMessage m = (StreamMessage) consumer.receive(5000);
@ -282,22 +342,35 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testObjectMessageWithArrayListPayload() throws Throwable { public void testStreamMessageSendReceiveFromAMQPToAMQP() throws Throwable {
testStreamMessageSendReceive(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testStreamMessageSendReceiveFromCoreToAMQP() throws Throwable {
testStreamMessageSendReceive(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testStreamMessageSendReceiveFromAMQPToCore() throws Throwable {
testStreamMessageSendReceive(createConnection(), createCoreConnection());
}
private void testObjectMessageWithArrayListPayload(Connection producerConnection, Connection consumerConnection) throws Throwable {
ArrayList<String> payload = new ArrayList<>(); ArrayList<String> payload = new ArrayList<>();
payload.add("aString"); payload.add("aString");
Connection connection = createConnection(); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
ObjectMessage objectMessage = session.createObjectMessage(payload); ObjectMessage objectMessage = session.createObjectMessage(payload);
producer.send(objectMessage); producer.send(objectMessage);
session.close(); session.close();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer cons = session.createConsumer(queue); Queue consumerQueue = session.createQueue(getQueueName());
connection.start(); MessageConsumer cons = session.createConsumer(consumerQueue);
consumerConnection.start();
objectMessage = (ObjectMessage) cons.receive(5000); objectMessage = (ObjectMessage) cons.receive(5000);
assertNotNull(objectMessage); assertNotNull(objectMessage);
@ -305,15 +378,28 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
ArrayList<String> received = (ArrayList<String>) objectMessage.getObject(); ArrayList<String> received = (ArrayList<String>) objectMessage.getObject();
assertEquals(received.get(0), "aString"); assertEquals(received.get(0), "aString");
connection.close(); consumerConnection.close();
} }
@Test(timeout = 60000) @Test(timeout = 60000)
public void testObjectMessageUsingCustomType() throws Throwable { public void testObjectMessageWithArrayListPayloadFromAMQPToAMQP() throws Throwable {
testObjectMessageWithArrayListPayload(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageWithArrayListPayloadFromCoreToAMQP() throws Throwable {
testObjectMessageWithArrayListPayload(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageWithArrayListPayloadFromAMQPToCore() throws Throwable {
testObjectMessageWithArrayListPayload(createConnection(), createCoreConnection());
}
private void testObjectMessageUsingCustomType(Connection producerConnection, Connection consumerConnection) throws Throwable {
long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
Connection connection = createConnection(); Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -323,8 +409,9 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
} }
Session sessionConsumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session sessionConsumer = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = sessionConsumer.createConsumer(queue); Queue consumerQueue = sessionConsumer.createQueue(getQueueName());
final MessageConsumer consumer = sessionConsumer.createConsumer(consumerQueue);
for (int i = 0; i < NUM_MESSAGES; i++) { for (int i = 0; i < NUM_MESSAGES; i++) {
ObjectMessage msg = (ObjectMessage) consumer.receive(5000); ObjectMessage msg = (ObjectMessage) consumer.receive(5000);
@ -338,6 +425,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
System.out.println("taken = " + taken); System.out.println("taken = " + taken);
} }
@Test(timeout = 60000)
public void testObjectMessageUsingCustomTypeFromAMQPToAMQP() throws Throwable {
testObjectMessageUsingCustomType(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageUsingCustomTypeFromCoreToAMQP() throws Throwable {
testObjectMessageUsingCustomType(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testObjectMessageUsingCustomTypeFromAMQPToCore() throws Throwable {
testObjectMessageUsingCustomType(createConnection(), createCoreConnection());
}
public static class AnythingSerializable implements Serializable { public static class AnythingSerializable implements Serializable {
private static final long serialVersionUID = 5972085029690947807L; private static final long serialVersionUID = 5972085029690947807L;
@ -352,10 +454,8 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
} }
} }
@Test(timeout = 60000) private void testPropertiesArePreserved(Connection producerConnection, Connection consumerConnection) throws Exception {
public void testPropertiesArePreserved() throws Exception { Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Connection connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getQueueName()); Queue queue = session.createQueue(getQueueName());
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -372,9 +472,10 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
producer.send(message); producer.send(message);
producer.send(message); producer.send(message);
connection.start(); consumerConnection.start();
Session consumerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer messageConsumer = session.createConsumer(queue); Queue consumerQueue = session.createQueue(getQueueName());
MessageConsumer messageConsumer = consumerSession.createConsumer(consumerQueue);
TextMessage received = (TextMessage) messageConsumer.receive(5000); TextMessage received = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull(received); Assert.assertNotNull(received);
Assert.assertEquals("msg:0", received.getText()); Assert.assertEquals("msg:0", received.getText());
@ -389,6 +490,21 @@ public class JMSMessageTypesTest extends JMSClientTestSupport {
received = (TextMessage) messageConsumer.receive(5000); received = (TextMessage) messageConsumer.receive(5000);
Assert.assertNotNull(received); Assert.assertNotNull(received);
connection.close(); consumerConnection.close();
}
@Test(timeout = 60000)
public void testPropertiesArePreservedFromAMQPToAMQP() throws Throwable {
testPropertiesArePreserved(createConnection(), createConnection());
}
@Test(timeout = 60000)
public void testPropertiesArePreservedFromCoreToAMQP() throws Throwable {
testPropertiesArePreserved(createCoreConnection(), createConnection());
}
@Test(timeout = 60000)
public void testPropertiesArePreservedFromAMQPToCore() throws Throwable {
testPropertiesArePreserved(createConnection(), createCoreConnection());
} }
} }