mirror of https://github.com/apache/activemq.git
Added an adaptive message transformation. More details here: https://issues.apache.org/activemq/browse/AMQ-1208
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@548562 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
34a8028770
commit
d2fe512fa4
|
@ -44,38 +44,69 @@ public class XStreamMessageTransformer extends MessageTransformerSupport {
|
||||||
private XStream xStream;
|
private XStream xStream;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Defines the direction in which transformation goes.
|
* Defines the type of transformation.
|
||||||
* If 'false' (default),
|
* If XML (default),
|
||||||
* - producer transformation transforms from Object to XML.
|
* - producer transformation transforms from Object to XML.
|
||||||
* - consumer transformation transforms from XML to Object.
|
* - consumer transformation transforms from XML to Object.
|
||||||
* If 'true' transformation directions changes,
|
* If OBJECT,
|
||||||
* - producer transformation transforms from XML to Object.
|
* - producer transformation transforms from XML to Object.
|
||||||
* - consumer transformation transforms from Object to XML.
|
* - consumer transformation transforms from Object to XML.
|
||||||
|
* If ADAPTIVE,
|
||||||
|
* - producer transformation transforms from Object to XML, or XML to Object
|
||||||
|
* depending on the type of the original message
|
||||||
|
* - consumer transformation transforms from XML to Object, or Object to XML
|
||||||
|
* depending on the type of the original message
|
||||||
*/
|
*/
|
||||||
private boolean reverse;
|
public enum MessageTransform {XML, OBJECT, ADAPTIVE};
|
||||||
|
|
||||||
public XStreamMessageTransformer() {
|
protected MessageTransform transformType;
|
||||||
new XStreamMessageTransformer(false);
|
|
||||||
|
public XStreamMessageTransformer() {
|
||||||
|
this(MessageTransform.XML);
|
||||||
}
|
}
|
||||||
|
|
||||||
public XStreamMessageTransformer(boolean direction) {
|
public XStreamMessageTransformer(MessageTransform transformType) {
|
||||||
this.reverse = direction;
|
this.transformType = transformType;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
|
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
|
||||||
if (reverse) {
|
switch (transformType) {
|
||||||
return objectToText(session, message);
|
case XML:
|
||||||
} else {
|
return (message instanceof TextMessage) ?
|
||||||
return textToObject(session, message);
|
textToObject(session, (TextMessage)message) :
|
||||||
}
|
message;
|
||||||
}
|
case OBJECT:
|
||||||
|
return (message instanceof ObjectMessage) ?
|
||||||
|
objectToText(session, (ObjectMessage)message) :
|
||||||
|
message;
|
||||||
|
case ADAPTIVE:
|
||||||
|
return (message instanceof TextMessage) ?
|
||||||
|
textToObject(session, (TextMessage)message) :
|
||||||
|
(message instanceof ObjectMessage) ?
|
||||||
|
objectToText(session, (ObjectMessage)message) :
|
||||||
|
message;
|
||||||
|
}
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
|
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
|
||||||
if (reverse) {
|
switch (transformType) {
|
||||||
return textToObject(session, message);
|
case XML:
|
||||||
} else {
|
return (message instanceof ObjectMessage) ?
|
||||||
return objectToText(session, message);
|
objectToText(session, (ObjectMessage)message) :
|
||||||
}
|
message;
|
||||||
|
case OBJECT:
|
||||||
|
return (message instanceof TextMessage) ?
|
||||||
|
textToObject(session, (TextMessage)message) :
|
||||||
|
message;
|
||||||
|
case ADAPTIVE:
|
||||||
|
return (message instanceof TextMessage) ?
|
||||||
|
textToObject(session, (TextMessage)message) :
|
||||||
|
(message instanceof ObjectMessage) ?
|
||||||
|
objectToText(session, (ObjectMessage)message) :
|
||||||
|
message;
|
||||||
|
}
|
||||||
|
return message;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Properties
|
// Properties
|
||||||
|
@ -97,52 +128,44 @@ public class XStreamMessageTransformer extends MessageTransformerSupport {
|
||||||
return new XStream();
|
return new XStream();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MessageTransform getTransformType() {
|
||||||
public boolean isReverse() {
|
return transformType;
|
||||||
return reverse;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setReverse(boolean reverse) {
|
public void setTransformType(MessageTransform transformType) {
|
||||||
this.reverse = reverse;
|
this.transformType = transformType;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transforms an incoming XML encoded {@link TextMessage} to an {@link ObjectMessage}
|
* Transforms an incoming XML encoded {@link TextMessage} to an {@link ObjectMessage}
|
||||||
* @param session - JMS session currently being used
|
* @param session - JMS session currently being used
|
||||||
* @param message - if this is a TextMessage, it will be transformed to an ObjectMessage
|
* @param textMessage - text message to transform to object message
|
||||||
* @return ObjectMessage, if the incoming message is a TextMessage, the original message otherwise
|
* @return ObjectMessage
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
protected Message textToObject(Session session, Message message) throws JMSException {
|
protected ObjectMessage textToObject(Session session, TextMessage textMessage) throws JMSException {
|
||||||
if (message instanceof TextMessage) {
|
Object object = unmarshall(session, textMessage);
|
||||||
TextMessage textMessage = (TextMessage) message;
|
if (object instanceof Serializable) {
|
||||||
Object object = unmarshall(session, textMessage);
|
ObjectMessage answer = session.createObjectMessage((Serializable) object);
|
||||||
if (object instanceof Serializable) {
|
copyProperties(textMessage, answer);
|
||||||
ObjectMessage answer = session.createObjectMessage((Serializable) object);
|
return answer;
|
||||||
copyProperties(message, answer);
|
}
|
||||||
return answer;
|
else {
|
||||||
}
|
throw new JMSException("Object is not serializable: " + object);
|
||||||
else {
|
|
||||||
throw new JMSException("Object is not serializable: " + object);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return message;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transforms an incoming {@link ObjectMessage} to an XML encoded {@link TextMessage}
|
* Transforms an incoming {@link ObjectMessage} to an XML encoded {@link TextMessage}
|
||||||
* @param session - JMS session currently being used
|
* @param session - JMS session currently being used
|
||||||
* @param message - if this is an ObjectMessage, it will be transformed to a TextMessage
|
* @param objectMessage - object message to transform to text message
|
||||||
* @return XML encoded TextMessage, if the incoming messsage is an ObjectMessge, the original message otherwise
|
* @return XML encoded TextMessage
|
||||||
* @throws JMSException
|
* @throws JMSException
|
||||||
*/
|
*/
|
||||||
protected Message objectToText(Session session, Message message) throws JMSException {
|
protected TextMessage objectToText(Session session, ObjectMessage objectMessage) throws JMSException {
|
||||||
if (message instanceof ObjectMessage) {
|
TextMessage answer = session.createTextMessage(marshall(session, objectMessage));
|
||||||
TextMessage answer = session.createTextMessage(marshall(session, (ObjectMessage) message));
|
copyProperties(objectMessage, answer);
|
||||||
copyProperties(message, answer);
|
return answer;
|
||||||
return answer;
|
|
||||||
}
|
|
||||||
return message;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -31,6 +31,8 @@ import junit.framework.TestCase;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||||
|
|
||||||
|
import static org.apache.activemq.util.xstream.XStreamMessageTransformer.MessageTransform.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @version $Revision$
|
* @version $Revision$
|
||||||
*/
|
*/
|
||||||
|
@ -40,7 +42,7 @@ public class XStreamTransformTest extends TestCase {
|
||||||
protected long timeout = 5000;
|
protected long timeout = 5000;
|
||||||
|
|
||||||
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
|
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
|
||||||
connectionFactory.setTransformer(new XStreamMessageTransformer());
|
connectionFactory.setTransformer(new XStreamMessageTransformer(XML));
|
||||||
connection = connectionFactory.createConnection();
|
connection = connectionFactory.createConnection();
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
|
@ -87,8 +89,7 @@ public class XStreamTransformTest extends TestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
|
public void testSendTextMessageReceiveAsObjectMessageAndTextMessage() throws Exception {
|
||||||
// Set reverse to true
|
connectionFactory.setTransformer(new XStreamMessageTransformer(OBJECT));
|
||||||
connectionFactory.setTransformer(new XStreamMessageTransformer(true));
|
|
||||||
connection = connectionFactory.createConnection();
|
connection = connectionFactory.createConnection();
|
||||||
connection.start();
|
connection.start();
|
||||||
|
|
||||||
|
@ -138,6 +139,89 @@ public class XStreamTransformTest extends TestCase {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testAdaptiveTransform() throws Exception {
|
||||||
|
connectionFactory.setTransformer(new XStreamMessageTransformer(ADAPTIVE));
|
||||||
|
connection = connectionFactory.createConnection();
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
// lets create the consumers
|
||||||
|
Session adaptiveSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = adaptiveSession.createTopic(getClass().getName());
|
||||||
|
MessageConsumer adaptiveConsumer = adaptiveSession.createConsumer(destination);
|
||||||
|
|
||||||
|
Session origSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageConsumer origConsumer = origSession.createConsumer(destination);
|
||||||
|
// lets clear the transformer on this consumer so we see the message as it really is
|
||||||
|
((ActiveMQMessageConsumer) origConsumer).setTransformer(null);
|
||||||
|
|
||||||
|
// Create producer
|
||||||
|
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
MessageProducer producer = producerSession.createProducer(destination);
|
||||||
|
|
||||||
|
Message message;
|
||||||
|
ObjectMessage objectMessage;
|
||||||
|
TextMessage textMessage;
|
||||||
|
SamplePojo body;
|
||||||
|
Object object;
|
||||||
|
String text;
|
||||||
|
|
||||||
|
// Send a text message
|
||||||
|
String xmlText =
|
||||||
|
"<org.apache.activemq.util.xstream.SamplePojo>" +
|
||||||
|
"<name>James</name>" +
|
||||||
|
"<city>London</city>" +
|
||||||
|
"</org.apache.activemq.util.xstream.SamplePojo>";
|
||||||
|
|
||||||
|
TextMessage txtRequest = producerSession.createTextMessage(xmlText);
|
||||||
|
producer.send(txtRequest);
|
||||||
|
|
||||||
|
// lets consume it as a text message
|
||||||
|
message = adaptiveConsumer.receive(timeout);
|
||||||
|
assertNotNull("Should have received a message!", message);
|
||||||
|
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||||
|
textMessage = (TextMessage) message;
|
||||||
|
text = textMessage.getText();
|
||||||
|
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||||
|
|
||||||
|
// lets consume it as an object message
|
||||||
|
message = origConsumer.receive(timeout);
|
||||||
|
assertNotNull("Should have received a message!", message);
|
||||||
|
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||||
|
objectMessage = (ObjectMessage) message;
|
||||||
|
object = objectMessage.getObject();
|
||||||
|
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||||
|
body = (SamplePojo) object;
|
||||||
|
assertEquals("name", "James", body.getName());
|
||||||
|
assertEquals("city", "London", body.getCity());
|
||||||
|
|
||||||
|
// Send object message
|
||||||
|
ObjectMessage objRequest = producerSession.createObjectMessage(new SamplePojo("James", "London"));
|
||||||
|
producer.send(objRequest);
|
||||||
|
|
||||||
|
// lets consume it as an object message
|
||||||
|
message = adaptiveConsumer.receive(timeout);
|
||||||
|
assertNotNull("Should have received a message!", message);
|
||||||
|
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
|
||||||
|
objectMessage = (ObjectMessage) message;
|
||||||
|
object = objectMessage.getObject();
|
||||||
|
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
|
||||||
|
body = (SamplePojo) object;
|
||||||
|
assertEquals("name", "James", body.getName());
|
||||||
|
assertEquals("city", "London", body.getCity());
|
||||||
|
|
||||||
|
|
||||||
|
// lets consume it as a text message
|
||||||
|
message = origConsumer.receive(timeout);
|
||||||
|
assertNotNull("Should have received a message!", message);
|
||||||
|
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
|
||||||
|
textMessage = (TextMessage) message;
|
||||||
|
text = textMessage.getText();
|
||||||
|
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
|
||||||
|
System.out.println("Received XML...");
|
||||||
|
System.out.println(text);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
protected void tearDown() throws Exception {
|
protected void tearDown() throws Exception {
|
||||||
if (connection != null) {
|
if (connection != null) {
|
||||||
connection.close();
|
connection.close();
|
||||||
|
|
Loading…
Reference in New Issue