Revert previous fix.  We need to ack each expired message when we detect and not overload the usage of delivered acks for extending the prefetch window.  The problem was an edge case where the broker detected a message as expired after the client had delivered it but before it ack'd it as consumed.  

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1391152 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-09-27 18:17:47 +00:00
parent 3f49df7d70
commit 8a75d960d5
2 changed files with 81 additions and 12 deletions

View File

@ -879,11 +879,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return;
}
if (messageExpired) {
synchronized (deliveredMessages) {
deliveredMessages.remove(md);
}
acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
} else {
stats.onMessage();
if (session.getTransacted()) {
@ -1060,13 +1057,6 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
ack.setTransactionId(session.getTransactionContext().getTransactionId());
}
// if there is a pending delivered ack then we need to send that since there
// could be expired Messages in the ack which haven't been acked yet and the
// ack for all deliveries might not include those in its range of acks. The
// pending standard acks will be included in the ack for all deliveries.
if (pendingAck != null && pendingAck.isDeliveredAck()) {
session.sendAck(pendingAck);
}
pendingAck = null;
session.sendAck(ack);
@ -1137,7 +1127,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
}
void acknowledge(MessageDispatch md) throws JMSException {
MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
acknowledge(md, MessageAck.INDIVIDUAL_ACK_TYPE);
}
void acknowledge(MessageDispatch md, byte ackType) throws JMSException {
MessageAck ack = new MessageAck(md, ackType, 1);
session.sendAck(ack);
synchronized(deliveredMessages){
deliveredMessages.remove(md);

View File

@ -19,9 +19,12 @@ package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
@ -33,6 +36,7 @@ import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
@ -51,7 +55,10 @@ public class AMQ4083Test {
private static String TEST_QUEUE = "testQueue";
private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE);
private final int messageCount = 100;
private String connectionUri;
private String[] data;
@Before
public void setUp() throws Exception {
@ -62,6 +69,12 @@ public class AMQ4083Test {
connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString();
brokerService.start();
brokerService.waitUntilStarted();
data = new String[messageCount];
for (int i = 0; i < messageCount; i++) {
data[i] = "Text for message: " + i + " at " + new Date();
}
}
@After
@ -431,6 +444,68 @@ public class AMQ4083Test {
LOG.info("InFlight Count: {}", queueView.getInFlightCount());
}
@Test
public void testConsumeExpiredQueueAndDlq() throws Exception {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producerNormal = session.createProducer(queue);
MessageProducer producerExpire = session.createProducer(queue);
producerExpire.setTimeToLive(500);
MessageConsumer dlqConsumer = session.createConsumer(session.createQueue("ActiveMQ.DLQ"));
connection.start();
Connection consumerConnection = factory.createConnection();
ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setAll(10);
((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy);
Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = consumerSession.createConsumer(queue);
consumerConnection.start();
String msgBody = new String(new byte[20*1024]);
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(msgBody);
producerExpire.send(queue, message);
}
for (int i = 0; i < data.length; i++) {
Message message = session.createTextMessage(msgBody);
producerNormal.send(queue, message);
}
ArrayList<Message> messages = new ArrayList<Message>();
Message received;
while ((received = consumer.receive(1000)) != null) {
messages.add(received);
if (messages.size() == 1) {
TimeUnit.SECONDS.sleep(1);
}
received.acknowledge();
};
assertEquals("got messages", messageCount + 1, messages.size());
ArrayList<Message> dlqMessages = new ArrayList<Message>();
while ((received = dlqConsumer.receive(1000)) != null) {
dlqMessages.add(received);
};
assertEquals("got dlq messages", data.length - 1, dlqMessages.size());
final QueueViewMBean queueView = getProxyToQueueViewMBean();
LOG.info("Dequeued Count: {}", queueView.getDequeueCount());
LOG.info("Dispatch Count: {}", queueView.getDispatchCount());
LOG.info("Enqueue Count: {}", queueView.getEnqueueCount());
LOG.info("Expired Count: {}", queueView.getExpiredCount());
LOG.info("InFlight Count: {}", queueView.getInFlightCount());
}
private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
final ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:Type=Queue,Destination="
+ queue.getQueueName() + ",BrokerName=localhost");