https://issues.apache.org/jira/browse/AMQ-6336 - queue browser delivers expired messages

This commit is contained in:
Dejan Bosanac 2016-06-23 17:47:05 +02:00
parent 03785a4d53
commit 3dfda807f6
2 changed files with 51 additions and 3 deletions

View File

@ -1419,9 +1419,14 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
// delayed redelivery, ensure it can be re delivered // delayed redelivery, ensure it can be re delivered
session.connection.rollbackDuplicate(this, md.getMessage()); session.connection.rollbackDuplicate(this, md.getMessage());
} }
unconsumedMessages.enqueue(md); if (!(md.getMessage() != null && md.getMessage().isExpired())) {
if (availableListener != null) { unconsumedMessages.enqueue(md);
availableListener.onMessageAvailable(this); if (availableListener != null) {
availableListener.onMessageAvailable(this);
}
} else {
beforeMessageIsConsumed(md);
afterMessageIsConsumed(md, false);
} }
} }
} else { } else {

View File

@ -26,6 +26,7 @@ import javax.jms.Connection;
import javax.jms.DeliveryMode; import javax.jms.DeliveryMode;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser; import javax.jms.QueueBrowser;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
@ -112,6 +113,48 @@ public class JmsQueueBrowserExpirationTest {
browserConnection.close(); browserConnection.close();
} }
@Test(timeout=10000)
public void testDoNotReceiveExpiredMessage() throws Exception {
int WAIT_TIME = 1000;
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue producerQueue = session.createQueue("MyTestQueue");
MessageProducer producer = session.createProducer(producerQueue);
producer.setTimeToLive(WAIT_TIME);
TextMessage message = session.createTextMessage("Test message");
producer.send(producerQueue, message);
int count = getMessageCount(producerQueue, session);
assertEquals(1, count);
Thread.sleep(WAIT_TIME + 1000);
count = getMessageCount(producerQueue, session);
assertEquals(0, count);
producer.close();
session.close();
connection.close();
}
private int getMessageCount(Queue destination, Session session) throws Exception {
int result = 0;
QueueBrowser browser = session.createBrowser(destination);
Enumeration<?> enumeration = browser.getEnumeration();
while (enumeration.hasMoreElements()) {
++result;
enumeration.nextElement();
}
browser.close();
return result;
}
private int browse(ActiveMQQueue queue, Connection connection) throws JMSException { private int browse(ActiveMQQueue queue, Connection connection) throws JMSException {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
QueueBrowser browser = session.createBrowser(queue); QueueBrowser browser = session.createBrowser(queue);