This commit is contained in:
Clebert Suconic 2018-08-03 14:18:34 -04:00
commit ed643e479c
18 changed files with 445 additions and 10 deletions

View File

@ -106,4 +106,33 @@ public interface BridgeControl extends ActiveMQComponentControl {
*/ */
@Attribute(desc = "whether this bridge is using high availability") @Attribute(desc = "whether this bridge is using high availability")
boolean isHA(); boolean isHA();
/**
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but
* is waiting acknowledgement from the other broker. This is a cumulative total and the number of outstanding
* pending messages can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker.")
long getMessagesPendingAcknowledgement();
/**
* The messagesAcknowledged counter is the number of messages actually received by the remote broker.
* This is a cumulative total and the number of outstanding pending messages can be computed by subtracting
* messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
long getMessagesAcknowledged();
/**
* The bridge metrics for this bridge
*
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker.
* The messagesAcknowledged counter is the number of messages actually received by the remote broker.
*
*/
@Attribute(desc = "The metrics for this bridge. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
Map<String, Object> getMetrics();
} }

View File

@ -96,4 +96,52 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl {
*/ */
@Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)") @Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)")
Map<String, String> getNodes() throws Exception; Map<String, String> getNodes() throws Exception;
/**
* The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
* forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)
*
* This is a cumulative total and the number of outstanding pending messages for the cluster connection
* can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)")
long getMessagesPendingAcknowledgement();
/**
* The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
* bridges in this cluster connection
*
* This is a cumulative total and the number of outstanding pending messages for the cluster connection
* can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
*
*/
@Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
long getMessagesAcknowledged();
/**
* The current metrics for this cluster connection (aggregate over all bridges to other nodes)
*
* The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
* forwarded a message and is waiting acknowledgement from the other broker.
*
* The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
* bridges in this cluster connection
*
* @return
*/
@Attribute(desc = "The metrics for this cluster connection. The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
Map<String, Object> getMetrics();
/**
* The bridge metrics for the given node in the cluster connection
*
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker.
* The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.
*
* @throws Exception
*/
@Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.")
Map<String, Object> getBridgeMetrics(String nodeId) throws Exception;
} }

View File

@ -16,11 +16,12 @@
*/ */
package org.apache.activemq.artemis.core.management.impl; package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.BridgeControl; import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.core.config.BridgeConfiguration; import org.apache.activemq.artemis.core.config.BridgeConfiguration;
@ -228,6 +229,36 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class); return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class);
} }
@Override
public long getMessagesPendingAcknowledgement() {
clearIO();
try {
return bridge.getMetrics().getMessagesPendingAcknowledgement();
} finally {
blockOnIO();
}
}
@Override
public long getMessagesAcknowledged() {
clearIO();
try {
return bridge.getMetrics().getMessagesAcknowledged();
} finally {
blockOnIO();
}
}
@Override
public Map<String, Object> getMetrics() {
clearIO();
try {
return bridge.getMetrics().convertToMap();
} finally {
blockOnIO();
}
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------
// Package protected --------------------------------------------- // Package protected ---------------------------------------------

View File

@ -16,16 +16,18 @@
*/ */
package org.apache.activemq.artemis.core.management.impl; package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl; import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl { public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl {
@ -223,6 +225,48 @@ public class ClusterConnectionControlImpl extends AbstractControl implements Clu
return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class); return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class);
} }
@Override
public long getMessagesPendingAcknowledgement() {
clearIO();
try {
return clusterConnection.getMetrics().getMessagesPendingAcknowledgement();
} finally {
blockOnIO();
}
}
@Override
public long getMessagesAcknowledged() {
clearIO();
try {
return clusterConnection.getMetrics().getMessagesAcknowledged();
} finally {
blockOnIO();
}
}
@Override
public Map<String, Object> getMetrics() {
clearIO();
try {
return clusterConnection.getMetrics().convertToMap();
} finally {
blockOnIO();
}
}
@Override
public Map<String, Object> getBridgeMetrics(String nodeId) {
clearIO();
try {
final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(nodeId);
return bridgeMetrics != null ? bridgeMetrics.convertToMap() : null;
} finally {
blockOnIO();
}
}
// Public -------------------------------------------------------- // Public --------------------------------------------------------
// Package protected --------------------------------------------- // Package protected ---------------------------------------------

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.Consumer; import org.apache.activemq.artemis.core.server.Consumer;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@ -52,4 +53,6 @@ public interface Bridge extends Consumer, ActiveMQComponent {
void disconnect(); void disconnect();
boolean isConnected(); boolean isConnected();
BridgeMetrics getMetrics();
} }

View File

@ -25,6 +25,8 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.core.client.impl.Topology; import org.apache.activemq.artemis.core.client.impl.Topology;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener { public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
@ -80,4 +82,18 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
long getCallTimeout(); long getCallTimeout();
/**
* The metric for this cluster connection
*
* @return
*/
ClusterConnectionMetrics getMetrics();
/**
* Returns the BridgeMetrics for the bridge to the given node if exists
*
* @param nodeId
* @return
*/
BridgeMetrics getBridgeMetrics(String nodeId);
} }

View File

@ -55,10 +55,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.cluster.Bridge; import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.impl.QueueImpl; import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService; import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil; import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -159,6 +159,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private ActiveMQServer server; private ActiveMQServer server;
private final BridgeMetrics metrics = new BridgeMetrics();
public BridgeImpl(final ServerLocatorInternal serverLocator, public BridgeImpl(final ServerLocatorInternal serverLocator,
final int initialConnectAttempts, final int initialConnectAttempts,
final int reconnectAttempts, final int reconnectAttempts,
@ -518,6 +520,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
} }
ref.getQueue().acknowledge(ref); ref.getQueue().acknowledge(ref);
pendingAcks.countDown(); pendingAcks.countDown();
metrics.incrementMessagesAcknowledged();
} else { } else {
if (logger.isTraceEnabled()) { if (logger.isTraceEnabled()) {
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message); logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
@ -611,13 +614,21 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
pendingAcks.countUp(); pendingAcks.countUp();
try { try {
final HandleStatus status;
if (message.isLargeMessage()) { if (message.isLargeMessage()) {
deliveringLargeMessage = true; deliveringLargeMessage = true;
deliverLargeMessage(dest, ref, (LargeServerMessage) message); deliverLargeMessage(dest, ref, (LargeServerMessage) message);
return HandleStatus.HANDLED; status = HandleStatus.HANDLED;
} else { } else {
return deliverStandardMessage(dest, ref, message); status = deliverStandardMessage(dest, ref, message);
} }
//Only increment messages pending acknowledgement if handled by bridge
if (status == HandleStatus.HANDLED) {
metrics.incrementMessagesPendingAcknowledgement();
}
return status;
} catch (Exception e) { } catch (Exception e) {
// If an exception happened, we must count down immediately // If an exception happened, we must count down immediately
pendingAcks.countDown(); pendingAcks.countDown();
@ -770,6 +781,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
return this.targetNode; return this.targetNode;
} }
@Override
public BridgeMetrics getMetrics() {
return this.metrics;
}
@Override @Override
public String toString() { public String toString() {
return this.getClass().getSimpleName() + "@" + return this.getClass().getSimpleName() + "@" +

View File

@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.impl;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
public class BridgeMetrics {
public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER =
AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY);
private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_ACKNOWLEDGED_UPDATER =
AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_ACKNOWLEDGED_KEY);
private volatile long messagesPendingAcknowledgement;
private volatile long messagesAcknowledged;
public void incrementMessagesPendingAcknowledgement() {
MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER.incrementAndGet(this);
}
public void incrementMessagesAcknowledged() {
MESSAGES_ACKNOWLEDGED_UPDATER.incrementAndGet(this);
}
/**
* @return the messagesPendingAcknowledgement
*/
public long getMessagesPendingAcknowledgement() {
return messagesPendingAcknowledgement;
}
/**
* @return the messagesAcknowledged
*/
public long getMessagesAcknowledged() {
return messagesAcknowledged;
}
/**
* @return New map containing the Bridge metrics
*/
public Map<String, Object> convertToMap() {
final Map<String, Object> metrics = new HashMap<>();
metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
return metrics;
}
}

View File

@ -749,6 +749,26 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
topology.updateAsLive(nodeID, localMember); topology.updateAsLive(nodeID, localMember);
} }
@Override
public ClusterConnectionMetrics getMetrics() {
long messagesPendingAcknowledgement = 0;
long messagesAcknowledged = 0;
for (MessageFlowRecord record : records.values()) {
final BridgeMetrics metrics = record.getBridge() != null ? record.getBridge().getMetrics() : null;
messagesPendingAcknowledgement += metrics != null ? metrics.getMessagesPendingAcknowledgement() : 0;
messagesAcknowledged += metrics != null ? metrics.getMessagesAcknowledged() : 0;
}
return new ClusterConnectionMetrics(messagesPendingAcknowledgement, messagesAcknowledged);
}
@Override
public BridgeMetrics getBridgeMetrics(String nodeId) {
final MessageFlowRecord record = records.get(nodeId);
return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
}
private void createNewRecord(final long eventUID, private void createNewRecord(final long eventUID,
final String targetNodeID, final String targetNodeID,
final TransportConfiguration connector, final TransportConfiguration connector,

View File

@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.cluster.impl;
import java.util.HashMap;
import java.util.Map;
public class ClusterConnectionMetrics {
public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
private final long messagesPendingAcknowledgement;
private final long messagesAcknowledged;
/**
* @param messagesPendingAcknowledgement
* @param messagesAcknowledged
*/
public ClusterConnectionMetrics(long messagesPendingAcknowledgement, long messagesAcknowledged) {
super();
this.messagesPendingAcknowledgement = messagesPendingAcknowledgement;
this.messagesAcknowledged = messagesAcknowledged;
}
/**
* @return the messagesPendingAcknowledgement
*/
public long getMessagesPendingAcknowledgement() {
return messagesPendingAcknowledgement;
}
/**
* @return the messagesAcknowledged
*/
public long getMessagesAcknowledged() {
return messagesAcknowledged;
}
/**
* @return New map containing the Cluster Connection metrics
*/
public Map<String, Object> convertToMap() {
final Map<String, Object> metrics = new HashMap<>();
metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
return metrics;
}
}

View File

@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer; import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
import org.apache.activemq.artemis.core.server.transformer.Transformer; import org.apache.activemq.artemis.core.server.transformer.Transformer;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl; import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl; import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
@ -502,6 +503,11 @@ public class BridgeTest extends ActiveMQTestBase {
sf1.close(); sf1.close();
assertEquals(1, server0.getClusterManager().getBridges().size());
BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
closeFields(); closeFields();
if (server0.getConfiguration().isPersistenceEnabled()) { if (server0.getConfiguration().isPersistenceEnabled()) {
assertEquals(0, loadQueues(server0).size()); assertEquals(0, loadQueues(server0).size());

View File

@ -75,7 +75,9 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection; import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.apache.activemq.artemis.core.server.cluster.ClusterManager; import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding; import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl; import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum; import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
import org.apache.activemq.artemis.core.server.group.GroupingHandler; import org.apache.activemq.artemis.core.server.group.GroupingHandler;
@ -1286,6 +1288,22 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs); verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
} }
protected void verifyClusterMetrics(final int node, final String clusterName, final long expectedMessagesPendingAcknowledgement,
final long expectedMessagesAcknowledged) {
final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
final ClusterConnectionMetrics clusterMetrics = clusterConnection.getMetrics();
assertEquals(expectedMessagesPendingAcknowledgement, clusterMetrics.getMessagesPendingAcknowledgement());
assertEquals(expectedMessagesAcknowledged, clusterMetrics.getMessagesAcknowledged());
}
protected void verifyBridgeMetrics(final int node, final String clusterName, final String bridgeNodeId,
final long expectedMessagesPendingAcknowledgement, final long expectedMessagesAcknowledged) {
final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(bridgeNodeId);
assertEquals(expectedMessagesPendingAcknowledgement, bridgeMetrics.getMessagesPendingAcknowledgement());
assertEquals(expectedMessagesAcknowledged, bridgeMetrics.getMessagesAcknowledged());
}
protected int[] getReceivedOrder(final int consumerID) throws Exception { protected int[] getReceivedOrder(final int consumerID) throws Exception {
return getReceivedOrder(consumerID, false); return getReceivedOrder(consumerID, false);
} }

View File

@ -75,6 +75,15 @@ public class OneWayChainClusterTest extends ClusterTestBase {
send(0, "queues.testaddress", 10, false, null); send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobin(10, 0, 1); verifyReceiveRoundRobin(10, 0, 1);
verifyNotReceive(0, 1); verifyNotReceive(0, 1);
//half of the messages should be sent over bridges to the last broker in the chain
//as there is a consumer on that last broker
verifyClusterMetrics(0, "cluster0-1", 5, 5);
verifyClusterMetrics(1, "cluster1-2", 5, 5);
verifyClusterMetrics(2, "cluster2-3", 5, 5);
verifyClusterMetrics(3, "cluster3-4", 5, 5);
verifyClusterMetrics(4, "cluster4-X", 0, 0);
} }
@Test @Test

View File

@ -198,6 +198,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
addConsumer(1, 0, "queue0", null); addConsumer(1, 0, "queue0", null);
verifyNotReceive(1); verifyNotReceive(1);
//Should be 0 as no messages were sent to the second broker
verifyClusterMetrics(0, "cluster1", 0, 0);
//Should be 0 as no messages were sent to the first broker
verifyClusterMetrics(1, "clusterX", 0, 0);
//0 messages were sent across the bridge to the second broker
verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 0, 0);
} }
@Test @Test
@ -224,6 +233,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
send(0, "queues.testaddress", 10, false, null); send(0, "queues.testaddress", 10, false, null);
verifyReceiveRoundRobin(10, 0, 1); verifyReceiveRoundRobin(10, 0, 1);
verifyNotReceive(0, 1); verifyNotReceive(0, 1);
//half of the messages should be sent over bridge, other half was consumed by local consumer
verifyClusterMetrics(0, "cluster1", 5, 5);
//Should be 0 as no messages were sent to the first broker
verifyClusterMetrics(1, "clusterX", 0, 0);
//5 messages were sent across the bridge to the second broker
verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 5, 5);
} }
@Test @Test

View File

@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
@ -64,6 +65,11 @@ public class BridgeControlTest extends ManagementTestBase {
Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), bridgeControl.getRetryIntervalMultiplier(), 0.000001); Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), bridgeControl.getRetryIntervalMultiplier(), 0.000001);
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), bridgeControl.getReconnectAttempts()); Assert.assertEquals(bridgeConfig.getReconnectAttempts(), bridgeControl.getReconnectAttempts());
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), bridgeControl.isUseDuplicateDetection()); Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), bridgeControl.isUseDuplicateDetection());
Map<String, Object> bridgeMetrics = bridgeControl.getMetrics();
Assert.assertEquals(0L, bridgeControl.getMessagesPendingAcknowledgement());
Assert.assertEquals(0L, bridgeControl.getMessagesAcknowledged());
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
String[] connectorPairData = bridgeControl.getStaticConnectors(); String[] connectorPairData = bridgeControl.getStaticConnectors();
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), connectorPairData[0]); Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), connectorPairData[0]);

View File

@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
import org.apache.activemq.artemis.utils.RandomUtil; import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -69,6 +70,13 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class)); Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class));
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class)); Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class));
@SuppressWarnings("unchecked")
Map<String, Object> bridgeMetrics = (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class));
Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesAcknowledged", Long.class));
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors"); Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors");
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]); Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]);

View File

@ -16,14 +16,15 @@
*/ */
package org.apache.activemq.artemis.tests.integration.management; package org.apache.activemq.artemis.tests.integration.management;
import javax.json.JsonArray;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import javax.json.JsonArray;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration; import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.JsonUtil; import org.apache.activemq.artemis.api.core.JsonUtil;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants; import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.management.Notification; import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService; import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
@ -82,6 +84,12 @@ public class ClusterConnectionControlTest extends ManagementTestBase {
Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection()); Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().getType(), clusterConnectionControl.getMessageLoadBalancingType()); Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().getType(), clusterConnectionControl.getMessageLoadBalancingType());
Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops()); Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
Assert.assertEquals(0L, clusterConnectionControl.getMessagesPendingAcknowledgement());
Assert.assertEquals(0L, clusterConnectionControl.getMessagesAcknowledged());
Map<String, Object> clusterMetrics = clusterConnectionControl.getMetrics();
Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_ACKNOWLEDGED_KEY));
Assert.assertNull(clusterConnectionControl.getBridgeMetrics("bad"));
Object[] connectors = clusterConnectionControl.getStaticConnectors(); Object[] connectors = clusterConnectionControl.getStaticConnectors();
Assert.assertEquals(1, connectors.length); Assert.assertEquals(1, connectors.length);

View File

@ -98,6 +98,28 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
return (String) proxy.retrieveAttributeValue("nodeID"); return (String) proxy.retrieveAttributeValue("nodeID");
} }
@Override
public long getMessagesPendingAcknowledgement() {
return (Long) proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class);
}
@Override
public long getMessagesAcknowledged() {
return (Long) proxy.retrieveAttributeValue("messagesAcknowledged", Long.class);
}
@SuppressWarnings("unchecked")
@Override
public Map<String, Object> getMetrics() {
return (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
}
@SuppressWarnings("unchecked")
@Override
public Map<String, Object> getBridgeMetrics(String nodeId) throws Exception {
return (Map<String, Object>) proxy.invokeOperation("getBridgeMetrics", nodeId);
}
@Override @Override
public boolean isStarted() { public boolean isStarted() {
return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class); return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class);