From d9b9427a5f79c0e2977a9466c0b0bcb5919e0023 Mon Sep 17 00:00:00 2001 From: James Strachan Date: Mon, 6 Mar 2006 19:10:32 +0000 Subject: [PATCH] added fix for AMQ-519 so that we explicitly force the redelivered message to be deserialized again for ObjectMessage instances to avoid mutable objects being changed, then rolled back git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383627 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 2 +- .../org/apache/activemq/ActiveMQSession.java | 2 +- .../command/ActiveMQObjectMessage.java | 12 ++++- .../org/apache/activemq/command/Message.java | 5 +- .../activemq/JmsTransactionTestSupport.java | 50 ++++++++++++++++++- 5 files changed, 65 insertions(+), 6 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 01cf717ce6..6e58951c97 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -692,7 +692,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) { MessageDispatch md = (MessageDispatch) iter.next(); - md.getMessage().incrementRedeliveryCounter(); + md.getMessage().onMessageRolledBack(); unconsumedMessages.enqueueFirst(md); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java index 484b837abc..bfb2af064d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -677,7 +677,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta getTransactionContext().addSynchronization(new Synchronization(){ public void afterRollback() throws Exception { - md.getMessage().incrementRedeliveryCounter(); + md.getMessage().onMessageRolledBack(); RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy(); int redeliveryCounter = md.getMessage().getRedeliveryCounter(); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java index bd35432594..b7c7e58616 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQObjectMessage.java @@ -29,18 +29,19 @@ import org.apache.activemq.util.JMSExceptionSupport; import javax.jms.JMSException; import javax.jms.ObjectMessage; + import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.io.ObjectStreamClass; import java.io.OutputStream; import java.io.Serializable; -import java.io.ObjectStreamClass; +import java.lang.reflect.Proxy; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import java.lang.reflect.Proxy; /** * An ObjectMessage object is used to send a message that contains a serializable object in the Java @@ -174,6 +175,13 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess return this.object; } + public void onMessageRolledBack() { + super.onMessageRolledBack(); + + // lets force the object to be deserialized again - as we could have changed the object + object = null; + } + public String toString() { try { getObject(); diff --git a/activemq-core/src/main/java/org/apache/activemq/command/Message.java b/activemq-core/src/main/java/org/apache/activemq/command/Message.java index dcd73c41b0..e746726c1a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/command/Message.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/Message.java @@ -611,5 +611,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess public void setRecievedByDFBridge(boolean recievedByDFBridge){ this.recievedByDFBridge=recievedByDFBridge; } - + + public void onMessageRolledBack() { + incrementRedeliveryCounter(); + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java index f94c3dc344..104d3bc0cc 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java +++ b/activemq-core/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java @@ -30,10 +30,12 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; +import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.MessageListener; +import java.io.Serializable; import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; @@ -432,9 +434,55 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M } + public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception { + ArrayList list = new ArrayList(); + list.add("First"); + Message outbound = session.createObjectMessage(list); + outbound.setStringProperty("foo", "abc"); + + producer.send(outbound); + session.commit(); + + log.info("About to consume message 1"); + Message message = consumer.receive(1000); + + List body = assertReceivedObjectMessageWithListBody(message); + + // now lets try mutate it + try { + message.setStringProperty("foo", "def"); + fail("Cannot change properties of the object!"); + } + catch (JMSException e) { + log.info("Caught expected exception: " + e, e); + } + body.clear(); + body.add("This should never be seen!"); + session.rollback(); + + message = consumer.receive(1000); + List secondBody = assertReceivedObjectMessageWithListBody(message); + assertNotSame("Second call should return a different body", secondBody, body); + session.commit(); + } + + protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException { + assertNotNull("Should have received a message!", message); + assertEquals("foo header", "abc", message.getStringProperty("foo")); + + assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage); + ObjectMessage objectMessage = (ObjectMessage) message; + List body = (List) objectMessage.getObject(); + log.info("Received body: " + body); + + assertEquals("Size of list should be 1", 1, body.size()); + assertEquals("element 0 of list", "First", body.get(0)); + return body; + } + /** * Recreates the connection. - * + * * @throws JMSException */ protected void reconnect() throws JMSException {