https://issues.apache.org/jira/browse/AMQ-5089 - fix and test, respect client ack for topics and only decrement counters when ack is received

This commit is contained in:
gtully 2014-03-19 15:33:44 +00:00
parent 2efb6df619
commit b136df177f
8 changed files with 89 additions and 27 deletions

View File

@ -297,7 +297,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
break;
}
}
}else if (ack.isDeliveredAck()) {
}else if (ack.isDeliveredAck() || ack.isExpiredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
int index = 0;

View File

@ -18,7 +18,7 @@ package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;
@ -65,7 +65,7 @@ public class TopicSubscription extends AbstractSubscription {
private final Object matchedListMutex = new Object();
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
private final AtomicBoolean prefetchWindowOpen = new AtomicBoolean(false);
private final AtomicInteger prefetchExtension = new AtomicInteger(0);
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
protected int maxProducersToAudit = 1024;
@ -288,16 +288,34 @@ public class TopicSubscription extends AbstractSubscription {
}
dequeueCounter.addAndGet(ack.getMessageCount());
}
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
dispatchMatched();
return;
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch counters.
// also. get these for a consumer expired message.
if (destination != null && !ack.isInTransaction()) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
prefetchExtension.addAndGet(ack.getMessageCount());
dispatchMatched();
return;
} else if (ack.isExpiredAck()) {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
break;
}
}
dispatchMatched();
return;
} else if (ack.isRedeliveredAck()) {
@ -313,15 +331,16 @@ public class TopicSubscription extends AbstractSubscription {
// The slave should not deliver pull messages.
if (getPrefetchSize() == 0 ) {
prefetchWindowOpen.set(true);
final long currentDispatchedCount = dispatchedCounter.get();
prefetchExtension.incrementAndGet();
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if (prefetchWindowOpen.get()) {
if (currentDispatchedCount == dispatchedCounter.get()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
prefetchWindowOpen.set(false);
prefetchExtension.decrementAndGet();
// Send a NULL message to signal nothing pending.
dispatch(null);
}
@ -331,7 +350,7 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public void run() {
pullTimeout();
pullTimeout(currentDispatchedCount);
}
}, pull.getTimeout());
}
@ -344,13 +363,15 @@ public class TopicSubscription extends AbstractSubscription {
* Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message.
*/
private final void pullTimeout() {
private final void pullTimeout(long currentDispatchedCount) {
synchronized (matchedListMutex) {
if (prefetchWindowOpen.compareAndSet(true, false)) {
if (currentDispatchedCount == dispatchedCounter.get()) {
try {
dispatch(null);
} catch (Exception e) {
context.getConnection().serviceException(e);
} finally {
prefetchExtension.decrementAndGet();
}
}
}
@ -363,7 +384,7 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public int getDispatchedQueueSize() {
return (int)(dispatchedCounter.get() - dequeueCounter.get());
return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get());
}
public int getMaximumPendingMessages() {
@ -462,7 +483,7 @@ public class TopicSubscription extends AbstractSubscription {
// -------------------------------------------------------------------------
@Override
public boolean isFull() {
return getDispatchedQueueSize() >= info.getPrefetchSize() && !prefetchWindowOpen.get();
return getDispatchedQueueSize() >= info.getPrefetchSize();
}
@Override
@ -553,7 +574,6 @@ public class TopicSubscription extends AbstractSubscription {
continue; // just drop it.
}
dispatch(message);
prefetchWindowOpen.set(false);
}
} finally {
matched.release();

View File

@ -898,7 +898,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
return;
}
if (messageExpired) {
acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
acknowledge(md, MessageAck.EXPIRED_ACK_TYPE);
stats.getExpiredMessageCount().increment();
} else {
stats.onMessage();

View File

@ -865,7 +865,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
MessageAck earlyAck = null;
if (message.isExpired()) {
earlyAck = new MessageAck(md, MessageAck.DELIVERED_ACK_TYPE, 1);
earlyAck = new MessageAck(md, MessageAck.EXPIRED_ACK_TYPE, 1);
} else if (connection.isDuplicate(ActiveMQSession.this, message)) {
LOG.debug("{} got duplicate: {}", this, message.getMessageId());
earlyAck = new MessageAck(md, MessageAck.POSION_ACK_TYPE, 1);

View File

@ -64,6 +64,11 @@ public class MessageAck extends BaseCommand {
*/
public static final byte UNMATCHED_ACK_TYPE = 5;
/**
* the case where a consumer does not dispatch because message has expired inflight
*/
public static final byte EXPIRED_ACK_TYPE = 6;
protected byte ackType;
protected ConsumerId consumerId;
protected MessageId firstMessageId;
@ -135,6 +140,10 @@ public class MessageAck extends BaseCommand {
return ackType == UNMATCHED_ACK_TYPE;
}
public boolean isExpiredAck() {
return ackType == EXPIRED_ACK_TYPE;
}
/**
* @openwire:property version=1 cache=true
*/

View File

@ -885,7 +885,7 @@ public class JMSConsumerTest extends JmsTestSupport {
connection.setStatsEnabled(true);
Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = sendSession.createProducer(destination);
MessageProducer producer = sendSession.createProducer(destination);
producer.setTimeToLive(1000);
final int count = 4;
for (int i = 0; i < count; i++) {
@ -919,6 +919,7 @@ public class JMSConsumerTest extends JmsTestSupport {
assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount());
assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount());
assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount());
assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, view.getExpiredCount());
}
protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception {

View File

@ -144,7 +144,6 @@ public class AbortSlowAckConsumer0Test extends AbortSlowConsumer0Test {
@Test
public void testIdleConsumerCanBeAborted() throws Exception {
AbortSlowAckConsumerStrategy strategy = createSlowConsumerStrategy();
strategy.setIgnoreIdleConsumers(false);
strategy.setMaxTimeSinceLastAck(2000); // Make it shorter

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.usecases;
import java.util.LinkedList;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
@ -64,11 +65,26 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport {
}
validateConsumerPrefetch(this.getSubject(), prefetchSize);
LinkedList<TextMessage> consumed = new LinkedList<TextMessage>();
// lets consume them in two fetch batches
for (int i = 0; i < messageCount; i++) {
consumeMessge(i);
int batchSize = messageCount/2;
for (int i = 0; i < batchSize; i++) {
consumed.add(consumeMessge(i));
}
// delayed delivered ack a .5 prefetch
validateConsumerPrefetchGreaterOrEqual(this.getSubject(), (long) Math.min(messageCount, 1.5 * prefetchSize));
for (int i = 0; i < batchSize; i++) {
consumed.remove().acknowledge();
}
// second batch to consume the rest
for (int i = batchSize; i < messageCount; i++) {
consumeMessge(i).acknowledge();
}
validateConsumerPrefetch(this.getSubject(), 0);
}
protected Connection createConnection() throws Exception {
@ -95,9 +111,17 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport {
}
}
protected void validateConsumerPrefetch(String destination, final long expectedCount) throws JMSException {
private void validateConsumerPrefetchGreaterOrEqual(String subject, long min) throws JMSException {
doValidateConsumerPrefetch(subject, min, true);
}
protected void validateConsumerPrefetch(String subject, final long expectedCount) throws JMSException {
doValidateConsumerPrefetch(subject, expectedCount, false);
}
protected void doValidateConsumerPrefetch(String destination, final long expectedCount, final boolean greaterOrEqual) throws JMSException {
RegionBroker regionBroker = (RegionBroker) BrokerRegistry.getInstance().lookup("localhost").getRegionBroker();
for (org.apache.activemq.broker.region.Destination dest : regionBroker.getQueueRegion().getDestinationMap().values()) {
for (org.apache.activemq.broker.region.Destination dest : regionBroker.getTopicRegion().getDestinationMap().values()) {
final org.apache.activemq.broker.region.Destination target = dest;
if (dest.getName().equals(destination)) {
try {
@ -105,7 +129,11 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport {
public boolean isSatisified() throws Exception {
DestinationStatistics stats = target.getDestinationStatistics();
LOG.info("inflight for : " + target.getName() + ": " + stats.getInflight().getCount());
return stats.getInflight().getCount() == expectedCount;
if (greaterOrEqual) {
return stats.getInflight().getCount() >= expectedCount;
} else {
return stats.getInflight().getCount() == expectedCount;
}
}
});
} catch (Exception e) {
@ -113,8 +141,13 @@ public class ConsumeTopicPrefetchTest extends ProducerConsumerTestSupport {
}
DestinationStatistics stats = dest.getDestinationStatistics();
LOG.info("inflight for : " + dest.getName() + ": " + stats.getInflight().getCount());
assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
expectedCount, stats.getInflight().getCount());
if (greaterOrEqual) {
assertTrue("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " > " + stats.getInflight().getCount(),
stats.getInflight().getCount() >= expectedCount);
} else {
assertEquals("inflight for: " + dest.getName() + ": " + stats.getInflight().getCount() + " matches",
expectedCount, stats.getInflight().getCount());
}
}
}
}