https://issues.apache.org/jira/browse/AMQ-5289 - add forwardCount to destinationStatistics - allow local consumption to be accounted with dequeueCount - forwardCount so forwarded messages are not accounted for num hops times

This commit is contained in:
gtully 2014-07-25 11:46:36 +01:00
parent 619864dd42
commit 25e3c1b3c6
9 changed files with 57 additions and 3 deletions

View File

@ -89,6 +89,11 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getDequeues().getCount();
}
@Override
public long getForwardCount() {
return destination.getDestinationStatistics().getForwards().getCount();
}
@Override
public long getDispatchCount() {
return destination.getDestinationStatistics().getDispatched().getCount();

View File

@ -69,6 +69,16 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of messages that have been acknowledged (and removed from) from the destination.")
long getDequeueCount();
/**
* Returns the number of messages that have been acknowledged by network subscriptions from the
* destination.
*
* @return The number of messages that have been acknowledged by network subscriptions from the
* destination.
*/
@MBeanInfo("Number of messages that have been forwarded (to a networked broker) from the destination.")
long getForwardCount();
/**
* Returns the number of messages that have been dispatched but not
* acknowledged

View File

@ -31,6 +31,7 @@ public class DestinationStatistics extends StatsImpl {
protected CountStatisticImpl enqueues;
protected CountStatisticImpl dequeues;
protected CountStatisticImpl forwards;
protected CountStatisticImpl consumers;
protected CountStatisticImpl producers;
protected CountStatisticImpl messages;
@ -49,6 +50,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
dispatched = new CountStatisticImpl("dispatched", "The number of messages that have been dispatched from the destination");
dequeues = new CountStatisticImpl("dequeues", "The number of messages that have been acknowledged from the destination");
forwards = new CountStatisticImpl("forwards", "The number of messages that have been forwarded to a networked broker from the destination");
inflight = new CountStatisticImpl("inflight", "The number of messages dispatched but awaiting acknowledgement");
expired = new CountStatisticImpl("expired", "The number of messages that have expired");
@ -86,6 +88,10 @@ public class DestinationStatistics extends StatsImpl {
return dequeues;
}
public CountStatisticImpl getForwards() {
return forwards;
}
public CountStatisticImpl getInflight() {
return inflight;
}
@ -137,6 +143,7 @@ public class DestinationStatistics extends StatsImpl {
super.reset();
enqueues.reset();
dequeues.reset();
forwards.reset();
dispatched.reset();
inflight.reset();
expired.reset();
@ -151,6 +158,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setEnabled(enabled);
dispatched.setEnabled(enabled);
dequeues.setEnabled(enabled);
forwards.setEnabled(enabled);
inflight.setEnabled(enabled);
expired.setEnabled(true);
consumers.setEnabled(enabled);
@ -169,6 +177,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(parent.enqueues);
dispatched.setParent(parent.dispatched);
dequeues.setParent(parent.dequeues);
forwards.setParent(parent.forwards);
inflight.setParent(parent.inflight);
expired.setParent(parent.expired);
consumers.setParent(parent.consumers);
@ -183,6 +192,7 @@ public class DestinationStatistics extends StatsImpl {
enqueues.setParent(null);
dispatched.setParent(null);
dequeues.setParent(null);
forwards.setParent(null);
inflight.setParent(null);
expired.setParent(null);
consumers.setParent(null);

View File

@ -1810,6 +1810,9 @@ public class Queue extends BaseDestination implements Task, UsageListener {
} finally {
messagesLock.writeLock().unlock();
}
if (sub != null && sub.getConsumerInfo().isNetworkSubscription()) {
getDestinationStatistics().getForwards().increment();
}
}
}

View File

@ -285,6 +285,9 @@ public class TopicSubscription extends AbstractSubscription {
if (singleDestination && destination != null) {
destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
}
}
dequeueCounter.addAndGet(ack.getMessageCount());
}

View File

@ -95,7 +95,7 @@ public class DstatCommand extends AbstractJmxCommand {
// sort list so the names is A..Z
Collections.sort(queueList, new ObjectInstanceComparator());
context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
// Iterate through the queue result
for (Object view : queueList) {
@ -115,6 +115,7 @@ public class DstatCommand extends AbstractJmxCommand {
queueView.getConsumerCount(),
queueView.getEnqueueCount(),
queueView.getDequeueCount(),
queueView.getForwardCount(),
queueView.getMemoryPercentUsage()));
}
}
@ -128,7 +129,7 @@ public class DstatCommand extends AbstractJmxCommand {
final String header = "%-50s %10s %10s %10s %10s %10s %10s";
final String tableRow = "%-50s %10d %10d %10d %10d %10d %10d";
context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
Collections.sort(queueList, new ObjectInstanceComparator());
@ -150,6 +151,7 @@ public class DstatCommand extends AbstractJmxCommand {
queueView.getConsumerCount(),
queueView.getEnqueueCount(),
queueView.getDequeueCount(),
queueView.getForwardCount(),
queueView.getMemoryPercentUsage()));
}
}
@ -166,7 +168,7 @@ public class DstatCommand extends AbstractJmxCommand {
// sort list so the names is A..Z
Collections.sort(topicsList, new ObjectInstanceComparator());
context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Memory %"));
context.print(String.format(Locale.US, header, "Name", "Queue Size", "Producer #", "Consumer #", "Enqueue #", "Dequeue #", "Forward #", "Memory %"));
// Iterate through the topics result
for (Object view : topicsList) {
@ -186,6 +188,7 @@ public class DstatCommand extends AbstractJmxCommand {
topicView.getConsumerCount(),
topicView.getEnqueueCount(),
topicView.getDequeueCount(),
topicView.getForwardCount(),
topicView.getMemoryPercentUsage()));
}
}

View File

@ -169,6 +169,7 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertEquals("old dest has no memory usage", 0, queue.getMemoryPercentUsage());
assertTrue("use cache", queueNew.isUseCache());
assertTrue("cache enabled", queueNew.isCacheEnabled());
assertEquals("no forwards", 0, queueNew.getForwardCount());
}
public void testRemoveMessages() throws Exception {

View File

@ -21,6 +21,7 @@ import javax.jms.DeliveryMode;
import junit.framework.Test;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@ -72,6 +73,11 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
// Close consumer to cause the message to rollback.
connection1.send(consumerInfo1.createRemoveCommand());
final DestinationStatistics destinationStatistics = broker.getDestination(destination).getDestinationStatistics();
assertEquals("broker dest stat dispatched", 1, destinationStatistics.getDispatched().getCount());
assertEquals("broker dest stat dequeues", 0, destinationStatistics.getDequeues().getCount());
assertEquals("broker dest stat forwards", 0, destinationStatistics.getForwards().getCount());
// Now create remote consumer that should cause message to move to this
// remote consumer.
ConsumerInfo consumerInfo2 = createConsumerInfo(sessionInfo2, destination);
@ -84,6 +90,15 @@ public class DemandForwardingBridgeTest extends NetworkTestSupport {
return receiveMessage(connection2) != null;
}
}));
assertTrue("broker dest stat forwards", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 1 == destinationStatistics.getForwards().getCount();
}
}));
assertEquals("broker dest stat dequeues", 1, destinationStatistics.getDequeues().getCount());
assertEquals("remote broker dest stat dequeues", 1, remoteBroker.getDestination(destination).getDestinationStatistics().getDequeues().getCount());
}
public void initCombosForTestAddConsumerThenSend() {

View File

@ -31,6 +31,7 @@ import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.util.MessageIdList;
/**
@ -77,6 +78,9 @@ public class ThreeBrokerTopicNetworkTest extends JmsMultipleBrokersTestSupport {
assertEquals(MESSAGE_COUNT, msgsA.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsB.getMessageCount());
assertEquals(MESSAGE_COUNT * 2, msgsC.getMessageCount());
assertEquals("Correct forwards from A", MESSAGE_COUNT,
brokers.get("BrokerA").broker.getDestination(ActiveMQDestination.transform(dest)).getDestinationStatistics().getForwards().getCount());
}
public void initCombosForTestABandBCbrokerNetworkWithSelectors() {