mirror of https://github.com/apache/activemq.git
Patch applied for: https://issues.apache.org/activemq/browse/AMQ-1208
Test case added git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@548300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
fae5c5fd29
commit
34a8028770
|
@ -36,40 +36,47 @@ import java.io.StringReader;
|
|||
import java.io.StringWriter;
|
||||
|
||||
/**
|
||||
* Transforms object messages to text messages using {@link XStream}
|
||||
* Transforms object messages to text messages and vice versa using {@link XStream}
|
||||
*
|
||||
* @version $Revision$
|
||||
*/
|
||||
public class XStreamMessageTransformer extends MessageTransformerSupport {
|
||||
private XStream xStream;
|
||||
|
||||
/**
|
||||
* Defines the direction in which transformation goes.
|
||||
* If 'false' (default),
|
||||
* - producer transformation transforms from Object to XML.
|
||||
* - consumer transformation transforms from XML to Object.
|
||||
* If 'true' transformation directions changes,
|
||||
* - producer transformation transforms from XML to Object.
|
||||
* - consumer transformation transforms from Object to XML.
|
||||
*/
|
||||
private boolean reverse;
|
||||
|
||||
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
|
||||
if (message instanceof ObjectMessage) {
|
||||
TextMessage answer = session.createTextMessage(marshall(session, producer, (ObjectMessage) message));
|
||||
copyProperties(message, answer);
|
||||
return answer;
|
||||
}
|
||||
return message;
|
||||
}
|
||||
public XStreamMessageTransformer() {
|
||||
new XStreamMessageTransformer(false);
|
||||
}
|
||||
|
||||
public XStreamMessageTransformer(boolean direction) {
|
||||
this.reverse = direction;
|
||||
}
|
||||
|
||||
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
|
||||
if (message instanceof TextMessage) {
|
||||
TextMessage textMessage = (TextMessage) message;
|
||||
Object object = unmarshall(session, consumer, textMessage);
|
||||
if (object instanceof Serializable) {
|
||||
ObjectMessage answer = session.createObjectMessage((Serializable) object);
|
||||
copyProperties(message, answer);
|
||||
return answer;
|
||||
}
|
||||
else {
|
||||
throw new JMSException("Object is not serializable: " + object);
|
||||
}
|
||||
}
|
||||
return message;
|
||||
}
|
||||
if (reverse) {
|
||||
return objectToText(session, message);
|
||||
} else {
|
||||
return textToObject(session, message);
|
||||
}
|
||||
}
|
||||
|
||||
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
|
||||
if (reverse) {
|
||||
return textToObject(session, message);
|
||||
} else {
|
||||
return objectToText(session, message);
|
||||
}
|
||||
}
|
||||
|
||||
// Properties
|
||||
// -------------------------------------------------------------------------
|
||||
|
@ -90,10 +97,58 @@ public class XStreamMessageTransformer extends MessageTransformerSupport {
|
|||
return new XStream();
|
||||
}
|
||||
|
||||
|
||||
public boolean isReverse() {
|
||||
return reverse;
|
||||
}
|
||||
|
||||
public void setReverse(boolean reverse) {
|
||||
this.reverse = reverse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms an incoming XML encoded {@link TextMessage} to an {@link ObjectMessage}
|
||||
* @param session - JMS session currently being used
|
||||
* @param message - if this is a TextMessage, it will be transformed to an ObjectMessage
|
||||
* @return ObjectMessage, if the incoming message is a TextMessage, the original message otherwise
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected Message textToObject(Session session, Message message) throws JMSException {
|
||||
if (message instanceof TextMessage) {
|
||||
TextMessage textMessage = (TextMessage) message;
|
||||
Object object = unmarshall(session, textMessage);
|
||||
if (object instanceof Serializable) {
|
||||
ObjectMessage answer = session.createObjectMessage((Serializable) object);
|
||||
copyProperties(message, answer);
|
||||
return answer;
|
||||
}
|
||||
else {
|
||||
throw new JMSException("Object is not serializable: " + object);
|
||||
}
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms an incoming {@link ObjectMessage} to an XML encoded {@link TextMessage}
|
||||
* @param session - JMS session currently being used
|
||||
* @param message - if this is an ObjectMessage, it will be transformed to a TextMessage
|
||||
* @return XML encoded TextMessage, if the incoming messsage is an ObjectMessge, the original message otherwise
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected Message objectToText(Session session, Message message) throws JMSException {
|
||||
if (message instanceof ObjectMessage) {
|
||||
TextMessage answer = session.createTextMessage(marshall(session, (ObjectMessage) message));
|
||||
copyProperties(message, answer);
|
||||
return answer;
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
/**
|
||||
* Marshalls the Object in the {@link ObjectMessage} to a string using XML encoding
|
||||
*/
|
||||
protected String marshall(Session session, MessageProducer producer, ObjectMessage objectMessage) throws JMSException {
|
||||
protected String marshall(Session session, ObjectMessage objectMessage) throws JMSException {
|
||||
Serializable object = objectMessage.getObject();
|
||||
StringWriter buffer = new StringWriter();
|
||||
HierarchicalStreamWriter out = new PrettyPrintWriter(buffer);
|
||||
|
@ -101,10 +156,11 @@ public class XStreamMessageTransformer extends MessageTransformerSupport {
|
|||
return buffer.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Unmarshalls the Object using XML encoding of the String
|
||||
|
||||
/**
|
||||
* Unmarshalls the XML encoded message in the {@link TextMessage} to an Object
|
||||
*/
|
||||
protected Object unmarshall(Session session, MessageConsumer consumer, TextMessage textMessage) throws JMSException {
|
||||
protected Object unmarshall(Session session, TextMessage textMessage) throws JMSException {
|
||||
HierarchicalStreamReader in = new XppReader(new StringReader(textMessage.getText()));
|
||||
return getXStream().unmarshal(in);
|
||||
}
|
||||
|
|
|
@ -40,6 +40,10 @@ public class XStreamTransformTest extends TestCase {
|
|||
protected long timeout = 5000;
|
||||
|
||||
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
|
||||
connectionFactory.setTransformer(new XStreamMessageTransformer());
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
|
||||
// lets create the consumers
|
||||
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = objectSession.createTopic(getClass().getName());
|
||||
|
@ -82,13 +86,57 @@ public class XStreamTransformTest extends TestCase {
|
|||
System.out.println(text);
|
||||
}
|
||||
|
||||
|
||||
protected void setUp() throws Exception {
|
||||
connectionFactory.setTransformer(new XStreamMessageTransformer());
|
||||
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
|
||||
// Set reverse to true
|
||||
connectionFactory.setTransformer(new XStreamMessageTransformer(true));
|
||||
connection = connectionFactory.createConnection();
|
||||
connection.start();
|
||||
}
|
||||
|
||||
// lets create the consumers
|
||||
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
Destination destination = textSession.createTopic(getClass().getName());
|
||||
MessageConsumer textConsumer = textSession.createConsumer(destination);
|
||||
|
||||
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
|
||||
// lets clear the transformer on this consumer so we see the message as it really is
|
||||
((ActiveMQMessageConsumer) objectConsumer).setTransformer(null);
|
||||
|
||||
|
||||
// send a message
|
||||
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
MessageProducer producer = producerSession.createProducer(destination);
|
||||
|
||||
String xmlText =
|
||||
"<org.apache.activemq.util.xstream.SamplePojo>" +
|
||||
"<name>James</name>" +
|
||||
"<city>London</city>" +
|
||||
"</org.apache.activemq.util.xstream.SamplePojo>";
|
||||
|
||||
TextMessage request = producerSession.createTextMessage(xmlText);
|
||||
producer.send(request);
|
||||
|
||||
Message message;
|
||||
// lets consume it as a text message
|
||||
message = textConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||
TextMessage textMessage = (TextMessage) message;
|
||||
String text = textMessage.getText();
|
||||
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||
|
||||
// lets consume it as an object message
|
||||
message = objectConsumer.receive(timeout);
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||
ObjectMessage objectMessage = (ObjectMessage) message;
|
||||
Object object = objectMessage.getObject();
|
||||
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||
SamplePojo body = (SamplePojo) object;
|
||||
assertEquals("name", "James", body.getName());
|
||||
assertEquals("city", "London", body.getCity());
|
||||
|
||||
}
|
||||
|
||||
protected void tearDown() throws Exception {
|
||||
if (connection != null) {
|
||||
|
|
Loading…
Reference in New Issue