Fix for AMQ-1053. Added support for pluggable message transformation together with an implementation using XStream to marshall ObjectMessage instances as TextMessages. See the XStreamTransformTest to see how a producer can send ObjectMessages but then consumers can see either ObjectMessages or TextMessages depending on their requirements.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475701 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-11-16 12:08:22 +00:00
parent 34fb326a0a
commit 86faaef7c9
11 changed files with 459 additions and 26 deletions

View File

@ -117,6 +117,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private RedeliveryPolicy redeliveryPolicy;
private MessageTransformer transformer;
private boolean disableTimeStampsByDefault = false;
private boolean optimizedMessageDispatch = true;
private boolean copyMessageOnSend = true;
@ -873,7 +875,19 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
public void setSessionTaskRunner(TaskRunnerFactory sessionTaskRunner) {
this.sessionTaskRunner = sessionTaskRunner;
}
public MessageTransformer getTransformer() {
return transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on to the JMS bus
* or when they are received from the bus but before they are delivered to the JMS client
*/
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
/**
* @return the statsEnabled
*/
@ -1450,7 +1464,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
/**
* @param command - the command to consume
* @param o - the command to consume
*/
public void onCommand(final Object o) {
final Command command = (Command) o;

View File

@ -72,6 +72,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
// optimization flags
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private MessageTransformer transformer;
private boolean disableTimeStampsByDefault = false;
private boolean optimizedMessageDispatch = true;
@ -256,6 +257,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setRedeliveryPolicy(getRedeliveryPolicy());
connection.setTransformer(getTransformer());
transport.start();
@ -446,6 +448,18 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
this.redeliveryPolicy = redeliveryPolicy;
}
public MessageTransformer getTransformer() {
return transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on to the JMS bus
* or when they are received from the bus but before they are delivered to the JMS client
*/
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
public void buildFromProperties(Properties properties) {
if (properties == null) {

View File

@ -119,7 +119,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
private boolean optimizeAcknowledge;
private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean();
private ExecutorService executorService = null;
private MessageTransformer transformer;
/**
* Create a MessageConsumer
*
@ -162,6 +163,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.session = session;
this.selector = selector;
this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
setTransformer(session.getTransformer());
this.info = new ConsumerInfo(consumerId);
this.info.setSubscriptionName(name);
@ -223,6 +225,18 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
this.redeliveryPolicy = redeliveryPolicy;
}
public MessageTransformer getTransformer() {
return transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on to the JMS bus
*/
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
/**
* @return Returns the value.
*/
@ -435,8 +449,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* @param md
* @return
*/
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) {
private ActiveMQMessage createActiveMQMessage(final MessageDispatch md) throws JMSException {
ActiveMQMessage m = (ActiveMQMessage) md.getMessage().copy();
if (transformer != null) {
Message transformedMessage = transformer.consumerTransform(session, this, m);
if (transformedMessage != null) {
m = ActiveMQMessageTransformation.transformMessage(transformedMessage, session.connection);
}
}
if (session.isClientAcknowledge()) {
m.setAcknowledgeCallback(new Callback() {
public void execute() throws Exception {
@ -538,7 +558,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public void close() throws JMSException {
if (!unconsumedMessages.isClosed()) {
dispose();
this.session.syncSendPacket(info.createRemoveCommand());
this.session.asyncSendPacket(info.createRemoveCommand());
}
}

View File

@ -82,6 +82,7 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C
private int defaultPriority;
private long defaultTimeToLive;
private long startTime;
private MessageTransformer transformer;
protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination)
throws JMSException {
@ -98,6 +99,7 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C
this.stats = new JMSProducerStatsImpl(session.getSessionStats(), destination);
this.session.addProducer(this);
this.session.asyncSendPacket(info);
setTransformer(session.getTransformer());
}
public StatsImpl getStats() {
@ -461,11 +463,29 @@ public class ActiveMQMessageProducer implements MessageProducer, StatsCapable, C
if (dest == null) {
throw new JMSException("No destination specified");
}
if (transformer != null) {
Message transformedMessage = transformer.producerTransform(session, this, message);
if (transformedMessage != null) {
message = transformedMessage;
}
}
this.session.send(this, dest, message, deliveryMode, priority, timeToLive);
stats.onMessage();
}
public MessageTransformer getTransformer() {
return transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on to the JMS bus
*/
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
/**
* @return the time in milli second when this object was created.
*/

View File

@ -168,26 +168,37 @@ public class ActiveMQMessageTransformation {
activeMessage.setConnection(connection);
}
activeMessage.setJMSMessageID(message.getJMSMessageID());
activeMessage.setJMSCorrelationID(message.getJMSCorrelationID());
activeMessage.setJMSReplyTo(transformDestination(message.getJMSReplyTo()));
activeMessage.setJMSDestination(transformDestination(message.getJMSDestination()));
activeMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
activeMessage.setJMSRedelivered(message.getJMSRedelivered());
activeMessage.setJMSType(message.getJMSType());
activeMessage.setJMSExpiration(message.getJMSExpiration());
activeMessage.setJMSPriority(message.getJMSPriority());
activeMessage.setJMSTimestamp(message.getJMSTimestamp());
Enumeration propertyNames = message.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String name = propertyNames.nextElement().toString();
Object obj = message.getObjectProperty(name);
activeMessage.setObjectProperty(name, obj);
}
copyProperties(message, activeMessage);
return activeMessage;
}
}
/**
* Copies the standard JMS and user defined properties from the givem message to the specified message
*
* @param fromMessage the message to take the properties from
* @param toMesage the message to add the properties to
* @throws JMSException
*/
public static void copyProperties(Message fromMessage, Message toMesage) throws JMSException {
toMesage.setJMSMessageID(fromMessage.getJMSMessageID());
toMesage.setJMSCorrelationID(fromMessage.getJMSCorrelationID());
toMesage.setJMSReplyTo(transformDestination(fromMessage.getJMSReplyTo()));
toMesage.setJMSDestination(transformDestination(fromMessage.getJMSDestination()));
toMesage.setJMSDeliveryMode(fromMessage.getJMSDeliveryMode());
toMesage.setJMSRedelivered(fromMessage.getJMSRedelivered());
toMesage.setJMSType(fromMessage.getJMSType());
toMesage.setJMSExpiration(fromMessage.getJMSExpiration());
toMesage.setJMSPriority(fromMessage.getJMSPriority());
toMesage.setJMSTimestamp(fromMessage.getJMSTimestamp());
Enumeration propertyNames = fromMessage.getPropertyNames();
while (propertyNames.hasMoreElements()) {
String name = propertyNames.nextElement().toString();
Object obj = fromMessage.getObjectProperty(name);
toMesage.setObjectProperty(name, obj);
}
}
}

View File

@ -183,7 +183,8 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
private JMSSessionStatsImpl stats;
private TransactionContext transactionContext;
private DeliveryListener deliveryListener;
private MessageTransformer transformer;
protected final ActiveMQConnection connection;
protected final SessionInfo info;
protected final LongSequenceGenerator consumerIdGenerator = new LongSequenceGenerator();
@ -224,6 +225,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
connection.addSession(this);
stats = new JMSSessionStatsImpl(producers, consumers);
this.connection.asyncSendPacket(info);
setTransformer(connection.getTransformer());
if( connection.isStarted() )
start();
@ -1702,7 +1704,19 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
this.sessionAsyncDispatch=sessionAsyncDispatch;
}
public List getUnconsumedMessages() {
public MessageTransformer getTransformer() {
return transformer;
}
/**
* Sets the transformer used to transform messages before they are sent on to the JMS bus
* or when they are received from the bus but before they are delivered to the JMS client
*/
public void setTransformer(MessageTransformer transformer) {
this.transformer = transformer;
}
public List getUnconsumedMessages() {
return executor.getUnconsumedMessages();
}

View File

@ -0,0 +1,45 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import org.apache.activemq.command.ActiveMQMessage;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.MessageProducer;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
/**
* A plugin strategy for transforming a message before it is sent by the JMS client or before it is
* dispatched to the JMS consumer
*
* @version $Revision$
*/
public interface MessageTransformer {
/**
* Transforms the given message inside the producer before it is sent to the JMS bus.
*/
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException;
/**
* Transforms the given message inside the consumer before being dispatched to the client code
*/
public Message consumerTransform(Session session, MessageConsumer consumer, Message message)throws JMSException;
}

View File

@ -0,0 +1,40 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import javax.jms.JMSException;
import javax.jms.Message;
/**
* A useful base class for message transformers.
*
* @version $Revision$
*/
public abstract class MessageTransformerSupport implements MessageTransformer {
/**
* Copies the standard JMS and user defined properties from the givem message to the specified message
*
* @param fromMessage the message to take the properties from
* @param toMesage the message to add the properties to
* @throws JMSException
*/
protected void copyProperties(Message fromMessage, Message toMesage) throws JMSException {
ActiveMQMessageTransformation.copyProperties(fromMessage, toMesage);
}
}

View File

@ -0,0 +1,112 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.xstream;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.HierarchicalStreamWriter;
import com.thoughtworks.xstream.io.xml.PrettyPrintWriter;
import com.thoughtworks.xstream.io.xml.XppReader;
import org.apache.activemq.MessageTransformerSupport;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
/**
* Transforms object messages to text messages using {@link XStream}
*
* @version $Revision$
*/
public class XStreamMessageTransformer extends MessageTransformerSupport {
private XStream xStream;
public Message producerTransform(Session session, MessageProducer producer, Message message) throws JMSException {
if (message instanceof ObjectMessage) {
TextMessage answer = session.createTextMessage(marshall(session, producer, (ObjectMessage) message));
copyProperties(message, answer);
return answer;
}
return message;
}
public Message consumerTransform(Session session, MessageConsumer consumer, Message message) throws JMSException {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
Object object = unmarshall(session, consumer, textMessage);
if (object instanceof Serializable) {
ObjectMessage answer = session.createObjectMessage((Serializable) object);
copyProperties(message, answer);
return answer;
}
else {
throw new JMSException("Object is not serializable: " + object);
}
}
return message;
}
// Properties
// -------------------------------------------------------------------------
public XStream getXStream() {
if (xStream == null) {
xStream = createXStream();
}
return xStream;
}
public void setXStream(XStream xStream) {
this.xStream = xStream;
}
// Implementation methods
// -------------------------------------------------------------------------
protected XStream createXStream() {
return new XStream();
}
/**
* Marshalls the Object in the {@link ObjectMessage} to a string using XML encoding
*/
protected String marshall(Session session, MessageProducer producer, ObjectMessage objectMessage) throws JMSException {
Serializable object = objectMessage.getObject();
StringWriter buffer = new StringWriter();
HierarchicalStreamWriter out = new PrettyPrintWriter(buffer);
getXStream().marshal(object, out);
return buffer.toString();
}
/**
* Unmarshalls the Object using XML encoding of the String
*/
protected Object unmarshall(Session session, MessageConsumer consumer, TextMessage textMessage) throws JMSException {
HierarchicalStreamReader in = new XppReader(new StringReader(textMessage.getText()));
return getXStream().unmarshal(in);
}
}

View File

@ -0,0 +1,53 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.xstream;
import java.io.Serializable;
/**
* @version $Revision$
*/
public class SamplePojo implements Serializable {
private String name;
private String city;
public SamplePojo() {
}
public SamplePojo(String name, String city) {
this.name = name;
this.city = city;
}
public String getCity() {
return city;
}
public void setCity(String city) {
this.city = city;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}

View File

@ -0,0 +1,90 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.util.xstream;
import junit.framework.TestCase;
import javax.jms.*;
import org.apache.activemq.*;
/**
* @version $Revision$
*/
public class XStreamTransformTest extends TestCase {
protected ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
protected Connection connection;
protected long timeout = 5000;
public void testSendObjectMessageReceiveAsTextMessageAndObjectMessage() throws Exception {
// lets create the consumers
Session objectSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = objectSession.createTopic(getClass().getName());
MessageConsumer objectConsumer = objectSession.createConsumer(destination);
Session textSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageConsumer textConsumer = textSession.createConsumer(destination);
// lets clear the transformer on this consumer so we see the message as it really is
((ActiveMQMessageConsumer) textConsumer).setTransformer(null);
// send a message
Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = producerSession.createProducer(destination);
ObjectMessage request = producerSession.createObjectMessage(new SamplePojo("James", "London"));
producer.send(request);
// lets consume it as an object message
Message message = objectConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be an ObjectMessage but was: " + message, message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage) message;
Object object = objectMessage.getObject();
assertTrue("object payload of wrong type: " + object, object instanceof SamplePojo);
SamplePojo body = (SamplePojo) object;
assertEquals("name", "James", body.getName());
assertEquals("city", "London", body.getCity());
// lets consume it as a text message
message = textConsumer.receive(timeout);
assertNotNull("Should have received a message!", message);
assertTrue("Should be a TextMessage but was: " + message, message instanceof TextMessage);
TextMessage textMessage = (TextMessage) message;
String text = textMessage.getText();
assertTrue("Text should be non-empty!", text != null && text.length() > 0);
System.out.println("Received XML...");
System.out.println(text);
}
protected void setUp() throws Exception {
connectionFactory.setTransformer(new XStreamMessageTransformer());
connection = connectionFactory.createConnection();
connection.start();
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
}
}
}