From b44074f5f77e56c7b0c7e958779b644646454c04 Mon Sep 17 00:00:00 2001 From: Matt Pavlovich Date: Fri, 23 Feb 2024 08:51:17 -0600 Subject: [PATCH] [AMQ-9437] AdvancedDestination statistics networkEnqueue and networkDequeue counters --- .../activemq/broker/TransactionBroker.java | 7 + .../activemq/broker/jmx/DestinationView.java | 21 +++ .../broker/jmx/DestinationViewMBean.java | 12 ++ .../broker/region/BaseDestination.java | 11 ++ .../activemq/broker/region/Destination.java | 6 + .../broker/region/DestinationFilter.java | 10 ++ .../broker/region/DestinationStatistics.java | 28 +++ .../region/DurableTopicSubscription.java | 3 + .../apache/activemq/broker/region/Queue.java | 18 +- .../apache/activemq/broker/region/Topic.java | 5 + .../broker/region/TopicSubscription.java | 6 + .../broker/region/policy/PolicyEntry.java | 13 +- .../org/apache/activemq/PolicyEntryTest.java | 15 ++ ...t-policy-advancedNetworkStatistics-mod.xml | 36 ++++ ...yTest-policy-advancedNetworkStatistics.xml | 36 ++++ .../NetworkAdvancedStatisticsTest.java | 167 ++++++++++++++++++ .../localBroker-advancedNetworkStatistics.xml | 63 +++++++ ...remoteBroker-advancedNetworkStatistics.xml | 49 +++++ 18 files changed, 501 insertions(+), 5 deletions(-) create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml create mode 100644 activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java create mode 100644 activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml create mode 100644 activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java index 12e9f7742a..c8af4a6144 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransactionBroker.java @@ -190,9 +190,16 @@ public class TransactionBroker extends BrokerFilter { dest.clearPendingMessages(opCount); dest.getDestinationStatistics().getEnqueues().add(opCount); dest.getDestinationStatistics().getMessages().add(opCount); + + if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkEnqueues().add(opCount); + } LOG.debug("cleared pending from afterCommit: {}", destination); } else { dest.getDestinationStatistics().getDequeues().add(opCount); + if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) { + dest.getDestinationStatistics().getNetworkDequeues().add(opCount); + } } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java index 8abcc67163..02a7f65057 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationView.java @@ -599,4 +599,25 @@ public class DestinationView implements DestinationViewMBean { public long getMaxUncommittedExceededCount() { return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount(); } + + @Override + public boolean isAdvancedNetworkStatisticsEnabled() { + return destination.isAdvancedNetworkStatisticsEnabled(); + } + + @Override + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled); + } + + @Override + public long getNetworkEnqueues() { + return destination.getDestinationStatistics().getNetworkEnqueues().getCount(); + } + + @Override + public long getNetworkDequeues() { + return destination.getDestinationStatistics().getNetworkDequeues().getCount(); + } + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java index 45ed51b994..328ddb09f0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/DestinationViewMBean.java @@ -481,4 +481,16 @@ public interface DestinationViewMBean { @MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination") long getMaxUncommittedExceededCount(); + + @MBeanInfo("Query Advanced Network Statistics flag") + boolean isAdvancedNetworkStatisticsEnabled(); + + @MBeanInfo("Toggle Advanced Network Statistics flag") + void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled); + + @MBeanInfo("Number of messages sent to the destination via network connection") + long getNetworkEnqueues(); + + @MBeanInfo("Number of messages acknowledged from the destination via network connection") + long getNetworkDequeues(); } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java index 4ca3913c74..e34f23a4dd 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/BaseDestination.java @@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination { protected final Scheduler scheduler; private boolean disposed = false; private boolean doOptimzeMessageStorage = true; + private boolean advancedNetworkStatisticsEnabled = false; + /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -868,6 +870,15 @@ public abstract class BaseDestination implements Destination { this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit; } + @Override + public boolean isAdvancedNetworkStatisticsEnabled() { + return this.advancedNetworkStatisticsEnabled; + } + + @Override + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; + } @Override public abstract List getConsumers(); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java index 70f807be86..45e3de7b3c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Destination.java @@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination { boolean isSendDuplicateFromStoreToDLQ(); void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ); + + // [AMQ-9437] + boolean isAdvancedNetworkStatisticsEnabled(); + + void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled); + } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java index 6b288a234f..85ef367a77 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java @@ -409,6 +409,16 @@ public class DestinationFilter implements Destination { next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ); } + @Override + public boolean isAdvancedNetworkStatisticsEnabled() { + return next.isAdvancedNetworkStatisticsEnabled(); + } + + @Override + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled); + } + public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception { if (next instanceof DestinationFilter) { DestinationFilter filter = (DestinationFilter) next; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java index 9d30c622f3..dc6b17dfaf 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java @@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl { protected SizeStatisticImpl messageSize; protected CountStatisticImpl maxUncommittedExceededCount; + // [AMQ-9437] Advanced Statistics are optionally enabled + protected CountStatisticImpl networkEnqueues; + protected CountStatisticImpl networkDequeues; + public DestinationStatistics() { enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination"); @@ -68,6 +72,10 @@ public class DestinationStatistics extends StatsImpl { blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control"); messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination"); maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded"); + + networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection"); + networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection"); + addStatistic("enqueues", enqueues); addStatistic("dispatched", dispatched); addStatistic("dequeues", dequeues); @@ -83,6 +91,9 @@ public class DestinationStatistics extends StatsImpl { addStatistic("blockedTime",blockedTime); addStatistic("messageSize",messageSize); addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount); + + addStatistic("networkEnqueues", networkEnqueues); + addStatistic("networkDequeues", networkDequeues); } public CountStatisticImpl getEnqueues() { @@ -151,6 +162,14 @@ public class DestinationStatistics extends StatsImpl { return this.maxUncommittedExceededCount; } + public CountStatisticImpl getNetworkEnqueues() { + return networkEnqueues; + } + + public CountStatisticImpl getNetworkDequeues() { + return networkDequeues; + } + public void reset() { if (this.isDoReset()) { super.reset(); @@ -165,6 +184,8 @@ public class DestinationStatistics extends StatsImpl { blockedTime.reset(); messageSize.reset(); maxUncommittedExceededCount.reset(); + networkEnqueues.reset(); + networkDequeues.reset(); } } @@ -187,6 +208,9 @@ public class DestinationStatistics extends StatsImpl { messageSize.setEnabled(enabled); maxUncommittedExceededCount.setEnabled(enabled); + // [AMQ-9437] Advanced Statistics + networkEnqueues.setEnabled(enabled); + networkDequeues.setEnabled(enabled); } public void setParent(DestinationStatistics parent) { @@ -207,6 +231,8 @@ public class DestinationStatistics extends StatsImpl { blockedTime.setParent(parent.blockedTime); messageSize.setParent(parent.messageSize); maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount); + networkEnqueues.setParent(parent.networkEnqueues); + networkDequeues.setParent(parent.networkDequeues); } else { enqueues.setParent(null); dispatched.setParent(null); @@ -224,6 +250,8 @@ public class DestinationStatistics extends StatsImpl { blockedTime.setParent(null); messageSize.setParent(null); maxUncommittedExceededCount.setParent(null); + networkEnqueues.setParent(null); + networkDequeues.setParent(null); } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index e0dc6d0f07..6946a33fa5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -371,6 +371,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment(); if (info.isNetworkSubscription()) { ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount()); + if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { + ((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount()); + } } } diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 6502a20633..0ed6763f7b 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -1873,7 +1873,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index // This sends the ack the the journal.. if (!ack.isInTransaction()) { acknowledge(context, sub, ack, reference); - dropMessage(reference); + dropMessage(context, reference); } else { try { acknowledge(context, sub, ack, reference); @@ -1882,7 +1882,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index @Override public void afterCommit() throws Exception { - dropMessage(reference); + dropMessage(context, reference); wakeup(); } @@ -1910,11 +1910,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index reference.setAcked(true); } - private void dropMessage(QueueMessageReference reference) { + private void dropMessage(ConnectionContext context, QueueMessageReference reference) { //use dropIfLive so we only process the statistics at most one time if (reference.dropIfLive()) { getDestinationStatistics().getDequeues().increment(); getDestinationStatistics().getMessages().decrement(); + + if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { + getDestinationStatistics().getNetworkDequeues().increment(); + } + pagedInMessagesLock.writeLock().lock(); try { pagedInMessages.remove(reference); @@ -1969,6 +1974,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index destinationStatistics.getEnqueues().increment(); destinationStatistics.getMessages().increment(); destinationStatistics.getMessageSize().addSize(msg.getSize()); + + if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) { + destinationStatistics.getNetworkEnqueues().increment(); + } + messageDelivered(context, msg); consumersLock.readLock().lock(); try { @@ -2115,7 +2125,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong()); if (store != null) { ConnectionContext connectionContext = createConnectionContext(); - dropMessage(ref); + dropMessage(connectionContext, ref); if (gotToTheStore(ref.getMessage())) { LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage()); store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1)); diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java index cad0d3b883..a9e07874e0 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -778,6 +778,11 @@ public class Topic extends BaseDestination implements Task { // misleading metrics. // destinationStatistics.getMessages().increment(); destinationStatistics.getEnqueues().increment(); + + if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) { + destinationStatistics.getNetworkEnqueues().increment(); + } + destinationStatistics.getMessageSize().addSize(message.getSize()); MessageEvaluationContext msgContext = null; diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java index a5b97241b4..4403dea6b5 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java @@ -449,6 +449,9 @@ public class TopicSubscription extends AbstractSubscription { destination.getDestinationStatistics().getInflight().subtract(count); if (info.isNetworkSubscription()) { destination.getDestinationStatistics().getForwards().add(count); + if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { + destination.getDestinationStatistics().getNetworkDequeues().add(count); + } } if (ack.isExpiredAck()) { destination.getDestinationStatistics().getExpired().add(count); @@ -746,6 +749,9 @@ public class TopicSubscription extends AbstractSubscription { matched.remove(message); if (destination != null) { destination.getDestinationStatistics().getDequeues().increment(); + if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) { + destination.getDestinationStatistics().getNetworkDequeues().increment(); + } } Destination dest = (Destination) message.getRegionDestination(); if (dest != null) { diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 7230957022..e33f13b48c 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry { private boolean doOptimzeMessageStorage = true; private int maxDestinations = -1; private boolean useTopicSubscriptionInflightStats = true; - + private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437] /* * percentage of in-flight messages above which optimize message store is disabled */ @@ -306,6 +306,9 @@ public class PolicyEntry extends DestinationMapEntry { if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) { destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ()); } + if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) { + destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled()); + } } public void baseConfiguration(Broker broker, BaseDestination destination) { @@ -1175,5 +1178,13 @@ public class PolicyEntry extends DestinationMapEntry { public MessageInterceptorStrategy getMessageInterceptorStrategy() { return this.messageInterceptorStrategy; + } + + public boolean isAdvancedNetworkStatisticsEnabled() { + return this.advancedNetworkStatisticsEnabled; + } + + public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) { + this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled; } } diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java index 0f46e089e1..53341c3a53 100644 --- a/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/PolicyEntryTest.java @@ -57,6 +57,18 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport { verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true); } + @Test + public void testModAdvancedNetworkStatistics() throws Exception { + final String brokerConfig = configurationSeed + "-policy-ml-broker"; + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics"); + startBroker(brokerConfig); + assertTrue("broker alive", brokerService.isStarted()); + + verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false); + applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP); + verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true); + } + @Test public void testAddNdMod() throws Exception { final String brokerConfig = configurationSeed + "-policy-ml-broker"; @@ -121,6 +133,9 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport { session.createConsumer(session.createQueue(dest)); switch(fieldName) { + case "advancedNetworkStatisticsEnabled": + assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled()); + break; case "sendDuplicateFromStoreToDLQ": assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ()); break; diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml new file mode 100644 index 0000000000..534f884d4b --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics-mod.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml new file mode 100644 index 0000000000..a6c710e075 --- /dev/null +++ b/activemq-runtime-config/src/test/resources/org/apache/activemq/policyEntryTest-policy-advancedNetworkStatistics.xml @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java new file mode 100644 index 0000000000..df99bab354 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/NetworkAdvancedStatisticsTest.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +import java.util.Arrays; +import java.util.Collection; + +import jakarta.jms.Message; +import jakarta.jms.MessageConsumer; +import jakarta.jms.MessageListener; +import jakarta.jms.MessageProducer; + +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.springframework.context.support.AbstractApplicationContext; + +@RunWith(value = Parameterized.class) +public class NetworkAdvancedStatisticsTest extends BaseNetworkTest { + + @Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}") + public static Collection data() { + return Arrays.asList(new Object[][] { + { new ActiveMQTopic("include.test.bar"), new ActiveMQTopic("exclude.test.bar")}, + { new ActiveMQQueue("include.test.foo"), new ActiveMQQueue("exclude.test.foo")}}); + } + + protected static final int MESSAGE_COUNT = 10; + + protected AbstractApplicationContext context; + protected String consumerName = "durableSubs"; + + private final ActiveMQDestination includedDestination; + private final ActiveMQDestination excludedDestination; + + public NetworkAdvancedStatisticsTest(ActiveMQDestination includedDestionation, ActiveMQDestination excludedDestination) { + this.includedDestination = includedDestionation; + this.excludedDestination = excludedDestination; + } + + @Override + protected void doSetUp(boolean deleteAllMessages) throws Exception { + super.doSetUp(deleteAllMessages); + } + + @Override + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml"; + } + + @Override + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml"; + } + + //Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue + @Test(timeout = 60 * 1000) + public void testNetworkAdvancedStatistics() throws Exception { + + // create a remote durable consumer to create demand + MessageConsumer remoteConsumer; + if(includedDestination.isTopic()) { + remoteConsumer = remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination), consumerName); + } else { + remoteConsumer = remoteSession.createConsumer(includedDestination); + remoteConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + } + }); + } + Thread.sleep(1000); + + MessageProducer producer = localSession.createProducer(includedDestination); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message test = localSession.createTextMessage("test-" + i); + producer.send(test); + } + Thread.sleep(1000); + + MessageProducer producerExcluded = localSession.createProducer(excludedDestination); + for (int i = 0; i < MESSAGE_COUNT; i++) { + Message test = localSession.createTextMessage("test-" + i); + producerExcluded.send(test); + } + Thread.sleep(1000); + + //Make sure stats are correct for local -> remote + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeues().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + assertEquals(0, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + + // Make sure stats do not increment for local-only + assertEquals(MESSAGE_COUNT, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount()); + assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount()); + + if(includedDestination.isTopic()) { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0; + } + }, 10000, 500)); + } else { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + // The number of message that remain is due to the exclude queue + return localBroker.getAdminView().getTotalMessageCount() == MESSAGE_COUNT; + } + }, 10000, 500)); + } + remoteConsumer.close(); + } + + protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception { + + final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next(); + + assertTrue(Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() && + expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() && + 0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount(); + } + })); + } +} diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml new file mode 100644 index 0000000000..b543169f6e --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml @@ -0,0 +1,63 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml new file mode 100644 index 0000000000..a9cf93f73a --- /dev/null +++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml @@ -0,0 +1,49 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + +