mirror of https://github.com/apache/activemq.git
added fix for AMQ-519 so that we explicitly force the redelivered message to be deserialized again for ObjectMessage instances to avoid mutable objects being changed, then rolled back
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@383627 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
f12596648a
commit
d9b9427a5f
|
@ -692,7 +692,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
|
|||
|
||||
for (Iterator iter = deliveredMessages.iterator(); iter.hasNext();) {
|
||||
MessageDispatch md = (MessageDispatch) iter.next();
|
||||
md.getMessage().incrementRedeliveryCounter();
|
||||
md.getMessage().onMessageRolledBack();
|
||||
unconsumedMessages.enqueueFirst(md);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -677,7 +677,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
|
|||
getTransactionContext().addSynchronization(new Synchronization(){
|
||||
public void afterRollback() throws Exception {
|
||||
|
||||
md.getMessage().incrementRedeliveryCounter();
|
||||
md.getMessage().onMessageRolledBack();
|
||||
|
||||
RedeliveryPolicy redeliveryPolicy = connection.getRedeliveryPolicy();
|
||||
int redeliveryCounter = md.getMessage().getRedeliveryCounter();
|
||||
|
|
|
@ -29,18 +29,19 @@ import org.apache.activemq.util.JMSExceptionSupport;
|
|||
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.ObjectMessage;
|
||||
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.ObjectInputStream;
|
||||
import java.io.ObjectOutputStream;
|
||||
import java.io.ObjectStreamClass;
|
||||
import java.io.OutputStream;
|
||||
import java.io.Serializable;
|
||||
import java.io.ObjectStreamClass;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.zip.DeflaterOutputStream;
|
||||
import java.util.zip.InflaterInputStream;
|
||||
import java.lang.reflect.Proxy;
|
||||
|
||||
/**
|
||||
* An <CODE>ObjectMessage</CODE> object is used to send a message that contains a serializable object in the Java
|
||||
|
@ -174,6 +175,13 @@ public class ActiveMQObjectMessage extends ActiveMQMessage implements ObjectMess
|
|||
return this.object;
|
||||
}
|
||||
|
||||
public void onMessageRolledBack() {
|
||||
super.onMessageRolledBack();
|
||||
|
||||
// lets force the object to be deserialized again - as we could have changed the object
|
||||
object = null;
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
try {
|
||||
getObject();
|
||||
|
|
|
@ -611,5 +611,8 @@ abstract public class Message extends BaseCommand implements MarshallAware, Mess
|
|||
public void setRecievedByDFBridge(boolean recievedByDFBridge){
|
||||
this.recievedByDFBridge=recievedByDFBridge;
|
||||
}
|
||||
|
||||
|
||||
public void onMessageRolledBack() {
|
||||
incrementRedeliveryCounter();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,10 +30,12 @@ import javax.jms.JMSException;
|
|||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.ObjectMessage;
|
||||
import javax.jms.Session;
|
||||
import javax.jms.TextMessage;
|
||||
import javax.jms.MessageListener;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -432,9 +434,55 @@ abstract public class JmsTransactionTestSupport extends TestSupport implements M
|
|||
}
|
||||
|
||||
|
||||
public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
|
||||
ArrayList list = new ArrayList();
|
||||
list.add("First");
|
||||
Message outbound = session.createObjectMessage(list);
|
||||
outbound.setStringProperty("foo", "abc");
|
||||
|
||||
producer.send(outbound);
|
||||
session.commit();
|
||||
|
||||
log.info("About to consume message 1");
|
||||
Message message = consumer.receive(1000);
|
||||
|
||||
List body = assertReceivedObjectMessageWithListBody(message);
|
||||
|
||||
// now lets try mutate it
|
||||
try {
|
||||
message.setStringProperty("foo", "def");
|
||||
fail("Cannot change properties of the object!");
|
||||
}
|
||||
catch (JMSException e) {
|
||||
log.info("Caught expected exception: " + e, e);
|
||||
}
|
||||
body.clear();
|
||||
body.add("This should never be seen!");
|
||||
session.rollback();
|
||||
|
||||
message = consumer.receive(1000);
|
||||
List secondBody = assertReceivedObjectMessageWithListBody(message);
|
||||
assertNotSame("Second call should return a different body", secondBody, body);
|
||||
session.commit();
|
||||
}
|
||||
|
||||
protected List assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
|
||||
assertNotNull("Should have received a message!", message);
|
||||
assertEquals("foo header", "abc", message.getStringProperty("foo"));
|
||||
|
||||
assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
|
||||
ObjectMessage objectMessage = (ObjectMessage) message;
|
||||
List body = (List) objectMessage.getObject();
|
||||
log.info("Received body: " + body);
|
||||
|
||||
assertEquals("Size of list should be 1", 1, body.size());
|
||||
assertEquals("element 0 of list", "First", body.get(0));
|
||||
return body;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recreates the connection.
|
||||
*
|
||||
*
|
||||
* @throws JMSException
|
||||
*/
|
||||
protected void reconnect() throws JMSException {
|
||||
|
|
Loading…
Reference in New Issue