resolve https://issues.apache.org/activemq/browse/AMQ-2489 - duplicate delivery acks resulted in broker exceptions with client or inividual ack - delivery acks now only for unacked messages - down side is pending messages in broker remain on expiry awaiting ack from ackLaer that dependes on prefetch value - but this is reasonable and to be expected. they will be removed on close or subsequent acks in any event

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@883458 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-11-23 18:43:01 +00:00
parent d7a8bc89e8
commit 83df5cef54
3 changed files with 250 additions and 5 deletions

View File

@ -831,7 +831,13 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else if (isAutoAcknowledgeBatch()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);
} else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
boolean messageUnackedByConsumer = false;
synchronized (deliveredMessages) {
messageUnackedByConsumer = deliveredMessages.contains(md);
}
if (messageUnackedByConsumer) {
ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
}
}
else {
throw new IllegalStateException("Invalid session state.");

View File

@ -0,0 +1,226 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
import org.apache.activemq.command.ActiveMQQueue;
/**
* In CLIENT_ACKNOWLEDGE and INDIVIDUAL_ACKNOWLEDGE modes following exception
* occurs when ASYNCH consumers acknowledges messages in not in order they
* received the messages.
* <p>
* Exception thrown on broker side:
* <p>
* {@code javax.jms.JMSException: Could not correlate acknowledgment with
* dispatched message: MessageAck}
*
* @author daroo
*/
public class AMQ2489Test extends TestSupport {
private final static String SEQ_NUM_PROPERTY = "seqNum";
private final static int TOTAL_MESSAGES_CNT = 2;
private final static int CONSUMERS_CNT = 2;
private final CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
private Connection connection;
protected void setUp() throws Exception {
super.setUp();
connection = createConnection();
}
protected void tearDown() throws Exception {
if (connection != null) {
connection.close();
connection = null;
}
super.tearDown();
}
public void testUnorderedClientAcknowledge() throws Exception {
doUnorderedAck(Session.CLIENT_ACKNOWLEDGE);
}
public void testUnorderedIndividualAcknowledge() throws Exception {
doUnorderedAck(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
}
/**
* Main test method
*
* @param acknowledgmentMode
* - ACK mode to be used by consumers
* @throws Exception
*/
protected void doUnorderedAck(int acknowledgmentMode) throws Exception {
List<Consumer> consumers = null;
Session producerSession = null;
connection.start();
// Because exception is thrown on broker side only, let's set up
// exception listener to get it
final TestExceptionListener exceptionListener = new TestExceptionListener();
connection.setExceptionListener(exceptionListener);
try {
consumers = new ArrayList<Consumer>();
// start customers
for (int i = 0; i < CONSUMERS_CNT; i++) {
consumers.add(new Consumer(acknowledgmentMode));
}
// produce few test messages
producerSession = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = producerSession
.createProducer(new ActiveMQQueue(getQueueName()));
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) {
final Message message = producerSession
.createTextMessage("test");
// assign each message sequence number
message.setIntProperty(SEQ_NUM_PROPERTY, i);
producer.send(message);
}
// during each onMessage() calls consumers decreases the LATCH
// counter.
//
// so, let's wait till all messages are consumed.
//
LATCH.await();
// wait a bit more to give exception listener a chance be populated
// with
// broker's error
TimeUnit.SECONDS.sleep(1);
assertFalse(exceptionListener.getStatusText(), exceptionListener.hasExceptions());
} finally {
if (producerSession != null)
producerSession.close();
if (consumers != null) {
for (Consumer c : consumers) {
c.close();
}
}
}
}
protected String getQueueName() {
return getClass().getName() + "." + getName();
}
public final class Consumer implements MessageListener {
final Session session;
private Consumer(int acknowledgmentMode) {
try {
session = connection.createSession(false, acknowledgmentMode);
final Queue queue = session.createQueue(getQueueName()
+ "?consumer.prefetchSize=1");
final MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(this);
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public void onMessage(Message message) {
try {
// retrieve sequence number assigned by producer...
final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY);
// ...and let's delay every second message a little bit before
// acknowledgment
if ((seqNum % 2) == 0) {
System.out.println("Delayed message sequence numeber: "
+ seqNum);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
} finally {
// decrease LATCH counter in the main test method.
LATCH.countDown();
}
}
private void close() {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}
}
public final class TestExceptionListener implements ExceptionListener {
private final java.util.Queue<Exception> exceptions = new ConcurrentLinkedQueue<Exception>();
public void onException(JMSException e) {
exceptions.add(e);
}
public boolean hasExceptions() {
return exceptions.isEmpty() == false;
}
public String getStatusText() {
final StringBuilder str = new StringBuilder();
str.append("Exceptions count on broker side: " + exceptions.size()
+ ".\nMessages:\n");
for (Exception e : exceptions) {
str.append(e.getMessage() + "\n\n");
}
return str.toString();
}
}
}

View File

@ -159,7 +159,8 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
// first ack delivered after expiry
public void testExpiredMessagesWithVerySlowConsumer() throws Exception {
createBroker();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
final long queuePrefetch = 600;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=" + queuePrefetch);
connection = factory.createConnection();
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
producer = session.createProducer(destination);
@ -222,7 +223,7 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
assertTrue("all dispatched up to default prefetch ", Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return 1000 == view.getDispatchCount();
return queuePrefetch == view.getDispatchCount();
}
}));
assertTrue("All sent have expired ", Wait.waitFor(new Wait.Condition() {
@ -240,17 +241,29 @@ public class ExpiredMessagesWithNoConsumerTest extends CombinationTestSupport {
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
// consumer ackLater(delivery ack for expired messages) is based on half the prefetch value
// which will leave half of the prefetch pending till consumer close
return (queuePrefetch/2) -1 == view.getInFlightCount();
}
});
LOG.info("enqueue=" + view.getEnqueueCount() + ", dequeue=" + view.getDequeueCount()
+ ", inflight=" + view.getInFlightCount() + ", expired= " + view.getExpiredCount()
+ ", size= " + view.getQueueSize());
assertEquals("prefetch gets back to 0 ", 0, view.getInFlightCount());
assertEquals("inflight reduces to half prefetch minus single delivered message", (queuePrefetch/2) -1, view.getInFlightCount());
assertEquals("size gets back to 0 ", 0, view.getQueueSize());
assertEquals("dequeues match sent/expired ", sendCount, view.getDequeueCount());
consumer.close();
Wait.waitFor(new Wait.Condition() {
public boolean isSatisified() throws Exception {
return 0 == view.getInFlightCount();
}
});
assertEquals("inflight goes to zeor on close", 0, view.getInFlightCount());
LOG.info("done: " + getName());
}