This commit is contained in:
Rob Davies 2013-12-03 17:15:02 +00:00
parent 5fa462a08a
commit 07ec890372
9 changed files with 66 additions and 1 deletions

View File

@ -184,4 +184,16 @@ public class ProducerView implements ProducerViewMBean {
producerBrokerExchange.resetFlowControl();
}
}
@Override
public void resetStatistics() {
if (info != null){
info.getSentCount().reset();
}
}
@Override
public long getSentCount() {
return info != null ? info.getSentCount().getCount() :0;
}
}

View File

@ -96,6 +96,13 @@ public interface ProducerViewMBean {
@MBeanInfo("percentage of sends Producer Blocked for Flow Control")
int getPercentageBlocked();
@MBeanInfo("reset flow control stata")
@MBeanInfo("reset flow control state")
void resetFlowControlStats();
@MBeanInfo("Resets statistics.")
void resetStatistics();
@MBeanInfo("Messages consumed")
long getSentCount();
}

View File

@ -88,6 +88,8 @@ public class SubscriptionView implements SubscriptionViewMBean {
return result;
}
private ObjectName createConnectionQuery(ManagementContext ctx, String brokerName) throws IOException {
try {
return BrokerMBeanSupport.createConnectionQuery(ctx.getJmxDomainName(), brokerName, clientId);
@ -415,4 +417,16 @@ public class SubscriptionView implements SubscriptionViewMBean {
public String getUserName() {
return userName;
}
@Override
public void resetStatistics() {
if (subscription != null){
subscription.getConsumedCount().reset();
}
}
@Override
public long getConsumedCount() {
return subscription != null ? subscription.getConsumedCount().getCount() : 0;
}
}

View File

@ -242,4 +242,11 @@ public interface SubscriptionViewMBean {
@MBeanInfo("ObjectName of the Connection that created this Subscription")
ObjectName getConnection();
@MBeanInfo("Resets statistics.")
void resetStatistics();
@MBeanInfo("Messages consumed")
long getConsumedCount();
}

View File

@ -392,6 +392,9 @@ public abstract class AbstractRegion implements Region {
}
producerExchange.getRegionDestination().send(producerExchange, messageSend);
if (producerExchange.getProducerState() != null && producerExchange.getProducerState().getInfo() != null){
producerExchange.getProducerState().getInfo().getSentCount().increment();
}
}
public void acknowledge(ConsumerBrokerExchange consumerExchange, MessageAck ack) throws Exception {

View File

@ -36,6 +36,7 @@ import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.LogicExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.filter.NoLocalExpression;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.selector.SelectorParser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,6 +54,7 @@ public abstract class AbstractSubscription implements Subscription {
private int cursorMemoryHighWaterMark = 70;
private boolean slowConsumer;
private long lastAckTime;
private CountStatisticImpl consumedCount = new CountStatisticImpl("consumed","The number of messages consumed");
public AbstractSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException {
this.broker = broker;
@ -88,6 +90,7 @@ public abstract class AbstractSubscription implements Subscription {
@Override
public synchronized void acknowledge(final ConnectionContext context, final MessageAck ack) throws Exception {
this.lastAckTime = System.currentTimeMillis();
this.consumedCount.increment();
}
@Override
@ -276,4 +279,8 @@ public abstract class AbstractSubscription implements Subscription {
public void setTimeOfLastMessageAck(long value) {
this.lastAckTime = value;
}
public CountStatisticImpl getConsumedCount(){
return consumedCount;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.management.CountStatisticImpl;
/**
*
@ -234,4 +235,6 @@ public interface Subscription extends SubscriptionRecovery {
*/
long getTimeOfLastMessageAck();
CountStatisticImpl getConsumedCount();
}

View File

@ -16,6 +16,7 @@
*/
package org.apache.activemq.command;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.state.CommandVisitor;
/**
@ -32,6 +33,7 @@ public class ProducerInfo extends BaseCommand {
protected BrokerId[] brokerPath;
protected boolean dispatchAsync;
protected int windowSize;
protected CountStatisticImpl sentCount = new CountStatisticImpl("sentCount","number of messages sent to a broker");
public ProducerInfo() {
}
@ -135,4 +137,8 @@ public class ProducerInfo extends BaseCommand {
this.windowSize = windowSize;
}
public CountStatisticImpl getSentCount(){
return sentCount;
}
}

View File

@ -44,6 +44,7 @@ import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.management.CountStatisticImpl;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -339,6 +340,11 @@ public class QueueDuplicatesFromStoreTest extends TestCase {
public long getTimeOfLastMessageAck() {
return 0;
}
@Override
public CountStatisticImpl getConsumedCount() {
return null;
}
};
queue.addSubscription(contextNotInTx, subscription);