mirror of https://github.com/apache/activemq.git
Test for message property handling cross protocols.
Add a test that validates that message properties are propagated between OpenWire and AMQP.
This commit is contained in:
parent
94ca7039b1
commit
d563e9019d
|
@ -19,10 +19,13 @@ package org.apache.activemq.transport.amqp;
|
|||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assume.assumeFalse;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Enumeration;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -34,6 +37,7 @@ import javax.jms.MessageConsumer;
|
|||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
|
@ -75,6 +79,173 @@ public class JMSInteroperabilityTest extends JMSClientTestSupport {
|
|||
return transformer;
|
||||
}
|
||||
|
||||
//----- Tests for property handling between protocols --------------------//
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test(timeout = 60000)
|
||||
public void testMessagePropertiesArePreservedOpenWireToAMQP() throws Exception {
|
||||
|
||||
boolean bool = true;
|
||||
byte bValue = 127;
|
||||
short nShort = 10;
|
||||
int nInt = 5;
|
||||
long nLong = 333;
|
||||
float nFloat = 1;
|
||||
double nDouble = 100;
|
||||
Enumeration<String> propertyNames = null;
|
||||
String testMessageBody = "Testing msgPropertyExistTest";
|
||||
|
||||
Connection openwire = createJMSConnection();
|
||||
Connection amqp = createConnection();
|
||||
|
||||
openwire.start();
|
||||
amqp.start();
|
||||
|
||||
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Destination queue = openwireSession.createQueue(getDestinationName());
|
||||
|
||||
MessageProducer openwireProducer = openwireSession.createProducer(queue);
|
||||
MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
|
||||
|
||||
TextMessage outbound = openwireSession.createTextMessage();
|
||||
outbound.setText(testMessageBody);
|
||||
outbound.setBooleanProperty("Boolean", bool);
|
||||
outbound.setByteProperty("Byte", bValue);
|
||||
outbound.setShortProperty("Short", nShort);
|
||||
outbound.setIntProperty("Integer", nInt);
|
||||
outbound.setFloatProperty("Float", nFloat);
|
||||
outbound.setDoubleProperty("Double", nDouble);
|
||||
outbound.setStringProperty("String", "test");
|
||||
outbound.setLongProperty("Long", nLong);
|
||||
outbound.setObjectProperty("BooleanObject", Boolean.valueOf(bool));
|
||||
|
||||
openwireProducer.send(outbound);
|
||||
|
||||
Message inbound = amqpConsumer.receive(2500);
|
||||
|
||||
propertyNames = inbound.getPropertyNames();
|
||||
int propertyCount = 0;
|
||||
do {
|
||||
String propertyName = propertyNames.nextElement();
|
||||
|
||||
if (propertyName.indexOf("JMS") != 0) {
|
||||
propertyCount++;
|
||||
if (propertyName.equals("Boolean") || propertyName.equals("Byte") ||
|
||||
propertyName.equals("Integer") || propertyName.equals("Short") ||
|
||||
propertyName.equals("Float") || propertyName.equals("Double") ||
|
||||
propertyName.equals("String") || propertyName.equals("Long") ||
|
||||
propertyName.equals("BooleanObject")) {
|
||||
|
||||
LOG.debug("Appclication Property set by client is: {}", propertyName);
|
||||
if (!inbound.propertyExists(propertyName)) {
|
||||
assertTrue(inbound.propertyExists(propertyName));
|
||||
LOG.debug("Positive propertyExists test failed for {}", propertyName);
|
||||
} else if (inbound.propertyExists(propertyName + "1")) {
|
||||
LOG.debug("Negative propertyExists test failed for {} 1", propertyName);
|
||||
fail("Negative propertyExists test failed for " + propertyName + "1");
|
||||
}
|
||||
} else {
|
||||
LOG.debug("Appclication Property not set by client: {}", propertyName);
|
||||
fail("Appclication Property not set by client: " + propertyName);
|
||||
}
|
||||
} else {
|
||||
LOG.debug("JMSProperty Name is: {}", propertyName);
|
||||
}
|
||||
|
||||
} while (propertyNames.hasMoreElements());
|
||||
|
||||
amqp.close();
|
||||
openwire.close();
|
||||
|
||||
assertEquals("Unexpected number of properties in received message.", 9, propertyCount);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Test(timeout = 60000)
|
||||
public void testMessagePropertiesArePreservedAMQPToOpenWire() throws Exception {
|
||||
|
||||
// Raw Transformer doesn't expand message propeties.
|
||||
assumeFalse(transformer.equals("raw"));
|
||||
|
||||
boolean bool = true;
|
||||
byte bValue = 127;
|
||||
short nShort = 10;
|
||||
int nInt = 5;
|
||||
long nLong = 333;
|
||||
float nFloat = 1;
|
||||
double nDouble = 100;
|
||||
Enumeration<String> propertyNames = null;
|
||||
String testMessageBody = "Testing msgPropertyExistTest";
|
||||
|
||||
Connection openwire = createJMSConnection();
|
||||
Connection amqp = createConnection();
|
||||
|
||||
openwire.start();
|
||||
amqp.start();
|
||||
|
||||
Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Destination queue = openwireSession.createQueue(getDestinationName());
|
||||
|
||||
MessageProducer amqpProducer = amqpSession.createProducer(queue);
|
||||
MessageConsumer openwireConsumer = openwireSession.createConsumer(queue);
|
||||
|
||||
TextMessage outbound = openwireSession.createTextMessage();
|
||||
outbound.setText(testMessageBody);
|
||||
outbound.setBooleanProperty("Boolean", bool);
|
||||
outbound.setByteProperty("Byte", bValue);
|
||||
outbound.setShortProperty("Short", nShort);
|
||||
outbound.setIntProperty("Integer", nInt);
|
||||
outbound.setFloatProperty("Float", nFloat);
|
||||
outbound.setDoubleProperty("Double", nDouble);
|
||||
outbound.setStringProperty("String", "test");
|
||||
outbound.setLongProperty("Long", nLong);
|
||||
outbound.setObjectProperty("BooleanObject", Boolean.valueOf(bool));
|
||||
|
||||
amqpProducer.send(outbound);
|
||||
|
||||
Message inbound = openwireConsumer.receive(2500);
|
||||
|
||||
propertyNames = inbound.getPropertyNames();
|
||||
int propertyCount = 0;
|
||||
do {
|
||||
String propertyName = propertyNames.nextElement();
|
||||
|
||||
if (propertyName.indexOf("JMS") != 0) {
|
||||
propertyCount++;
|
||||
if (propertyName.equals("Boolean") || propertyName.equals("Byte") ||
|
||||
propertyName.equals("Integer") || propertyName.equals("Short") ||
|
||||
propertyName.equals("Float") || propertyName.equals("Double") ||
|
||||
propertyName.equals("String") || propertyName.equals("Long") ||
|
||||
propertyName.equals("BooleanObject")) {
|
||||
|
||||
LOG.debug("Appclication Property set by client is: {}", propertyName);
|
||||
if (!inbound.propertyExists(propertyName)) {
|
||||
assertTrue(inbound.propertyExists(propertyName));
|
||||
LOG.debug("Positive propertyExists test failed for {}", propertyName);
|
||||
} else if (inbound.propertyExists(propertyName + "1")) {
|
||||
LOG.debug("Negative propertyExists test failed for {} 1", propertyName);
|
||||
fail("Negative propertyExists test failed for " + propertyName + "1");
|
||||
}
|
||||
} else {
|
||||
LOG.debug("Appclication Property not set by client: {}", propertyName);
|
||||
fail("Appclication Property not set by client: " + propertyName);
|
||||
}
|
||||
} else {
|
||||
LOG.debug("JMSProperty Name is: {}", propertyName);
|
||||
}
|
||||
|
||||
} while (propertyNames.hasMoreElements());
|
||||
|
||||
amqp.close();
|
||||
openwire.close();
|
||||
|
||||
assertEquals("Unexpected number of properties in received message.", 9, propertyCount);
|
||||
}
|
||||
|
||||
//----- Tests for OpenWire to Qpid JMS using MapMessage ------------------//
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
Loading…
Reference in New Issue