This closes #102

This commit is contained in:
Timothy Bish 2015-05-28 16:26:56 -04:00
commit df8dcb5040
8 changed files with 167 additions and 51 deletions

View File

@ -53,7 +53,7 @@ public abstract class AbstractSubscription implements Subscription {
private int cursorMemoryHighWaterMark = 70;
private boolean slowConsumer;
private long lastAckTime;
private AtomicLong consumedCount = new AtomicLong();
private final SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
@ -89,7 +89,7 @@ public abstract class AbstractSubscription implements Subscription {
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
this.lastAckTime = System.currentTimeMillis();
this.consumedCount.incrementAndGet();
subscriptionStatistics.getConsumedCount().increment();
}
@Override
@ -285,14 +285,19 @@ public abstract class AbstractSubscription implements Subscription {
}
public long getConsumedCount(){
return consumedCount.get();
return subscriptionStatistics.getConsumedCount().getCount();
}
public void incrementConsumedCount(){
consumedCount.incrementAndGet();
subscriptionStatistics.getConsumedCount().increment();
}
public void resetConsumedCount(){
consumedCount.set(0);
subscriptionStatistics.getConsumedCount().reset();
}
@Override
public SubscriptionStatistics getSubscriptionStatistics() {
return subscriptionStatistics;
}
}

View File

@ -121,11 +121,11 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
if (active.get() || keepDurableSubsActive) {
Topic topic = (Topic) destination;
topic.activate(context, this);
this.enqueueCounter += pending.size();
getSubscriptionStatistics().getEnqueues().add(pending.size());
} else if (destination.getMessageStore() != null) {
TopicMessageStore store = (TopicMessageStore) destination.getMessageStore();
try {
this.enqueueCounter += store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName());
getSubscriptionStatistics().getEnqueues().add(store.getMessageCount(subscriptionKey.getClientId(), subscriptionKey.getSubscriptionName()));
} catch (IOException e) {
JMSException jmsEx = new JMSException("Failed to retrieve enqueueCount from store " + e);
jmsEx.setLinkedException(e);
@ -325,7 +325,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
@Override
public synchronized String toString() {
return "DurableTopicSubscription-" + getSubscriptionKey() + ", id=" + info.getConsumerId() + ", active=" + isActive() + ", destinations="
+ durableDestinations.size() + ", total=" + enqueueCounter + ", pending=" + getPendingQueueSize() + ", dispatched=" + dispatchCounter
+ durableDestinations.size() + ", total=" + getSubscriptionStatistics().getEnqueues().getCount() + ", pending=" + getPendingQueueSize() + ", dispatched=" + getSubscriptionStatistics().getDispatched().getCount()
+ ", inflight=" + dispatched.size() + ", prefetchExtension=" + getPrefetchExtension();
}

View File

@ -58,9 +58,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
protected final AtomicInteger prefetchExtension = new AtomicInteger();
protected boolean usePrefetchExtension = true;
protected long enqueueCounter;
protected long dispatchCounter;
protected long dequeueCounter;
private int maxProducersToAudit=32;
private int maxAuditDepth=2048;
protected final SystemUsage usageManager;
@ -94,7 +91,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// consumers to 'wake them up' in case they were waiting for a message.
if (getPrefetchSize() == 0) {
prefetchExtension.set(pull.getQuantity());
final long dispatchCounterBeforePull = dispatchCounter;
final long dispatchCounterBeforePull = getSubscriptionStatistics().getDispatched().getCount();
// Have the destination push us some messages.
for (Destination dest : destinations) {
@ -104,7 +101,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
synchronized(this) {
// If there was nothing dispatched.. we may need to setup a timeout.
if (dispatchCounterBeforePull == dispatchCounter || pull.isAlwaysSignalDone()) {
if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
// Null message indicates the pull is done or did not have pending.
@ -132,7 +129,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
*/
final void pullTimeout(long dispatchCounterBeforePull, boolean alwaysSignalDone) {
synchronized (pendingLock) {
if (dispatchCounterBeforePull == dispatchCounter || alwaysSignalDone) {
if (dispatchCounterBeforePull == getSubscriptionStatistics().getDispatched().getCount() || alwaysSignalDone) {
try {
prefetchExtension.set(1);
add(QueueMessageReference.NULL_MESSAGE);
@ -157,7 +154,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
// Don't increment for the pullTimeout control message.
if (!node.equals(QueueMessageReference.NULL_MESSAGE)) {
enqueueCounter++;
getSubscriptionStatistics().getEnqueues().increment();
}
pending.addMessageLast(node);
}
@ -227,7 +224,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (inAckRange) {
// Don't remove the nodes until we are committed.
if (!context.isInTransaction()) {
dequeueCounter++;
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
removeList.add(node);
} else {
@ -257,7 +254,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (ack.getLastMessageId().equals(messageId)) {
// Don't remove the nodes until we are committed - immediateAck option
if (!context.isInTransaction()) {
dequeueCounter++;
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
dispatched.remove(node);
} else {
@ -363,7 +360,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
nodeDest.getDestinationStatistics()
.getInflight().decrement();
removeList.add(node);
dequeueCounter++;
getSubscriptionStatistics().getDequeues().increment();
index++;
acknowledge(context, ack, node);
if (ack.getLastMessageId().equals(messageId)) {
@ -428,7 +425,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
throws Exception {
Destination nodeDest = (Destination) node.getRegionDestination();
synchronized(dispatchLock) {
dequeueCounter++;
getSubscriptionStatistics().getDequeues().increment();
dispatched.remove(node);
nodeDest.getDestinationStatistics().getInflight().decrement();
}
@ -550,17 +547,17 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
@Override
public long getDequeueCounter() {
return dequeueCounter;
return getSubscriptionStatistics().getDequeues().getCount();
}
@Override
public long getDispatchedCounter() {
return dispatchCounter;
return getSubscriptionStatistics().getDispatched().getCount();
}
@Override
public long getEnqueueCounter() {
return enqueueCounter;
return getSubscriptionStatistics().getEnqueues().getCount();
}
@Override
@ -695,7 +692,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
MessageDispatch md = createMessageDispatch(node, message);
if (node != QueueMessageReference.NULL_MESSAGE) {
dispatchCounter++;
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node);
}
if (getPrefetchSize() == 0) {
@ -724,7 +721,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
nodeDest.getDestinationStatistics().getInflight().increment();
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
}
}
if (node instanceof QueueMessageReference) {
@ -746,7 +743,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
nodeDest.getDestinationStatistics().getInflight().increment();
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), dispatchCounter, dispatched.size() });
LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
}
}

View File

@ -137,6 +137,8 @@ public interface Subscription extends SubscriptionRecovery {
*/
long getDequeueCounter();
SubscriptionStatistics getSubscriptionStatistics();
/**
* @return the JMS selector on the current subscription
*/

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.region;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.management.StatsImpl;
/**
* The J2EE Statistics for a Subsription.
*/
public class SubscriptionStatistics extends StatsImpl {
protected CountStatisticImpl consumedCount;
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl dispatched;
public SubscriptionStatistics() {
this(true);
}
public SubscriptionStatistics(boolean enabled) {
consumedCount = new CountStatisticImpl("consumedCount", "The number of messages that have been consumed by the subscription");
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the subscription");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the subscription");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the subscription");
addStatistic("consumedCount", consumedCount);
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
this.setEnabled(enabled);
}
public CountStatisticImpl getConsumedCount() {
return consumedCount;
}
public CountStatisticImpl getEnqueues() {
return enqueues;
}
public CountStatisticImpl getDequeues() {
return dequeues;
}
public CountStatisticImpl getDispatched() {
return dispatched;
}
public void reset() {
if (this.isDoReset()) {
super.reset();
consumedCount.reset();
enqueues.reset();
dequeues.reset();
dispatched.reset();
}
}
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
consumedCount.setEnabled(enabled);
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
}
public void setParent(SubscriptionStatistics parent) {
if (parent != null) {
consumedCount.setParent(parent.consumedCount);
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
} else {
consumedCount.setParent(null);
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
}
}
}

View File

@ -53,8 +53,6 @@ public class TopicSubscription extends AbstractSubscription {
protected PendingMessageCursor matched;
protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
boolean singleDestination = true;
Destination destination;
private final Scheduler scheduler;
@ -63,8 +61,6 @@ public class TopicSubscription extends AbstractSubscription {
private MessageEvictionStrategy messageEvictionStrategy = new OldestMessageEvictionStrategy();
private int discarded;
private final Object matchedListMutex = new Object();
private final AtomicLong enqueueCounter = new AtomicLong(0);
private final AtomicLong dequeueCounter = new AtomicLong(0);
private final AtomicInteger prefetchExtension = new AtomicInteger(0);
private int memoryUsageHighWaterMark = 95;
// allow duplicate suppression in a ring network of brokers
@ -106,7 +102,7 @@ public class TopicSubscription extends AbstractSubscription {
// Lets use an indirect reference so that we can associate a unique
// locator /w the message.
node = new IndirectMessageReference(node.getMessage());
enqueueCounter.incrementAndGet();
getSubscriptionStatistics().getEnqueues().increment();
synchronized (matchedListMutex) {
// if this subscriber is already discarding a message, we don't want to add
// any more messages to it as those messages can only be advisories generated in the process,
@ -135,7 +131,7 @@ public class TopicSubscription extends AbstractSubscription {
while (matched.isFull()) {
if (getContext().getStopping().get()) {
LOG.warn("{}: stopped waiting for space in pendingMessage cursor for: {}", toString(), node.getMessageId());
enqueueCounter.decrementAndGet();
getSubscriptionStatistics().getEnqueues().decrement();
return;
}
if (!warnedAboutWait) {
@ -231,7 +227,7 @@ public class TopicSubscription extends AbstractSubscription {
node.decrementReferenceCount();
if (broker.isExpired(node)) {
matched.remove();
dispatchedCounter.incrementAndGet();
getSubscriptionStatistics().getDispatched().increment();
node.decrementReferenceCount();
((Destination)node.getRegionDestination()).getDestinationStatistics().getExpired().increment();
broker.messageExpired(getContext(), node, this);
@ -253,7 +249,7 @@ public class TopicSubscription extends AbstractSubscription {
node.decrementReferenceCount();
if (node.getMessageId().equals(mdn.getMessageId())) {
matched.remove();
dispatchedCounter.incrementAndGet();
getSubscriptionStatistics().getDispatched().increment();
node.decrementReferenceCount();
break;
}
@ -280,7 +276,7 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
dispatchMatched();
}
});
@ -292,7 +288,7 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
}
while (true) {
int currentExtension = prefetchExtension.get();
@ -314,7 +310,7 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
}
dequeueCounter.addAndGet(ack.getMessageCount());
getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
while (true) {
int currentExtension = prefetchExtension.get();
int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
@ -337,12 +333,12 @@ public class TopicSubscription extends AbstractSubscription {
// The slave should not deliver pull messages.
if (getPrefetchSize() == 0) {
final long currentDispatchedCount = dispatchedCounter.get();
final long currentDispatchedCount = getSubscriptionStatistics().getDispatched().getCount();
prefetchExtension.set(pull.getQuantity());
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if (currentDispatchedCount == dispatchedCounter.get() || pull.isAlwaysSignalDone()) {
if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || pull.isAlwaysSignalDone()) {
// immediate timeout used by receiveNoWait()
if (pull.getTimeout() == -1) {
@ -371,7 +367,7 @@ public class TopicSubscription extends AbstractSubscription {
*/
private final void pullTimeout(long currentDispatchedCount, boolean alwaysSendDone) {
synchronized (matchedListMutex) {
if (currentDispatchedCount == dispatchedCounter.get() || alwaysSendDone) {
if (currentDispatchedCount == getSubscriptionStatistics().getDispatched().getCount() || alwaysSendDone) {
try {
dispatch(null);
} catch (Exception e) {
@ -390,7 +386,8 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public int getDispatchedQueueSize() {
return (int)(dispatchedCounter.get() - prefetchExtension.get() - dequeueCounter.get());
return (int)(getSubscriptionStatistics().getDispatched().getCount() -
prefetchExtension.get() - getSubscriptionStatistics().getDequeues().getCount());
}
public int getMaximumPendingMessages() {
@ -399,17 +396,17 @@ public class TopicSubscription extends AbstractSubscription {
@Override
public long getDispatchedCounter() {
return dispatchedCounter.get();
return getSubscriptionStatistics().getDispatched().getCount();
}
@Override
public long getEnqueueCounter() {
return enqueueCounter.get();
return getSubscriptionStatistics().getEnqueues().getCount();
}
@Override
public long getDequeueCounter() {
return dequeueCounter.get();
return getSubscriptionStatistics().getDequeues().getCount();
}
/**
@ -599,7 +596,7 @@ public class TopicSubscription extends AbstractSubscription {
md.setConsumerId(info.getConsumerId());
if (node != null) {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
dispatchedCounter.incrementAndGet();
getSubscriptionStatistics().getDispatched().increment();
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
if (destination == null) {

View File

@ -32,6 +32,7 @@ import junit.framework.TestCase;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.SubscriptionStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
@ -139,6 +140,8 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
// pull from store in small windows
Subscription subscription = new Subscription() {
private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
@Override
public void add(MessageReference node) throws Exception {
if (enqueueCounter.get() != node.getMessageId().getProducerSequenceId()) {
@ -358,6 +361,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
public void resetConsumedCount(){
}
@Override
public SubscriptionStatistics getSubscriptionStatistics() {
return subscriptionStatistics;
}
};
queue.addSubscription(contextNotInTx, subscription);

View File

@ -173,6 +173,7 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
public class SimpleImmediateDispatchSubscription implements Subscription, LockOwner {
private SubscriptionStatistics subscriptionStatistics = new SubscriptionStatistics();
List<MessageReference> dispatched =
Collections.synchronizedList(new ArrayList<MessageReference>());
@ -370,5 +371,10 @@ public class SubscriptionAddRemoveQueueTest extends TestCase {
return 10;
}
@Override
public SubscriptionStatistics getSubscriptionStatistics() {
return subscriptionStatistics;
}
}
}