From e629ac4538988cbb957a9a2599b2430d4266c943 Mon Sep 17 00:00:00 2001 From: "Christopher L. Shannon (cshannon)" Date: Tue, 31 Jul 2018 13:01:32 -0400 Subject: [PATCH] ARTEMIS-2003 - Add bridge metrics This commit adds support for tracking metrics for bridges for both normal bridges and bridges that are part of a cluster. The two statistics added in this commit are messages pending acknowledgement and messages acknowledged but more can be added later. --- .../api/core/management/BridgeControl.java | 29 ++++++++ .../management/ClusterConnectionControl.java | 48 +++++++++++++ .../management/impl/BridgeControlImpl.java | 35 +++++++++- .../impl/ClusterConnectionControlImpl.java | 48 ++++++++++++- .../artemis/core/server/cluster/Bridge.java | 3 + .../server/cluster/ClusterConnection.java | 16 +++++ .../core/server/cluster/impl/BridgeImpl.java | 22 +++++- .../server/cluster/impl/BridgeMetrics.java | 69 +++++++++++++++++++ .../cluster/impl/ClusterConnectionImpl.java | 20 ++++++ .../impl/ClusterConnectionMetrics.java | 64 +++++++++++++++++ .../cluster/bridge/BridgeTest.java | 6 ++ .../cluster/distribution/ClusterTestBase.java | 18 +++++ .../distribution/OneWayChainClusterTest.java | 9 +++ .../OnewayTwoNodeClusterTest.java | 18 +++++ .../management/BridgeControlTest.java | 6 ++ .../BridgeControlUsingCoreTest.java | 8 +++ .../ClusterConnectionControlTest.java | 14 +++- ...ClusterConnectionControlUsingCoreTest.java | 22 ++++++ 18 files changed, 445 insertions(+), 10 deletions(-) create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java create mode 100644 artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java index 3bf4554daf..9dd7dc87c1 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/BridgeControl.java @@ -106,4 +106,33 @@ public interface BridgeControl extends ActiveMQComponentControl { */ @Attribute(desc = "whether this bridge is using high availability") boolean isHA(); + + /** + * The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but + * is waiting acknowledgement from the other broker. This is a cumulative total and the number of outstanding + * pending messages can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement. + * + */ + @Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker.") + long getMessagesPendingAcknowledgement(); + + /** + * The messagesAcknowledged counter is the number of messages actually received by the remote broker. + * This is a cumulative total and the number of outstanding pending messages can be computed by subtracting + * messagesAcknowledged from messagesPendingAcknowledgement. + * + */ + @Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by the remote broker.") + long getMessagesAcknowledged(); + + /** + * The bridge metrics for this bridge + * + * The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. + * The messagesAcknowledged counter is the number of messages actually received by the remote broker. + * + */ + @Attribute(desc = "The metrics for this bridge. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker.") + Map getMetrics(); + } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java index 194afade19..39f0825759 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ClusterConnectionControl.java @@ -96,4 +96,52 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl { */ @Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)") Map getNodes() throws Exception; + + /** + * The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has + * forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges) + * + * This is a cumulative total and the number of outstanding pending messages for the cluster connection + * can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement. + * + */ + @Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)") + long getMessagesPendingAcknowledgement(); + + /** + * The messagesAcknowledged counter is the number of messages actually received by a remote broker for all + * bridges in this cluster connection + * + * This is a cumulative total and the number of outstanding pending messages for the cluster connection + * can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement. + * + */ + @Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection") + long getMessagesAcknowledged(); + + /** + * The current metrics for this cluster connection (aggregate over all bridges to other nodes) + * + * The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has + * forwarded a message and is waiting acknowledgement from the other broker. + * + * The messagesAcknowledged counter is the number of messages actually received by a remote broker for all + * bridges in this cluster connection + * + * @return + */ + @Attribute(desc = "The metrics for this cluster connection. The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection") + Map getMetrics(); + + /** + * The bridge metrics for the given node in the cluster connection + * + * The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. + * The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge. + * + * @throws Exception + */ + @Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.") + Map getBridgeMetrics(String nodeId) throws Exception; + } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java index 6e0e055b86..d0e55238e3 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/BridgeControlImpl.java @@ -16,11 +16,12 @@ */ package org.apache.activemq.artemis.core.management.impl; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanOperationInfo; import java.util.List; import java.util.Map; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.core.config.BridgeConfiguration; @@ -228,6 +229,36 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class); } + @Override + public long getMessagesPendingAcknowledgement() { + clearIO(); + try { + return bridge.getMetrics().getMessagesPendingAcknowledgement(); + } finally { + blockOnIO(); + } + } + + @Override + public long getMessagesAcknowledged() { + clearIO(); + try { + return bridge.getMetrics().getMessagesAcknowledged(); + } finally { + blockOnIO(); + } + } + + @Override + public Map getMetrics() { + clearIO(); + try { + return bridge.getMetrics().convertToMap(); + } finally { + blockOnIO(); + } + } + // Public -------------------------------------------------------- // Package protected --------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java index 9186dbbf13..24c7dccf6e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ClusterConnectionControlImpl.java @@ -16,16 +16,18 @@ */ package org.apache.activemq.artemis.core.management.impl; -import javax.management.MBeanAttributeInfo; -import javax.management.MBeanOperationInfo; import java.util.List; import java.util.Map; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanOperationInfo; + import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl { @@ -223,6 +225,48 @@ public class ClusterConnectionControlImpl extends AbstractControl implements Clu return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class); } + @Override + public long getMessagesPendingAcknowledgement() { + clearIO(); + try { + return clusterConnection.getMetrics().getMessagesPendingAcknowledgement(); + } finally { + blockOnIO(); + } + } + + @Override + public long getMessagesAcknowledged() { + clearIO(); + try { + return clusterConnection.getMetrics().getMessagesAcknowledged(); + } finally { + blockOnIO(); + } + } + + @Override + public Map getMetrics() { + clearIO(); + try { + return clusterConnection.getMetrics().convertToMap(); + } finally { + blockOnIO(); + } + } + + @Override + public Map getBridgeMetrics(String nodeId) { + clearIO(); + try { + final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(nodeId); + return bridgeMetrics != null ? bridgeMetrics.convertToMap() : null; + } finally { + blockOnIO(); + } + + } + // Public -------------------------------------------------------- // Package protected --------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java index 7e8cacb41b..28fbc7c294 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/Bridge.java @@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -52,4 +53,6 @@ public interface Bridge extends Consumer, ActiveMQComponent { void disconnect(); boolean isConnected(); + + BridgeMetrics getMetrics(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java index 9392ed59e2..6171476188 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ClusterConnection.java @@ -25,6 +25,8 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener; import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics; public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener { @@ -80,4 +82,18 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis long getCallTimeout(); + /** + * The metric for this cluster connection + * + * @return + */ + ClusterConnectionMetrics getMetrics(); + + /** + * Returns the BridgeMetrics for the bridge to the given node if exists + * + * @param nodeId + * @return + */ + BridgeMetrics getBridgeMetrics(String nodeId); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 48f59f4907..c811b6356c 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -55,10 +55,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage; import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.cluster.Bridge; -import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.NotificationService; +import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; @@ -159,6 +159,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled private ActiveMQServer server; + private final BridgeMetrics metrics = new BridgeMetrics(); + public BridgeImpl(final ServerLocatorInternal serverLocator, final int initialConnectAttempts, final int reconnectAttempts, @@ -518,6 +520,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled } ref.getQueue().acknowledge(ref); pendingAcks.countDown(); + metrics.incrementMessagesAcknowledged(); } else { if (logger.isTraceEnabled()) { logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message); @@ -611,13 +614,21 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled pendingAcks.countUp(); try { + final HandleStatus status; if (message.isLargeMessage()) { deliveringLargeMessage = true; deliverLargeMessage(dest, ref, (LargeServerMessage) message); - return HandleStatus.HANDLED; + status = HandleStatus.HANDLED; } else { - return deliverStandardMessage(dest, ref, message); + status = deliverStandardMessage(dest, ref, message); } + + //Only increment messages pending acknowledgement if handled by bridge + if (status == HandleStatus.HANDLED) { + metrics.incrementMessagesPendingAcknowledgement(); + } + + return status; } catch (Exception e) { // If an exception happened, we must count down immediately pendingAcks.countDown(); @@ -770,6 +781,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled return this.targetNode; } + @Override + public BridgeMetrics getMetrics() { + return this.metrics; + } + @Override public String toString() { return this.getClass().getSimpleName() + "@" + diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java new file mode 100644 index 0000000000..d8689b6abc --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeMetrics.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.cluster.impl; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +public class BridgeMetrics { + + public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement"; + public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged"; + + private static final AtomicLongFieldUpdater MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER = + AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY); + + private static final AtomicLongFieldUpdater MESSAGES_ACKNOWLEDGED_UPDATER = + AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_ACKNOWLEDGED_KEY); + + private volatile long messagesPendingAcknowledgement; + private volatile long messagesAcknowledged; + + public void incrementMessagesPendingAcknowledgement() { + MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER.incrementAndGet(this); + } + + public void incrementMessagesAcknowledged() { + MESSAGES_ACKNOWLEDGED_UPDATER.incrementAndGet(this); + } + + /** + * @return the messagesPendingAcknowledgement + */ + public long getMessagesPendingAcknowledgement() { + return messagesPendingAcknowledgement; + } + + /** + * @return the messagesAcknowledged + */ + public long getMessagesAcknowledged() { + return messagesAcknowledged; + } + + /** + * @return New map containing the Bridge metrics + */ + public Map convertToMap() { + final Map metrics = new HashMap<>(); + metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement); + metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged); + + return metrics; + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java index 70923be669..849575848e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionImpl.java @@ -749,6 +749,26 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn topology.updateAsLive(nodeID, localMember); } + + @Override + public ClusterConnectionMetrics getMetrics() { + long messagesPendingAcknowledgement = 0; + long messagesAcknowledged = 0; + for (MessageFlowRecord record : records.values()) { + final BridgeMetrics metrics = record.getBridge() != null ? record.getBridge().getMetrics() : null; + messagesPendingAcknowledgement += metrics != null ? metrics.getMessagesPendingAcknowledgement() : 0; + messagesAcknowledged += metrics != null ? metrics.getMessagesAcknowledged() : 0; + } + + return new ClusterConnectionMetrics(messagesPendingAcknowledgement, messagesAcknowledged); + } + + @Override + public BridgeMetrics getBridgeMetrics(String nodeId) { + final MessageFlowRecord record = records.get(nodeId); + return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null; + } + private void createNewRecord(final long eventUID, final String targetNodeID, final TransportConfiguration connector, diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java new file mode 100644 index 0000000000..5b9e084224 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/ClusterConnectionMetrics.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.cluster.impl; + +import java.util.HashMap; +import java.util.Map; + +public class ClusterConnectionMetrics { + + public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement"; + public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged"; + + private final long messagesPendingAcknowledgement; + private final long messagesAcknowledged; + + /** + * @param messagesPendingAcknowledgement + * @param messagesAcknowledged + */ + public ClusterConnectionMetrics(long messagesPendingAcknowledgement, long messagesAcknowledged) { + super(); + this.messagesPendingAcknowledgement = messagesPendingAcknowledgement; + this.messagesAcknowledged = messagesAcknowledged; + } + + /** + * @return the messagesPendingAcknowledgement + */ + public long getMessagesPendingAcknowledgement() { + return messagesPendingAcknowledgement; + } + + /** + * @return the messagesAcknowledged + */ + public long getMessagesAcknowledged() { + return messagesAcknowledged; + } + + /** + * @return New map containing the Cluster Connection metrics + */ + public Map convertToMap() { + final Map metrics = new HashMap<>(); + metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement); + metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged); + + return metrics; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java index 2d6add7f72..73116f84ef 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/bridge/BridgeTest.java @@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer; import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; @@ -502,6 +503,11 @@ public class BridgeTest extends ActiveMQTestBase { sf1.close(); + assertEquals(1, server0.getClusterManager().getBridges().size()); + BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics(); + assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement()); + assertEquals(10, bridgeMetrics.getMessagesAcknowledged()); + closeFields(); if (server0.getConfiguration().isPersistenceEnabled()) { assertEquals(0, loadQueues(server0).size()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java index 89d5175dbb..6e7f9b338a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusterTestBase.java @@ -75,7 +75,9 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.group.GroupingHandler; @@ -1286,6 +1288,22 @@ public abstract class ClusterTestBase extends ActiveMQTestBase { verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs); } + protected void verifyClusterMetrics(final int node, final String clusterName, final long expectedMessagesPendingAcknowledgement, + final long expectedMessagesAcknowledged) { + final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName); + final ClusterConnectionMetrics clusterMetrics = clusterConnection.getMetrics(); + assertEquals(expectedMessagesPendingAcknowledgement, clusterMetrics.getMessagesPendingAcknowledgement()); + assertEquals(expectedMessagesAcknowledged, clusterMetrics.getMessagesAcknowledged()); + } + + protected void verifyBridgeMetrics(final int node, final String clusterName, final String bridgeNodeId, + final long expectedMessagesPendingAcknowledgement, final long expectedMessagesAcknowledged) { + final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName); + final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(bridgeNodeId); + assertEquals(expectedMessagesPendingAcknowledgement, bridgeMetrics.getMessagesPendingAcknowledgement()); + assertEquals(expectedMessagesAcknowledged, bridgeMetrics.getMessagesAcknowledged()); + } + protected int[] getReceivedOrder(final int consumerID) throws Exception { return getReceivedOrder(consumerID, false); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java index e82f465c60..c360b74388 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OneWayChainClusterTest.java @@ -75,6 +75,15 @@ public class OneWayChainClusterTest extends ClusterTestBase { send(0, "queues.testaddress", 10, false, null); verifyReceiveRoundRobin(10, 0, 1); verifyNotReceive(0, 1); + + //half of the messages should be sent over bridges to the last broker in the chain + //as there is a consumer on that last broker + verifyClusterMetrics(0, "cluster0-1", 5, 5); + verifyClusterMetrics(1, "cluster1-2", 5, 5); + verifyClusterMetrics(2, "cluster2-3", 5, 5); + verifyClusterMetrics(3, "cluster3-4", 5, 5); + verifyClusterMetrics(4, "cluster4-X", 0, 0); + } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java index f722d6702e..c2c6982c0d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/OnewayTwoNodeClusterTest.java @@ -198,6 +198,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase { addConsumer(1, 0, "queue0", null); verifyNotReceive(1); + + //Should be 0 as no messages were sent to the second broker + verifyClusterMetrics(0, "cluster1", 0, 0); + + //Should be 0 as no messages were sent to the first broker + verifyClusterMetrics(1, "clusterX", 0, 0); + + //0 messages were sent across the bridge to the second broker + verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 0, 0); } @Test @@ -224,6 +233,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase { send(0, "queues.testaddress", 10, false, null); verifyReceiveRoundRobin(10, 0, 1); verifyNotReceive(0, 1); + + //half of the messages should be sent over bridge, other half was consumed by local consumer + verifyClusterMetrics(0, "cluster1", 5, 5); + + //Should be 0 as no messages were sent to the first broker + verifyClusterMetrics(1, "clusterX", 0, 0); + + //5 messages were sent across the bridge to the second broker + verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 5, 5); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java index df63043501..c0dd06da92 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlTest.java @@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; import org.apache.activemq.artemis.utils.RandomUtil; @@ -64,6 +65,11 @@ public class BridgeControlTest extends ManagementTestBase { Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), bridgeControl.getRetryIntervalMultiplier(), 0.000001); Assert.assertEquals(bridgeConfig.getReconnectAttempts(), bridgeControl.getReconnectAttempts()); Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), bridgeControl.isUseDuplicateDetection()); + Map bridgeMetrics = bridgeControl.getMetrics(); + Assert.assertEquals(0L, bridgeControl.getMessagesPendingAcknowledgement()); + Assert.assertEquals(0L, bridgeControl.getMessagesAcknowledged()); + Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY)); + Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY)); String[] connectorPairData = bridgeControl.getStaticConnectors(); Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), connectorPairData[0]); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java index e0ff4c84b4..bd8b51dcfb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/BridgeControlUsingCoreTest.java @@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics; import org.apache.activemq.artemis.utils.RandomUtil; import org.junit.Assert; import org.junit.Before; @@ -69,6 +70,13 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase { Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class)); Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class)); + @SuppressWarnings("unchecked") + Map bridgeMetrics = (Map) proxy.retrieveAttributeValue("metrics", Map.class); + Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class)); + Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesAcknowledged", Long.class)); + Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY)); + Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY)); + Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors"); Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java index f414c15583..1fc2c5ecdd 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlTest.java @@ -16,14 +16,15 @@ */ package org.apache.activemq.artemis.tests.integration.management; -import javax.json.JsonArray; -import javax.management.MBeanServer; -import javax.management.MBeanServerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.json.JsonArray; +import javax.management.MBeanServer; +import javax.management.MBeanServerFactory; + import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.SimpleString; @@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; +import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; @@ -82,6 +84,12 @@ public class ClusterConnectionControlTest extends ManagementTestBase { Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection()); Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().getType(), clusterConnectionControl.getMessageLoadBalancingType()); Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops()); + Assert.assertEquals(0L, clusterConnectionControl.getMessagesPendingAcknowledgement()); + Assert.assertEquals(0L, clusterConnectionControl.getMessagesAcknowledged()); + Map clusterMetrics = clusterConnectionControl.getMetrics(); + Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY)); + Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_ACKNOWLEDGED_KEY)); + Assert.assertNull(clusterConnectionControl.getBridgeMetrics("bad")); Object[] connectors = clusterConnectionControl.getStaticConnectors(); Assert.assertEquals(1, connectors.length); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java index 2756b1e704..0c76a6449c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ClusterConnectionControlUsingCoreTest.java @@ -98,6 +98,28 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont return (String) proxy.retrieveAttributeValue("nodeID"); } + @Override + public long getMessagesPendingAcknowledgement() { + return (Long) proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class); + } + + @Override + public long getMessagesAcknowledged() { + return (Long) proxy.retrieveAttributeValue("messagesAcknowledged", Long.class); + } + + @SuppressWarnings("unchecked") + @Override + public Map getMetrics() { + return (Map) proxy.retrieveAttributeValue("metrics", Map.class); + } + + @SuppressWarnings("unchecked") + @Override + public Map getBridgeMetrics(String nodeId) throws Exception { + return (Map) proxy.invokeOperation("getBridgeMetrics", nodeId); + } + @Override public boolean isStarted() { return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class);