git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1356927 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-07-03 20:45:12 +00:00
parent bb1e246c1b
commit a3836b562d
4 changed files with 154 additions and 9 deletions

View File

@ -55,6 +55,7 @@ import javax.jms.XAConnection;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTempDestination;
@ -131,7 +132,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
// Configuration options variables
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private BlobTransferPolicy blobTransferPolicy;
private RedeliveryPolicy redeliveryPolicy;
private RedeliveryPolicyMap redeliveryPolicyMap;
private MessageTransformer transformer;
private boolean disableTimeStampsByDefault;
@ -1644,14 +1645,14 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
* @throws JMSException
*/
public RedeliveryPolicy getRedeliveryPolicy() throws JMSException {
return redeliveryPolicy;
return redeliveryPolicyMap.getDefaultEntry();
}
/**
* Sets the redelivery policy to be used when messages are rolled back
*/
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
this.redeliveryPolicy = redeliveryPolicy;
this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
}
public BlobTransferPolicy getBlobTransferPolicy() {
@ -2549,4 +2550,22 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
}
}
/**
* Sets the Connection wide RedeliveryPolicyMap for handling messages that are being rolled back.
* @param redeliveryPolicyMap the redeliveryPolicyMap to set
*/
public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
this.redeliveryPolicyMap = redeliveryPolicyMap;
}
/**
* Gets the Connection's configured RedeliveryPolicyMap which will be used by all the
* Consumers when dealing with transaction messages that have been rolled back.
*
* @return the redeliveryPolicyMap
*/
public RedeliveryPolicyMap getRedeliveryPolicyMap() {
return redeliveryPolicyMap;
}
}

View File

@ -36,6 +36,7 @@ import javax.jms.TopicConnectionFactory;
import javax.naming.Context;
import org.apache.activemq.blob.BlobTransferPolicy;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.jndi.JNDIBaseStorable;
import org.apache.activemq.management.JMSStatsImpl;
import org.apache.activemq.management.StatsCapable;
@ -90,7 +91,10 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
// client policies
private ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
private RedeliveryPolicyMap redeliveryPolicyMap = new RedeliveryPolicyMap();
{
redeliveryPolicyMap.setDefaultEntry(new RedeliveryPolicy());
}
private BlobTransferPolicy blobTransferPolicy = new BlobTransferPolicy();
private MessageTransformer transformer;
@ -317,7 +321,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
connection.setOptimizeAcknowledgeTimeOut(getOptimizeAcknowledgeTimeOut());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
connection.setExclusiveConsumer(isExclusiveConsumer());
connection.setRedeliveryPolicy(getRedeliveryPolicy());
connection.setRedeliveryPolicyMap(getRedeliveryPolicyMap());
connection.setTransformer(getTransformer());
connection.setBlobTransferPolicy(getBlobTransferPolicy().copy());
connection.setWatchTopicAdvisories(isWatchTopicAdvisories());
@ -577,15 +581,27 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
}
public RedeliveryPolicy getRedeliveryPolicy() {
return redeliveryPolicy;
return redeliveryPolicyMap.getDefaultEntry();
}
/**
* Sets the global redelivery policy to be used when a message is delivered
* Sets the global default redelivery policy to be used when a message is delivered
* but the session is rolled back
*/
public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
this.redeliveryPolicy = redeliveryPolicy;
this.redeliveryPolicyMap.setDefaultEntry(redeliveryPolicy);
}
public RedeliveryPolicyMap getRedeliveryPolicyMap() {
return this.redeliveryPolicyMap;
}
/**
* Sets the global redelivery policy mapping to be used when a message is delivered
* but the session is rolled back
*/
public void setRedeliveryPolicyMap(RedeliveryPolicyMap redeliveryPolicyMap) {
this.redeliveryPolicyMap = redeliveryPolicyMap;
}
public MessageTransformer getTransformer() {

View File

@ -208,7 +208,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
this.session = session;
this.redeliveryPolicy = session.connection.getRedeliveryPolicy();
this.redeliveryPolicy = session.connection.getRedeliveryPolicyMap().getEntryFor(dest);
setTransformer(session.getTransformer());
this.info = new ConsumerInfo(consumerId);

View File

@ -23,7 +23,9 @@ import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
/**
* Test cases used to test the JMS message exclusive consumers.
@ -479,4 +481,112 @@ public class RedeliveryPolicyTest extends JmsTestSupport {
assertEquals("2nd", m.getText());
session.commit();
}
public void testRedeliveryPolicyPerDestination() throws Exception {
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);
RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);
// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
ActiveMQQueue queue = new ActiveMQQueue("TEST");
ActiveMQTopic topic = new ActiveMQTopic("TEST");
MessageProducer producer = session.createProducer(null);
MessageConsumer queueConsumer = session.createConsumer(queue);
MessageConsumer topicConsumer = session.createConsumer(topic);
// Send the messages
producer.send(queue, session.createTextMessage("1st"));
producer.send(queue, session.createTextMessage("2nd"));
producer.send(topic, session.createTextMessage("1st"));
producer.send(topic, session.createTextMessage("2nd"));
session.commit();
TextMessage m;
m = (TextMessage)queueConsumer.receive(100);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)topicConsumer.receive(100);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)queueConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
m = (TextMessage)topicConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
session.rollback();
m = (TextMessage)queueConsumer.receive(100);
assertNotNull("first immediate redelivery", m);
m = (TextMessage)topicConsumer.receive(100);
assertNotNull("first immediate redelivery", m);
session.rollback();
m = (TextMessage)queueConsumer.receive(100);
assertNull("second delivery delayed: " + m, m);
m = (TextMessage)topicConsumer.receive(100);
assertNull("second delivery delayed: " + m, m);
m = (TextMessage)queueConsumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)topicConsumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)queueConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
m = (TextMessage)topicConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
session.rollback();
m = (TextMessage)queueConsumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)topicConsumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)queueConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
m = (TextMessage)topicConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
session.rollback();
// No third attempt for the Queue consumer
m = (TextMessage)queueConsumer.receive(2000);
assertNull(m);
m = (TextMessage)topicConsumer.receive(2000);
assertNotNull(m);
assertEquals("1st", m.getText());
m = (TextMessage)queueConsumer.receive(100);
assertNull(m);
m = (TextMessage)topicConsumer.receive(100);
assertNotNull(m);
assertEquals("2nd", m.getText());
session.commit();
}
}