diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 81f90fb53f..ff4c0aa8c3 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -297,7 +297,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription { break; } } - }else if (ack.isDeliveredAck()) { + }else if (ack.isDeliveredAck() || ack.isExpiredAck()) { // Message was delivered but not acknowledged: update pre-fetch // counters. int index = 0; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index 7e17cf34bb..d17fb2f8ff 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -18,7 +18,7 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.LinkedList; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import javax.jms.JMSException; @@ -65,7 +65,7 @@ public class TopicSubscription extends AbstractSubscription { private final Object matchedListMutex = new Object(); private final AtomicLong enqueueCounter = new AtomicLong(0); private final AtomicLong dequeueCounter = new AtomicLong(0); - private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false); + private final AtomicInteger prefetchExtension = new AtomicInteger(0); private int memoryUsageHighWaterMark = 95; // allow duplicate suppression in a ring network of brokers protected int maxProducersToAudit = 1024; @@ -288,16 +288,34 @@ public class TopicSubscription extends AbstractSubscription { } dequeueCounter.addAndGet(ack.getMessageCount()); } + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } + } dispatchMatched(); return; } else if (ack.isDeliveredAck()) { // Message was delivered but not acknowledged: update pre-fetch counters. - // also. get these for a consumer expired message. - if (destination != null && !ack.isInTransaction()) { - destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); + prefetchExtension.addAndGet(ack.getMessageCount()); + dispatchMatched(); + return; + } else if (ack.isExpiredAck()) { + if (singleDestination && destination != null) { destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount()); + destination.getDestinationStatistics().getExpired().add(ack.getMessageCount()); + destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount()); } dequeueCounter.addAndGet(ack.getMessageCount()); + while (true) { + int currentExtension = prefetchExtension.get(); + int newExtension = Math.max(0, currentExtension - ack.getMessageCount()); + if (prefetchExtension.compareAndSet(currentExtension, newExtension)) { + break; + } + } dispatchMatched(); return; } else if (ack.isRedeliveredAck()) { @@ -313,15 +331,16 @@ public class TopicSubscription extends AbstractSubscription { // The slave should not deliver pull messages. if (getPrefetchSize() == 0 ) { - prefetchWindowOpen.set(true); + final long currentDispatchedCount = dispatchedCounter.get(); + prefetchExtension.incrementAndGet(); dispatchMatched(); // If there was nothing dispatched.. we may need to setup a timeout. - if (prefetchWindowOpen.get()) { + if (currentDispatchedCount == dispatchedCounter.get()) { // immediate timeout used by receiveNoWait() if (pull.getTimeout() == -1) { - prefetchWindowOpen.set(false); + prefetchExtension.decrementAndGet(); // Send a NULL message to signal nothing pending. dispatch(null); } @@ -331,7 +350,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public void run() { - pullTimeout(); + pullTimeout(currentDispatchedCount); } }, pull.getTimeout()); } @@ -344,13 +363,15 @@ public class TopicSubscription extends AbstractSubscription { * Occurs when a pull times out. If nothing has been dispatched since the * timeout was setup, then send the NULL message. */ - private final void pullTimeout() { + private final void pullTimeout(long currentDispatchedCount) { synchronized (matchedListMutex) { - if (prefetchWindowOpen.compareAndSet(true, false)) { + if (currentDispatchedCount == dispatchedCounter.get()) { try { dispatch(null); } catch (Exception e) { context.getConnection().serviceException(e); + } finally { + prefetchExtension.decrementAndGet(); } } } @@ -363,7 +384,7 @@ public class TopicSubscription extends AbstractSubscription { @Override public int getDispatchedQueueSize() { - return (int)(dispatchedCounter.get() - dequeueCounter.get()); + return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get()); } public int getMaximumPendingMessages() { @@ -462,7 +483,7 @@ public class TopicSubscription extends AbstractSubscription { // ------------------------------------------------------------------------- @Override public boolean isFull() { - return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get(); + return getDispatchedQueueSize() >= info.getPrefetchSize(); } @Override @@ -553,7 +574,6 @@ public class TopicSubscription extends AbstractSubscription { continue; // just drop it. } dispatch(message); - prefetchWindowOpen.set(false); } } finally { matched.release(); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 39f55bf8f8..d862f708d5 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -898,7 +898,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC return; } if (messageExpired) { - acknowledge(md, MessageAck.DELIVERED_ACK_TYPE); + acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 1f9ef32996..1dc197fa9e 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -865,7 +865,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta MessageAck earlyAck = null; if (message.isExpired()) { - earlyAck = new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1); + earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1); } else if (connection.isDuplicate(ActiveMQSession.this, message)) { LOG.debug("{} got duplicate: {}", this, message.getMessageId()); earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1); diff --git a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java index 1fc6c10d6f..bb0a72f9c1 100755 --- a/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java +++ b/activemq-client/src/main/java/org/apache/activemq/command/MessageAck.java @@ -64,6 +64,11 @@ public class MessageAck extends BaseCommand { */ public static final byte UNMATCHED_ACK_TYPE = 5; + /** + * the case where a consumer does not dispatch because message has expired inflight + */ + public static final byte EXPIRED_ACK_TYPE = 6; + protected byte ackType; protected ConsumerId consumerId; protected MessageId firstMessageId; @@ -135,6 +140,10 @@ public class MessageAck extends BaseCommand { return ackType == UNMATCHED_ACK_TYPE; } + public boolean isExpiredAck() { + return ackType == EXPIRED_ACK_TYPE; + } + /** * @openwire:property version=1 cache=true */ diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java index a6bc997ef7..c793dc8e2c 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -885,7 +885,7 @@ public class JMSConsumerTest extends JmsTestSupport { connection.setStatsEnabled(true); Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = sendSession.createProducer(destination); + MessageProducer producer = sendSession.createProducer(destination); producer.setTimeToLive(1000); final int count = 4; for (int i = 0; i < count; i++) { @@ -919,6 +919,7 @@ public class JMSConsumerTest extends JmsTestSupport { assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount()); assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount()); assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount()); + assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, view.getExpiredCount()); } protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java index 36a12f1abc..fa14142af3 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/policy/AbortSlowAckConsumer0Test.java @@ -144,7 +144,6 @@ public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test { @Test public void testIdleConsumerCanBeAborted() throws Exception { - AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy(); strategy.setIgnoreIdleConsumers(false); strategy.setMaxTimeSinceLastAck(2000); // Make it shorter diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java index f7b979490e..d95d2d6d47 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/ConsumeTopicPrefetchTest.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.usecases; +import java.util.LinkedList; import javax.jms.Connection; import javax.jms.JMSException; import javax.jms.Message; @@ -64,11 +65,26 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { } validateConsumerPrefetch(this.getSubject(), prefetchSize); - + + LinkedList consumed = new LinkedList(); // lets consume them in two fetch batches - for (int i = 0; i < messageCount; i++) { - consumeMessge(i); + int batchSize = messageCount/2; + for (int i = 0; i < batchSize; i++) { + consumed.add(consumeMessge(i)); } + + // delayed delivered ack a .5 prefetch + validateConsumerPrefetchGreaterOrEqual(this.getSubject(), (long) Math.min(messageCount, 1.5 * prefetchSize)); + + for (int i = 0; i < batchSize; i++) { + consumed.remove().acknowledge(); + } + + // second batch to consume the rest + for (int i = batchSize; i < messageCount; i++) { + consumeMessge(i).acknowledge(); + } + validateConsumerPrefetch(this.getSubject(), 0); } protected Connection createConnection() throws Exception { @@ -95,9 +111,17 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { } } - protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException { + private void validateConsumerPrefetchGreaterOrEqual(String subject, long min) throws JMSException { + doValidateConsumerPrefetch(subject, min, true); + } + + protected void validateConsumerPrefetch(String subject, final long expectedCount) throws JMSException { + doValidateConsumerPrefetch(subject, expectedCount, false); + } + + protected void doValidateConsumerPrefetch(String destination, final long expectedCount, final boolean greaterOrEqual) throws JMSException { RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().lookup("localhost").getRegionBroker(); - for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) { + for (org.apache.activemq.broker.region.Destination dest : regionBroker.getTopicRegion().getDestinationMap().values()) { final org.apache.activemq.broker.region.Destination target = dest; if (dest.getName().equals(destination)) { try { @@ -105,7 +129,11 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { public boolean isSatisified() throws Exception { DestinationStatistics stats = target.getDestinationStatistics(); LOG.info("inflight for : " + target.getName() + ": " + stats.getInflight().getCount()); - return stats.getInflight().getCount() == expectedCount; + if (greaterOrEqual) { + return stats.getInflight().getCount() >= expectedCount; + } else { + return stats.getInflight().getCount() == expectedCount; + } } }); } catch (Exception e) { @@ -113,8 +141,13 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport { } DestinationStatistics stats = dest.getDestinationStatistics(); LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount()); - assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", - expectedCount, stats.getInflight().getCount()); + if (greaterOrEqual) { + assertTrue("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " > " + stats.getInflight().getCount(), + stats.getInflight().getCount() >= expectedCount); + } else { + assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches", + expectedCount, stats.getInflight().getCount()); + } } } }