tidy up redispatch logic a little more, resolve: AMQ-2128, deliver acks on dispose in auto_ack mode. also get some closure on: MQ-2075

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@745953 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2009-02-19 18:15:59 +00:00
parent f53e392766
commit 184761a119
3 changed files with 112 additions and 37 deletions

View File

@ -630,7 +630,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
void deliverAcks() {
MessageAck ack = null;
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
if (this.optimizeAcknowledge) {
if (session.isAutoAcknowledge()) {
synchronized(deliveredMessages) {
ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
@ -775,14 +775,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
if (session.getTransacted()) {
// Do nothing.
} else if (session.isAutoAcknowledge()) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
if (deliveryingAcknowledgements.compareAndSet(
false, true)) {
if (deliveryingAcknowledgements.compareAndSet(false, true)) {
synchronized (deliveredMessages) {
if (!deliveredMessages.isEmpty()) {
if (optimizeAcknowledge) {
ackCounter++;
if (ackCounter >= (info
.getCurrentPrefetchSize() * .65)) {
if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack != null) {
deliveredMessages.clear();
@ -790,16 +788,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
session.sendAck(ack);
}
}
deliveryingAcknowledgements.set(false);
}
} else {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
} else {
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack!=null) {
deliveredMessages.clear();
session.sendAck(ack);
}
}
}
}
deliveryingAcknowledgements.set(false);
}
} else if (session.isDupsOkAcknowledge()) {
ackLater(md, MessageAck.STANDARD_ACK_TYPE);

View File

@ -336,8 +336,7 @@ public class Queue extends BaseDestination implements Task {
}
}
ConsumerId consumerId = sub.getConsumerInfo().getConsumerId();
MessageGroupSet ownedGroups = getMessageGroupOwners()
.removeConsumer(consumerId);
getMessageGroupOwners().removeConsumer(consumerId);
// redeliver inflight messages
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
@ -353,19 +352,10 @@ public class Queue extends BaseDestination implements Task {
list.add(qmr);
}
if (!list.isEmpty() && !consumers.isEmpty()) {
if (!list.isEmpty()) {
doDispatch(list);
}
}
//if it is a last consumer (and not a browser) dispatch all pagedIn messages
if (consumers.isEmpty() && !(sub instanceof QueueBrowserSubscription)) {
List<QueueMessageReference> list = new ArrayList<QueueMessageReference>();
for (QueueMessageReference ref : pagedInMessages.values()) {
list.add(ref);
}
pagedInPendingDispatch.clear();
doDispatch(list);
}
if (!(this.optimizedDispatch || isSlave())) {
wakeup();
}
@ -1068,7 +1058,7 @@ public class Queue extends BaseDestination implements Task {
}
synchronized (messages) {
pageInMoreMessages = !messages.isEmpty();
pageInMoreMessages |= !messages.isEmpty();
}
// Kinda ugly.. but I think dispatchLock is the only mutex protecting the
@ -1333,14 +1323,18 @@ public class Queue extends BaseDestination implements Task {
* were not full.
*/
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
List<Subscription> consumers;
synchronized (this.consumers) {
if (this.consumers.isEmpty()) {
return list;
}
consumers = new ArrayList<Subscription>(this.consumers);
}
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
for (MessageReference node : list) {
Subscription target = null;
int interestCount=0;

View File

@ -358,15 +358,99 @@ public class JMSConsumerTest extends JmsTestSupport {
assertEquals(4, counter.get());
}
public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() {
public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
}
public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception {
public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch sendDone = new CountDownLatch(1);
final CountDownLatch got2Done = new CountDownLatch(1);
// Set prefetch to 1
connection.getPrefetchPolicy().setAll(1);
// This test case does not work if optimized message dispatch is used as
// the main thread send block until the consumer receives the
// message. This test depends on thread decoupling so that the main
// thread can stop the consumer thread.
connection.setOptimizedMessageDispatch(false);
connection.start();
// Use all the ack modes
Session session = connection.createSession(false, ackMode);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
TextMessage tm = (TextMessage)m;
LOG.info("Got in first listener: " + tm.getText());
assertEquals("" + counter.get(), tm.getText());
counter.incrementAndGet();
if (counter.get() == 2) {
sendDone.await();
connection.close();
got2Done.countDown();
}
} catch (Throwable e) {
e.printStackTrace();
}
}
});
// Send the messages
sendMessages(session, destination, 4);
sendDone.countDown();
// Wait for first 2 messages to arrive.
assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS));
// Re-start connection.
connection = (ActiveMQConnection)factory.createConnection();
connections.add(connection);
connection.getPrefetchPolicy().setAll(1);
connection.start();
// Pickup the remaining messages.
final CountDownLatch done2 = new CountDownLatch(1);
session = connection.createSession(false, ackMode);
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message m) {
try {
TextMessage tm = (TextMessage)m;
LOG.info("Got in second listener: " + tm.getText());
// order is not guaranteed as the connection is started before the listener is set.
// assertEquals("" + counter.get(), tm.getText());
counter.incrementAndGet();
if (counter.get() == 4) {
done2.countDown();
}
} catch (Throwable e) {
LOG.error("unexpected ex onMessage: ", e);
}
}
});
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
Thread.sleep(200);
// assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack mode
assertEquals(5, counter.get());
}
public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() {
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)});
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
}
public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception {
final AtomicInteger counter = new AtomicInteger(0);
final CountDownLatch sendDone = new CountDownLatch(1);
final CountDownLatch got2Done = new CountDownLatch(1);
@ -426,13 +510,12 @@ public class JMSConsumerTest extends JmsTestSupport {
try {
TextMessage tm = (TextMessage)m;
LOG.info("Got in second listener: " + tm.getText());
assertEquals("" + counter.get(), tm.getText());
counter.incrementAndGet();
if (counter.get() == 4) {
done2.countDown();
}
} catch (Throwable e) {
LOG.info("unexpected ex onMessage: ", e);
LOG.error("unexpected ex onMessage: ", e);
}
}
});
@ -440,9 +523,9 @@ public class JMSConsumerTest extends JmsTestSupport {
assertTrue(done2.await(1000, TimeUnit.MILLISECONDS));
Thread.sleep(200);
// close from onMessage with Auto_ack will ack
// Make sure only 4 messages were delivered.
assertEquals(4, counter.get());
}
public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {