mirror of https://github.com/apache/activemq.git
[AMQ-9426] Add destination AdvancedStatisticsEnabled flag and networkEnqueue/networkDequeue stats
This commit is contained in:
parent
e45ee4aae5
commit
0ddd20f650
|
@ -190,9 +190,20 @@ public class TransactionBroker extends BrokerFilter {
|
|||
dest.clearPendingMessages(opCount);
|
||||
dest.getDestinationStatistics().getEnqueues().add(opCount);
|
||||
dest.getDestinationStatistics().getMessages().add(opCount);
|
||||
|
||||
if(dest.isAdvancedStatisticsEnabled()) {
|
||||
if(transactionBroker.context.isNetworkConnection()) {
|
||||
dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
|
||||
}
|
||||
}
|
||||
LOG.debug("cleared pending from afterCommit: {}", destination);
|
||||
} else {
|
||||
dest.getDestinationStatistics().getDequeues().add(opCount);
|
||||
if(dest.isAdvancedStatisticsEnabled()) {
|
||||
if(transactionBroker.context.isNetworkConnection()) {
|
||||
dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -599,4 +599,25 @@ public class DestinationView implements DestinationViewMBean {
|
|||
public long getMaxUncommittedExceededCount() {
|
||||
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAdvancedStatisticsEnabled() {
|
||||
return destination.isAdvancedStatisticsEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
|
||||
destination.setAdvancedStatisticsEnabled(advancedStatisticsEnabled);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNetworkEnqueues() {
|
||||
return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getNetworkDequeues() {
|
||||
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -481,4 +481,16 @@ public interface DestinationViewMBean {
|
|||
|
||||
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
|
||||
long getMaxUncommittedExceededCount();
|
||||
|
||||
@MBeanInfo("Query Advanced Statistics flag")
|
||||
boolean isAdvancedStatisticsEnabled();
|
||||
|
||||
@MBeanInfo("Toggle Advanced Statistics flag")
|
||||
void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled);
|
||||
|
||||
@MBeanInfo("Number of messages sent to the destination via network connection")
|
||||
long getNetworkEnqueues();
|
||||
|
||||
@MBeanInfo("Number of messages acknowledged from the destination via network connection")
|
||||
long getNetworkDequeues();
|
||||
}
|
||||
|
|
|
@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination {
|
|||
protected final Scheduler scheduler;
|
||||
private boolean disposed = false;
|
||||
private boolean doOptimzeMessageStorage = true;
|
||||
private boolean advancedStatisticsEnabled = false;
|
||||
|
||||
/*
|
||||
* percentage of in-flight messages above which optimize message store is disabled
|
||||
*/
|
||||
|
@ -868,6 +870,15 @@ public abstract class BaseDestination implements Destination {
|
|||
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAdvancedStatisticsEnabled() {
|
||||
return this.advancedStatisticsEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
|
||||
this.advancedStatisticsEnabled = advancedStatisticsEnabled;
|
||||
}
|
||||
|
||||
@Override
|
||||
public abstract List<Subscription> getConsumers();
|
||||
|
|
|
@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination {
|
|||
boolean isSendDuplicateFromStoreToDLQ();
|
||||
|
||||
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
|
||||
|
||||
// [AMQ-9437]
|
||||
boolean isAdvancedStatisticsEnabled();
|
||||
|
||||
void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled);
|
||||
|
||||
}
|
||||
|
|
|
@ -409,6 +409,16 @@ public class DestinationFilter implements Destination {
|
|||
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAdvancedStatisticsEnabled() {
|
||||
return next.isAdvancedStatisticsEnabled();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
|
||||
next.setAdvancedStatisticsEnabled(advancedStatisticsEnabled);
|
||||
}
|
||||
|
||||
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
|
||||
if (next instanceof DestinationFilter) {
|
||||
DestinationFilter filter = (DestinationFilter) next;
|
||||
|
|
|
@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl {
|
|||
protected SizeStatisticImpl messageSize;
|
||||
protected CountStatisticImpl maxUncommittedExceededCount;
|
||||
|
||||
// [AMQ-9437] Advanced Statistics are optionally enabled
|
||||
protected CountStatisticImpl networkEnqueues;
|
||||
protected CountStatisticImpl networkDequeues;
|
||||
|
||||
public DestinationStatistics() {
|
||||
|
||||
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
|
||||
|
@ -68,6 +72,10 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
|
||||
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
|
||||
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
|
||||
|
||||
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
|
||||
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
|
||||
|
||||
addStatistic("enqueues", enqueues);
|
||||
addStatistic("dispatched", dispatched);
|
||||
addStatistic("dequeues", dequeues);
|
||||
|
@ -83,6 +91,9 @@ public class DestinationStatistics extends StatsImpl {
|
|||
addStatistic("blockedTime",blockedTime);
|
||||
addStatistic("messageSize",messageSize);
|
||||
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
|
||||
|
||||
addStatistic("networkEnqueues", networkEnqueues);
|
||||
addStatistic("networkDequeues", networkDequeues);
|
||||
}
|
||||
|
||||
public CountStatisticImpl getEnqueues() {
|
||||
|
@ -151,6 +162,14 @@ public class DestinationStatistics extends StatsImpl {
|
|||
return this.maxUncommittedExceededCount;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getNetworkEnqueues() {
|
||||
return networkEnqueues;
|
||||
}
|
||||
|
||||
public CountStatisticImpl getNetworkDequeues() {
|
||||
return networkDequeues;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
if (this.isDoReset()) {
|
||||
super.reset();
|
||||
|
@ -165,6 +184,8 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedTime.reset();
|
||||
messageSize.reset();
|
||||
maxUncommittedExceededCount.reset();
|
||||
networkEnqueues.reset();
|
||||
networkDequeues.reset();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -187,6 +208,9 @@ public class DestinationStatistics extends StatsImpl {
|
|||
messageSize.setEnabled(enabled);
|
||||
maxUncommittedExceededCount.setEnabled(enabled);
|
||||
|
||||
// [AMQ-9437] Advanced Statistics
|
||||
networkEnqueues.setEnabled(enabled);
|
||||
networkDequeues.setEnabled(enabled);
|
||||
}
|
||||
|
||||
public void setParent(DestinationStatistics parent) {
|
||||
|
@ -207,6 +231,8 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedTime.setParent(parent.blockedTime);
|
||||
messageSize.setParent(parent.messageSize);
|
||||
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
|
||||
networkEnqueues.setParent(parent.networkEnqueues);
|
||||
networkDequeues.setParent(parent.networkDequeues);
|
||||
} else {
|
||||
enqueues.setParent(null);
|
||||
dispatched.setParent(null);
|
||||
|
@ -224,6 +250,8 @@ public class DestinationStatistics extends StatsImpl {
|
|||
blockedTime.setParent(null);
|
||||
messageSize.setParent(null);
|
||||
maxUncommittedExceededCount.setParent(null);
|
||||
networkEnqueues.setParent(null);
|
||||
networkDequeues.setParent(null);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -371,6 +371,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
|
|||
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
|
||||
if (info.isNetworkSubscription()) {
|
||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
|
||||
if(((Destination)node.getRegionDestination()).isAdvancedStatisticsEnabled()) {
|
||||
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1873,7 +1873,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
// This sends the ack the the journal..
|
||||
if (!ack.isInTransaction()) {
|
||||
acknowledge(context, sub, ack, reference);
|
||||
dropMessage(reference);
|
||||
dropMessage(context, reference);
|
||||
} else {
|
||||
try {
|
||||
acknowledge(context, sub, ack, reference);
|
||||
|
@ -1882,7 +1882,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
|
||||
@Override
|
||||
public void afterCommit() throws Exception {
|
||||
dropMessage(reference);
|
||||
dropMessage(context, reference);
|
||||
wakeup();
|
||||
}
|
||||
|
||||
|
@ -1910,11 +1910,18 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
reference.setAcked(true);
|
||||
}
|
||||
|
||||
private void dropMessage(QueueMessageReference reference) {
|
||||
private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
|
||||
//use dropIfLive so we only process the statistics at most one time
|
||||
if (reference.dropIfLive()) {
|
||||
getDestinationStatistics().getDequeues().increment();
|
||||
getDestinationStatistics().getMessages().decrement();
|
||||
|
||||
if(isAdvancedStatisticsEnabled()) {
|
||||
if(context.getConnection().isNetworkConnection()) {
|
||||
getDestinationStatistics().getNetworkDequeues().increment();
|
||||
}
|
||||
}
|
||||
|
||||
pagedInMessagesLock.writeLock().lock();
|
||||
try {
|
||||
pagedInMessages.remove(reference);
|
||||
|
@ -1969,6 +1976,13 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
destinationStatistics.getEnqueues().increment();
|
||||
destinationStatistics.getMessages().increment();
|
||||
destinationStatistics.getMessageSize().addSize(msg.getSize());
|
||||
|
||||
if(isAdvancedStatisticsEnabled()) {
|
||||
if(context.getConnection().isNetworkConnection()) {
|
||||
destinationStatistics.getNetworkEnqueues().increment();
|
||||
}
|
||||
}
|
||||
|
||||
messageDelivered(context, msg);
|
||||
consumersLock.readLock().lock();
|
||||
try {
|
||||
|
@ -2115,7 +2129,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||
LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
|
||||
if (store != null) {
|
||||
ConnectionContext connectionContext = createConnectionContext();
|
||||
dropMessage(ref);
|
||||
dropMessage(connectionContext, ref);
|
||||
if (gotToTheStore(ref.getMessage())) {
|
||||
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
|
||||
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));
|
||||
|
|
|
@ -449,6 +449,9 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
destination.getDestinationStatistics().getInflight().subtract(count);
|
||||
if (info.isNetworkSubscription()) {
|
||||
destination.getDestinationStatistics().getForwards().add(count);
|
||||
if(destination.isAdvancedStatisticsEnabled()) {
|
||||
destination.getDestinationStatistics().getNetworkDequeues().add(count);
|
||||
}
|
||||
}
|
||||
if (ack.isExpiredAck()) {
|
||||
destination.getDestinationStatistics().getExpired().add(count);
|
||||
|
@ -746,6 +749,9 @@ public class TopicSubscription extends AbstractSubscription {
|
|||
matched.remove(message);
|
||||
if (destination != null) {
|
||||
destination.getDestinationStatistics().getDequeues().increment();
|
||||
if(destination.isAdvancedStatisticsEnabled()) {
|
||||
destination.getDestinationStatistics().getNetworkDequeues().increment();
|
||||
}
|
||||
}
|
||||
Destination dest = (Destination) message.getRegionDestination();
|
||||
if (dest != null) {
|
||||
|
|
|
@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
private boolean doOptimzeMessageStorage = true;
|
||||
private int maxDestinations = -1;
|
||||
private boolean useTopicSubscriptionInflightStats = true;
|
||||
|
||||
private boolean advancedStatisticsEnabled = false; // [AMQ-9437]
|
||||
/*
|
||||
* percentage of in-flight messages above which optimize message store is disabled
|
||||
*/
|
||||
|
@ -306,6 +306,9 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
|
||||
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
|
||||
}
|
||||
if (isUpdate("advancedStatisticsEnabled", includedProperties)) {
|
||||
destination.setAdvancedStatisticsEnabled(isAdvancedStatisticsEnabled());
|
||||
}
|
||||
}
|
||||
|
||||
public void baseConfiguration(Broker broker, BaseDestination destination) {
|
||||
|
@ -1175,5 +1178,13 @@ public class PolicyEntry extends DestinationMapEntry {
|
|||
|
||||
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
|
||||
return this.messageInterceptorStrategy;
|
||||
}
|
||||
|
||||
public boolean isAdvancedStatisticsEnabled() {
|
||||
return this.advancedStatisticsEnabled;
|
||||
}
|
||||
|
||||
public void setAdvancedStatisticsEnabled(boolean advancedStatisticsEnabled) {
|
||||
this.advancedStatisticsEnabled = advancedStatisticsEnabled;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue