From 184761a119ae684a910bed41bc0f81bc2334fdb4 Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Thu, 19 Feb 2009 18:15:59 +0000 Subject: [PATCH] tidy up redispatch logic a little more, resolve: AMQ-2128, deliver acks on dispose in auto_ack mode. also get some closure on: MQ-2075 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@745953 13f79535-47bb-0310-9956-ffa450edef68 --- .../activemq/ActiveMQMessageConsumer.java | 28 +++--- .../apache/activemq/broker/region/Queue.java | 24 ++--- .../org/apache/activemq/JMSConsumerTest.java | 97 +++++++++++++++++-- 3 files changed, 112 insertions(+), 37 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index c13b243bba..714bebc2aa 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -630,7 +630,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC void deliverAcks() { MessageAck ack = null; if (deliveryingAcknowledgements.compareAndSet(false, true)) { - if (this.optimizeAcknowledge) { + if (session.isAutoAcknowledge()) { synchronized(deliveredMessages) { ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { @@ -775,14 +775,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC if (session.getTransacted()) { // Do nothing. } else if (session.isAutoAcknowledge()) { - synchronized (deliveredMessages) { - if (!deliveredMessages.isEmpty()) { - if (optimizeAcknowledge) { - if (deliveryingAcknowledgements.compareAndSet( - false, true)) { + if (deliveryingAcknowledgements.compareAndSet(false, true)) { + synchronized (deliveredMessages) { + if (!deliveredMessages.isEmpty()) { + if (optimizeAcknowledge) { ackCounter++; - if (ackCounter >= (info - .getCurrentPrefetchSize() * .65)) { + if (ackCounter >= (info.getCurrentPrefetchSize() * .65)) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); @@ -790,16 +788,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC session.sendAck(ack); } } - deliveryingAcknowledgements.set(false); - } - } else { - MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); - if (ack!=null) { - deliveredMessages.clear(); - session.sendAck(ack); + } else { + MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); + if (ack!=null) { + deliveredMessages.clear(); + session.sendAck(ack); + } } } } + deliveryingAcknowledgements.set(false); } } else if (session.isDupsOkAcknowledge()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index fa3e246782..b2aa2038e9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -336,8 +336,7 @@ public class Queue extends BaseDestination implements Task { } } ConsumerId consumerId = sub.getConsumerInfo().getConsumerId(); - MessageGroupSet ownedGroups = getMessageGroupOwners() - .removeConsumer(consumerId); + getMessageGroupOwners().removeConsumer(consumerId); // redeliver inflight messages List list = new ArrayList(); @@ -353,19 +352,10 @@ public class Queue extends BaseDestination implements Task { list.add(qmr); } - if (!list.isEmpty() && !consumers.isEmpty()) { + if (!list.isEmpty()) { doDispatch(list); } } - //if it is a last consumer (and not a browser) dispatch all pagedIn messages - if (consumers.isEmpty() && !(sub instanceof QueueBrowserSubscription)) { - List list = new ArrayList(); - for (QueueMessageReference ref : pagedInMessages.values()) { - list.add(ref); - } - pagedInPendingDispatch.clear(); - doDispatch(list); - } if (!(this.optimizedDispatch || isSlave())) { wakeup(); } @@ -1068,7 +1058,7 @@ public class Queue extends BaseDestination implements Task { } synchronized (messages) { - pageInMoreMessages = !messages.isEmpty(); + pageInMoreMessages |= !messages.isEmpty(); } // Kinda ugly.. but I think dispatchLock is the only mutex protecting the @@ -1333,14 +1323,18 @@ public class Queue extends BaseDestination implements Task { * were not full. */ private List doActualDispatch(List list) throws Exception { - List rc = new ArrayList(list.size()); - Set fullConsumers = new HashSet(this.consumers.size()); List consumers; synchronized (this.consumers) { + if (this.consumers.isEmpty()) { + return list; + } consumers = new ArrayList(this.consumers); } + List rc = new ArrayList(list.size()); + Set fullConsumers = new HashSet(this.consumers.size()); + for (MessageReference node : list) { Subscription target = null; int interestCount=0; diff --git a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java index 8cf633f698..05a9bddbb3 100755 --- a/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -358,15 +358,99 @@ public class JMSConsumerTest extends JmsTestSupport { assertEquals(4, counter.get()); } - public void initCombosForTestMessageListenerUnackedWithPrefetch1StayInQueue() { + public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() { addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE), - Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); + addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE)}); addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); } - public void testMessageListenerUnackedWithPrefetch1StayInQueue() throws Exception { + public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + final CountDownLatch got2Done = new CountDownLatch(1); + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + // This test case does not work if optimized message dispatch is used as + // the main thread send block until the consumer receives the + // message. This test depends on thread decoupling so that the main + // thread can stop the consumer thread. + connection.setOptimizedMessageDispatch(false); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message m) { + try { + TextMessage tm = (TextMessage)m; + LOG.info("Got in first listener: " + tm.getText()); + assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 2) { + sendDone.await(); + connection.close(); + got2Done.countDown(); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + sendDone.countDown(); + + // Wait for first 2 messages to arrive. + assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + + // Re-start connection. + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Pickup the remaining messages. + final CountDownLatch done2 = new CountDownLatch(1); + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + public void onMessage(Message m) { + try { + TextMessage tm = (TextMessage)m; + LOG.info("Got in second listener: " + tm.getText()); + // order is not guaranteed as the connection is started before the listener is set. + // assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 4) { + done2.countDown(); + } + } catch (Throwable e) { + LOG.error("unexpected ex onMessage: ", e); + } + } + }); + + assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack mode + assertEquals(5, counter.get()); + } + + public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); + } + + public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception { + final AtomicInteger counter = new AtomicInteger(0); final CountDownLatch sendDone = new CountDownLatch(1); final CountDownLatch got2Done = new CountDownLatch(1); @@ -426,13 +510,12 @@ public class JMSConsumerTest extends JmsTestSupport { try { TextMessage tm = (TextMessage)m; LOG.info("Got in second listener: " + tm.getText()); - assertEquals("" + counter.get(), tm.getText()); counter.incrementAndGet(); if (counter.get() == 4) { done2.countDown(); } } catch (Throwable e) { - LOG.info("unexpected ex onMessage: ", e); + LOG.error("unexpected ex onMessage: ", e); } } }); @@ -440,9 +523,9 @@ public class JMSConsumerTest extends JmsTestSupport { assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); Thread.sleep(200); + // close from onMessage with Auto_ack will ack // Make sure only 4 messages were delivered. assertEquals(4, counter.get()); - } public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() {