mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-4166 - fix processing of expired - stack-trace-1.txt was in error, expiry check should not have used the oneshot broker check, just checing the messages state is required
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1407640 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aac50fc57c
commit
fdd0534fcb
|
@ -127,7 +127,7 @@ public class RedeliveryPlugin extends BrokerPluginSupport {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
|
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference messageReference, Subscription subscription) {
|
||||||
if (next.get().isExpired(messageReference)) {
|
if (messageReference.isExpired()) {
|
||||||
// there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
|
// there are two uses of sendToDeadLetterQueue, we are only interested in valid messages
|
||||||
super.sendToDeadLetterQueue(context, messageReference, subscription);
|
super.sendToDeadLetterQueue(context, messageReference, subscription);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -23,6 +23,7 @@ import javax.jms.MessageProducer;
|
||||||
import javax.jms.Session;
|
import javax.jms.Session;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.ActiveMQPrefetchPolicy;
|
||||||
import org.apache.activemq.RedeliveryPolicy;
|
import org.apache.activemq.RedeliveryPolicy;
|
||||||
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
|
import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap;
|
||||||
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
|
||||||
|
@ -43,7 +44,7 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
|
||||||
|
|
||||||
public void testScheduledRedelivery() throws Exception {
|
public void testScheduledRedelivery() throws Exception {
|
||||||
|
|
||||||
sendMessage();
|
sendMessage(0);
|
||||||
|
|
||||||
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
||||||
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
|
||||||
|
@ -81,11 +82,38 @@ public class BrokerRedeliveryTest extends org.apache.activemq.TestSupport {
|
||||||
consumerSession.commit();
|
consumerSession.commit();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void sendMessage() throws Exception {
|
public void testNoScheduledRedeliveryOfExpired() throws Exception {
|
||||||
|
ActiveMQConnection consumerConnection = (ActiveMQConnection) createConnection();
|
||||||
|
consumerConnection.start();
|
||||||
|
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
|
||||||
|
MessageConsumer consumer = consumerSession.createConsumer(destination);
|
||||||
|
sendMessage(1000);
|
||||||
|
Message message = consumer.receive(1000);
|
||||||
|
assertNotNull("got message", message);
|
||||||
|
|
||||||
|
// ensure there is another consumer to redispatch to
|
||||||
|
MessageConsumer redeliverConsumer = consumerSession.createConsumer(destination);
|
||||||
|
|
||||||
|
// allow consumed to expire so it gets redelivered
|
||||||
|
TimeUnit.SECONDS.sleep(2);
|
||||||
|
consumer.close();
|
||||||
|
|
||||||
|
// should go to dlq as it has expired
|
||||||
|
// validate DLQ
|
||||||
|
MessageConsumer dlqConsumer = consumerSession.createConsumer(new ActiveMQQueue(SharedDeadLetterStrategy.DEFAULT_DEAD_LETTER_QUEUE_NAME));
|
||||||
|
Message dlqMessage = dlqConsumer.receive(2000);
|
||||||
|
assertNotNull("Got message from dql", dlqMessage);
|
||||||
|
assertEquals("message matches", message.getStringProperty("data"), dlqMessage.getStringProperty("data"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void sendMessage(int timeToLive) throws Exception {
|
||||||
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
|
ActiveMQConnection producerConnection = (ActiveMQConnection) createConnection();
|
||||||
producerConnection.start();
|
producerConnection.start();
|
||||||
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
MessageProducer producer = producerSession.createProducer(destination);
|
MessageProducer producer = producerSession.createProducer(destination);
|
||||||
|
if (timeToLive > 0) {
|
||||||
|
producer.setTimeToLive(timeToLive);
|
||||||
|
}
|
||||||
Message message = producerSession.createMessage();
|
Message message = producerSession.createMessage();
|
||||||
message.setStringProperty("data", data);
|
message.setStringProperty("data", data);
|
||||||
producer.send(message);
|
producer.send(message);
|
||||||
|
|
Loading…
Reference in New Issue