https://issues.apache.org/jira/browse/AMQ-5068 - don't rewrite durable subs as the message instance is shared

This commit is contained in:
gtully 2014-03-26 11:50:13 +00:00
parent 266d23ef79
commit 75eb814ca7
2 changed files with 51 additions and 9 deletions

View File

@ -128,6 +128,7 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
}
public void update(Queue queue) {
@ -142,6 +143,7 @@ public class PolicyEntry extends DestinationMapEntry {
queue.setTimeBeforeDispatchStarts(getTimeBeforeDispatchStarts());
queue.setConsumersBeforeDispatchStarts(getConsumersBeforeDispatchStarts());
queue.setAllConsumersExclusiveByDefault(isAllConsumersExclusiveByDefault());
queue.setPersistJMSRedelivered(isPersistJMSRedelivered());
}
public void configure(Broker broker,Topic topic) {
@ -197,7 +199,6 @@ public class PolicyEntry extends DestinationMapEntry {
destination.setAdvisoryForFastProducers(isAdvisoryForFastProducers());
destination.setAdvisoryWhenFull(isAdvisoryWhenFull());
destination.setSendAdvisoryIfNoConsumers(isSendAdvisoryIfNoConsumers());
destination.setPersistJMSRedelivered(isPersistJMSRedelivered());
}
public void baseConfiguration(Broker broker, BaseDestination destination) {

View File

@ -24,11 +24,13 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.TestSupport;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.transport.failover.FailoverTransport;
import org.junit.After;
import org.junit.Before;
@ -91,10 +93,9 @@ public class RedeliveryRestartTest extends TestSupport {
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
populateDestination(10, queueName, connection);
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Destination destination = session.createQueue(queueName);
populateDestination(10, destination, connection);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = null;
@ -135,6 +136,49 @@ public class RedeliveryRestartTest extends TestSupport {
connection.close();
}
@org.junit.Test
public void testDurableSubRedeliveryFlagAfterRestartNotSupported() throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(" + broker.getTransportConnectors().get(0).getPublishableConnectString()
+ ")?jms.prefetchPolicy.all=0");
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setClientID("id");
connection.start();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
ActiveMQTopic destination = new ActiveMQTopic(queueName);
TopicSubscriber durableSub = session.createDurableSubscriber(destination, "id");
populateDestination(10, destination, connection);
TextMessage msg = null;
for (int i = 0; i < 5; i++) {
msg = (TextMessage) durableSub.receive(20000);
LOG.info("not redelivered? got: " + msg);
assertNotNull("got the message", msg);
assertEquals("first delivery", 1, msg.getLongProperty("JMSXDeliveryCount"));
assertEquals("not a redelivery", false, msg.getJMSRedelivered());
}
durableSub.close();
restartBroker();
// make failover aware of the restarted auto assigned port
connection.getTransport().narrow(FailoverTransport.class).add(true, broker.getTransportConnectors().get(0)
.getPublishableConnectString());
durableSub = session.createDurableSubscriber(destination, "id");
for (int i = 0; i < 10; i++) {
msg = (TextMessage) durableSub.receive(4000);
LOG.info("redelivered? got: " + msg);
assertNotNull("got the message again", msg);
assertEquals("no reDelivery flag", false, msg.getJMSRedelivered());
msg.acknowledge();
}
connection.close();
}
@org.junit.Test
public void testValidateRedeliveryFlagAfterRestart() throws Exception {
@ -143,10 +187,9 @@ public class RedeliveryRestartTest extends TestSupport {
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
populateDestination(10, queueName, connection);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(queueName);
populateDestination(10, destination, connection);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = null;
@ -196,10 +239,9 @@ public class RedeliveryRestartTest extends TestSupport {
connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.start();
populateDestination(1, queueName, connection);
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Destination destination = session.createQueue(queueName);
populateDestination(1, destination, connection);
MessageConsumer consumer = session.createConsumer(destination);
TextMessage msg = (TextMessage) consumer.receive(5000);
@ -243,9 +285,8 @@ public class RedeliveryRestartTest extends TestSupport {
return broker;
}
private void populateDestination(final int nbMessages, final String destinationName, javax.jms.Connection connection) throws JMSException {
private void populateDestination(final int nbMessages, final Destination destination, javax.jms.Connection connection) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue(destinationName);
MessageProducer producer = session.createProducer(destination);
for (int i = 1; i <= nbMessages; i++) {
producer.send(session.createTextMessage("<hello id='" + i + "'/>"));