ARTEMIS-2003 - Add bridge metrics
This commit adds support for tracking metrics for bridges for both normal bridges and bridges that are part of a cluster. The two statistics added in this commit are messages pending acknowledgement and messages acknowledged but more can be added later.
This commit is contained in:
parent
f980c349a8
commit
e629ac4538
|
@ -106,4 +106,33 @@ public interface BridgeControl extends ActiveMQComponentControl {
|
|||
*/
|
||||
@Attribute(desc = "whether this bridge is using high availability")
|
||||
boolean isHA();
|
||||
|
||||
/**
|
||||
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but
|
||||
* is waiting acknowledgement from the other broker. This is a cumulative total and the number of outstanding
|
||||
* pending messages can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
|
||||
*
|
||||
*/
|
||||
@Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker.")
|
||||
long getMessagesPendingAcknowledgement();
|
||||
|
||||
/**
|
||||
* The messagesAcknowledged counter is the number of messages actually received by the remote broker.
|
||||
* This is a cumulative total and the number of outstanding pending messages can be computed by subtracting
|
||||
* messagesAcknowledged from messagesPendingAcknowledgement.
|
||||
*
|
||||
*/
|
||||
@Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
|
||||
long getMessagesAcknowledged();
|
||||
|
||||
/**
|
||||
* The bridge metrics for this bridge
|
||||
*
|
||||
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker.
|
||||
* The messagesAcknowledged counter is the number of messages actually received by the remote broker.
|
||||
*
|
||||
*/
|
||||
@Attribute(desc = "The metrics for this bridge. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the remote broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker.")
|
||||
Map<String, Object> getMetrics();
|
||||
|
||||
}
|
||||
|
|
|
@ -96,4 +96,52 @@ public interface ClusterConnectionControl extends ActiveMQComponentControl {
|
|||
*/
|
||||
@Attribute(desc = "map of the nodes connected to this cluster connection (keys are node IDs, values are the addresses used to connect to the nodes)")
|
||||
Map<String, String> getNodes() throws Exception;
|
||||
|
||||
/**
|
||||
* The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
|
||||
* forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)
|
||||
*
|
||||
* This is a cumulative total and the number of outstanding pending messages for the cluster connection
|
||||
* can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
|
||||
*
|
||||
*/
|
||||
@Attribute(desc = "The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. (aggregate over all bridges)")
|
||||
long getMessagesPendingAcknowledgement();
|
||||
|
||||
/**
|
||||
* The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
|
||||
* bridges in this cluster connection
|
||||
*
|
||||
* This is a cumulative total and the number of outstanding pending messages for the cluster connection
|
||||
* can be computed by subtracting messagesAcknowledged from messagesPendingAcknowledgement.
|
||||
*
|
||||
*/
|
||||
@Attribute(desc = "The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
|
||||
long getMessagesAcknowledged();
|
||||
|
||||
/**
|
||||
* The current metrics for this cluster connection (aggregate over all bridges to other nodes)
|
||||
*
|
||||
* The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has
|
||||
* forwarded a message and is waiting acknowledgement from the other broker.
|
||||
*
|
||||
* The messagesAcknowledged counter is the number of messages actually received by a remote broker for all
|
||||
* bridges in this cluster connection
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
@Attribute(desc = "The metrics for this cluster connection. The messagesPendingAcknowledgement counter is incremented when any bridge in the cluster connection has forwarded a message and is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by a remote broker for all bridges in this cluster connection")
|
||||
Map<String, Object> getMetrics();
|
||||
|
||||
/**
|
||||
* The bridge metrics for the given node in the cluster connection
|
||||
*
|
||||
* The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker.
|
||||
* The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Attribute(desc = "The metrics for the bridge by nodeId. The messagesPendingAcknowledgement counter is incremented when the bridge is has forwarded a message but is waiting acknowledgement from the other broker. The messagesAcknowledged counter is the number of messages actually received by the remote broker for this bridge.")
|
||||
Map<String, Object> getBridgeMetrics(String nodeId) throws Exception;
|
||||
|
||||
}
|
||||
|
|
|
@ -16,11 +16,12 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.management.impl;
|
||||
|
||||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.management.BridgeControl;
|
||||
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
|
||||
|
@ -228,6 +229,36 @@ public class BridgeControlImpl extends AbstractControl implements BridgeControl
|
|||
return MBeanInfoHelper.getMBeanAttributesInfo(BridgeControl.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesPendingAcknowledgement() {
|
||||
clearIO();
|
||||
try {
|
||||
return bridge.getMetrics().getMessagesPendingAcknowledgement();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesAcknowledged() {
|
||||
clearIO();
|
||||
try {
|
||||
return bridge.getMetrics().getMessagesAcknowledged();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getMetrics() {
|
||||
clearIO();
|
||||
try {
|
||||
return bridge.getMetrics().convertToMap();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
|
|
@ -16,16 +16,18 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.core.management.impl;
|
||||
|
||||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.management.MBeanAttributeInfo;
|
||||
import javax.management.MBeanOperationInfo;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
|
||||
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
|
||||
public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl {
|
||||
|
||||
|
@ -223,6 +225,48 @@ public class ClusterConnectionControlImpl extends AbstractControl implements Clu
|
|||
return MBeanInfoHelper.getMBeanAttributesInfo(ClusterConnectionControl.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesPendingAcknowledgement() {
|
||||
clearIO();
|
||||
try {
|
||||
return clusterConnection.getMetrics().getMessagesPendingAcknowledgement();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesAcknowledged() {
|
||||
clearIO();
|
||||
try {
|
||||
return clusterConnection.getMetrics().getMessagesAcknowledged();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getMetrics() {
|
||||
clearIO();
|
||||
try {
|
||||
return clusterConnection.getMetrics().convertToMap();
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, Object> getBridgeMetrics(String nodeId) {
|
||||
clearIO();
|
||||
try {
|
||||
final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(nodeId);
|
||||
return bridgeMetrics != null ? bridgeMetrics.convertToMap() : null;
|
||||
} finally {
|
||||
blockOnIO();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
||||
// Package protected ---------------------------------------------
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.Consumer;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
|
||||
|
@ -52,4 +53,6 @@ public interface Bridge extends Consumer, ActiveMQComponent {
|
|||
void disconnect();
|
||||
|
||||
boolean isConnected();
|
||||
|
||||
BridgeMetrics getMetrics();
|
||||
}
|
||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
|
|||
import org.apache.activemq.artemis.core.client.impl.Topology;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
|
||||
|
||||
public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyListener {
|
||||
|
||||
|
@ -80,4 +82,18 @@ public interface ClusterConnection extends ActiveMQComponent, ClusterTopologyLis
|
|||
|
||||
long getCallTimeout();
|
||||
|
||||
/**
|
||||
* The metric for this cluster connection
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
ClusterConnectionMetrics getMetrics();
|
||||
|
||||
/**
|
||||
* Returns the BridgeMetrics for the bridge to the given node if exists
|
||||
*
|
||||
* @param nodeId
|
||||
* @return
|
||||
*/
|
||||
BridgeMetrics getBridgeMetrics(String nodeId);
|
||||
}
|
||||
|
|
|
@ -55,10 +55,10 @@ import org.apache.activemq.artemis.core.server.LargeServerMessage;
|
|||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.Queue;
|
||||
import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.impl.QueueImpl;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.core.server.management.NotificationService;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.EmbedMessageUtil;
|
||||
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
|
||||
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
|
||||
|
@ -159,6 +159,8 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
|
||||
private ActiveMQServer server;
|
||||
|
||||
private final BridgeMetrics metrics = new BridgeMetrics();
|
||||
|
||||
public BridgeImpl(final ServerLocatorInternal serverLocator,
|
||||
final int initialConnectAttempts,
|
||||
final int reconnectAttempts,
|
||||
|
@ -518,6 +520,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
}
|
||||
ref.getQueue().acknowledge(ref);
|
||||
pendingAcks.countDown();
|
||||
metrics.incrementMessagesAcknowledged();
|
||||
} else {
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.trace("BridgeImpl::sendAcknowledged bridge " + this + " could not find reference for message " + message);
|
||||
|
@ -611,13 +614,21 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
pendingAcks.countUp();
|
||||
|
||||
try {
|
||||
final HandleStatus status;
|
||||
if (message.isLargeMessage()) {
|
||||
deliveringLargeMessage = true;
|
||||
deliverLargeMessage(dest, ref, (LargeServerMessage) message);
|
||||
return HandleStatus.HANDLED;
|
||||
status = HandleStatus.HANDLED;
|
||||
} else {
|
||||
return deliverStandardMessage(dest, ref, message);
|
||||
status = deliverStandardMessage(dest, ref, message);
|
||||
}
|
||||
|
||||
//Only increment messages pending acknowledgement if handled by bridge
|
||||
if (status == HandleStatus.HANDLED) {
|
||||
metrics.incrementMessagesPendingAcknowledgement();
|
||||
}
|
||||
|
||||
return status;
|
||||
} catch (Exception e) {
|
||||
// If an exception happened, we must count down immediately
|
||||
pendingAcks.countDown();
|
||||
|
@ -770,6 +781,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
|
|||
return this.targetNode;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BridgeMetrics getMetrics() {
|
||||
return this.metrics;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return this.getClass().getSimpleName() + "@" +
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.server.cluster.impl;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
|
||||
|
||||
public class BridgeMetrics {
|
||||
|
||||
public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
|
||||
public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
|
||||
|
||||
private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER =
|
||||
AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY);
|
||||
|
||||
private static final AtomicLongFieldUpdater<BridgeMetrics> MESSAGES_ACKNOWLEDGED_UPDATER =
|
||||
AtomicLongFieldUpdater.newUpdater(BridgeMetrics.class, MESSAGES_ACKNOWLEDGED_KEY);
|
||||
|
||||
private volatile long messagesPendingAcknowledgement;
|
||||
private volatile long messagesAcknowledged;
|
||||
|
||||
public void incrementMessagesPendingAcknowledgement() {
|
||||
MESSAGES_PENDING_ACKNOWLEDGEMENT_UPDATER.incrementAndGet(this);
|
||||
}
|
||||
|
||||
public void incrementMessagesAcknowledged() {
|
||||
MESSAGES_ACKNOWLEDGED_UPDATER.incrementAndGet(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the messagesPendingAcknowledgement
|
||||
*/
|
||||
public long getMessagesPendingAcknowledgement() {
|
||||
return messagesPendingAcknowledgement;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the messagesAcknowledged
|
||||
*/
|
||||
public long getMessagesAcknowledged() {
|
||||
return messagesAcknowledged;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return New map containing the Bridge metrics
|
||||
*/
|
||||
public Map<String, Object> convertToMap() {
|
||||
final Map<String, Object> metrics = new HashMap<>();
|
||||
metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
|
||||
metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
|
||||
|
||||
return metrics;
|
||||
}
|
||||
}
|
|
@ -749,6 +749,26 @@ public final class ClusterConnectionImpl implements ClusterConnection, AfterConn
|
|||
topology.updateAsLive(nodeID, localMember);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public ClusterConnectionMetrics getMetrics() {
|
||||
long messagesPendingAcknowledgement = 0;
|
||||
long messagesAcknowledged = 0;
|
||||
for (MessageFlowRecord record : records.values()) {
|
||||
final BridgeMetrics metrics = record.getBridge() != null ? record.getBridge().getMetrics() : null;
|
||||
messagesPendingAcknowledgement += metrics != null ? metrics.getMessagesPendingAcknowledgement() : 0;
|
||||
messagesAcknowledged += metrics != null ? metrics.getMessagesAcknowledged() : 0;
|
||||
}
|
||||
|
||||
return new ClusterConnectionMetrics(messagesPendingAcknowledgement, messagesAcknowledged);
|
||||
}
|
||||
|
||||
@Override
|
||||
public BridgeMetrics getBridgeMetrics(String nodeId) {
|
||||
final MessageFlowRecord record = records.get(nodeId);
|
||||
return record != null && record.getBridge() != null ? record.getBridge().getMetrics() : null;
|
||||
}
|
||||
|
||||
private void createNewRecord(final long eventUID,
|
||||
final String targetNodeID,
|
||||
final TransportConfiguration connector,
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.artemis.core.server.cluster.impl;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
public class ClusterConnectionMetrics {
|
||||
|
||||
public static final String MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY = "messagesPendingAcknowledgement";
|
||||
public static final String MESSAGES_ACKNOWLEDGED_KEY = "messagesAcknowledged";
|
||||
|
||||
private final long messagesPendingAcknowledgement;
|
||||
private final long messagesAcknowledged;
|
||||
|
||||
/**
|
||||
* @param messagesPendingAcknowledgement
|
||||
* @param messagesAcknowledged
|
||||
*/
|
||||
public ClusterConnectionMetrics(long messagesPendingAcknowledgement, long messagesAcknowledged) {
|
||||
super();
|
||||
this.messagesPendingAcknowledgement = messagesPendingAcknowledgement;
|
||||
this.messagesAcknowledged = messagesAcknowledged;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the messagesPendingAcknowledgement
|
||||
*/
|
||||
public long getMessagesPendingAcknowledgement() {
|
||||
return messagesPendingAcknowledgement;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the messagesAcknowledged
|
||||
*/
|
||||
public long getMessagesAcknowledged() {
|
||||
return messagesAcknowledged;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return New map containing the Cluster Connection metrics
|
||||
*/
|
||||
public Map<String, Object> convertToMap() {
|
||||
final Map<String, Object> metrics = new HashMap<>();
|
||||
metrics.put(MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY, messagesPendingAcknowledgement);
|
||||
metrics.put(MESSAGES_ACKNOWLEDGED_KEY, messagesAcknowledged);
|
||||
|
||||
return metrics;
|
||||
}
|
||||
}
|
|
@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.server.cluster.Bridge;
|
|||
import org.apache.activemq.artemis.core.server.transformer.AddHeadersTransformer;
|
||||
import org.apache.activemq.artemis.core.server.transformer.Transformer;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
|
||||
import org.apache.activemq.artemis.core.server.impl.ServiceRegistryImpl;
|
||||
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
|
||||
|
@ -502,6 +503,11 @@ public class BridgeTest extends ActiveMQTestBase {
|
|||
|
||||
sf1.close();
|
||||
|
||||
assertEquals(1, server0.getClusterManager().getBridges().size());
|
||||
BridgeMetrics bridgeMetrics = server0.getClusterManager().getBridges().get("bridge1").getMetrics();
|
||||
assertEquals(10, bridgeMetrics.getMessagesPendingAcknowledgement());
|
||||
assertEquals(10, bridgeMetrics.getMessagesAcknowledged());
|
||||
|
||||
closeFields();
|
||||
if (server0.getConfiguration().isPersistenceEnabled()) {
|
||||
assertEquals(0, loadQueues(server0).size());
|
||||
|
|
|
@ -75,7 +75,9 @@ import org.apache.activemq.artemis.core.server.cluster.ActiveMQServerSideProtoco
|
|||
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
|
||||
import org.apache.activemq.artemis.core.server.cluster.ClusterManager;
|
||||
import org.apache.activemq.artemis.core.server.cluster.RemoteQueueBinding;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.cluster.qourum.SharedNothingBackupQuorum;
|
||||
import org.apache.activemq.artemis.core.server.group.GroupingHandler;
|
||||
|
@ -1286,6 +1288,22 @@ public abstract class ClusterTestBase extends ActiveMQTestBase {
|
|||
verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
|
||||
}
|
||||
|
||||
protected void verifyClusterMetrics(final int node, final String clusterName, final long expectedMessagesPendingAcknowledgement,
|
||||
final long expectedMessagesAcknowledged) {
|
||||
final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
|
||||
final ClusterConnectionMetrics clusterMetrics = clusterConnection.getMetrics();
|
||||
assertEquals(expectedMessagesPendingAcknowledgement, clusterMetrics.getMessagesPendingAcknowledgement());
|
||||
assertEquals(expectedMessagesAcknowledged, clusterMetrics.getMessagesAcknowledged());
|
||||
}
|
||||
|
||||
protected void verifyBridgeMetrics(final int node, final String clusterName, final String bridgeNodeId,
|
||||
final long expectedMessagesPendingAcknowledgement, final long expectedMessagesAcknowledged) {
|
||||
final ClusterConnection clusterConnection = servers[node].getClusterManager().getClusterConnection(clusterName);
|
||||
final BridgeMetrics bridgeMetrics = clusterConnection.getBridgeMetrics(bridgeNodeId);
|
||||
assertEquals(expectedMessagesPendingAcknowledgement, bridgeMetrics.getMessagesPendingAcknowledgement());
|
||||
assertEquals(expectedMessagesAcknowledged, bridgeMetrics.getMessagesAcknowledged());
|
||||
}
|
||||
|
||||
protected int[] getReceivedOrder(final int consumerID) throws Exception {
|
||||
return getReceivedOrder(consumerID, false);
|
||||
}
|
||||
|
|
|
@ -75,6 +75,15 @@ public class OneWayChainClusterTest extends ClusterTestBase {
|
|||
send(0, "queues.testaddress", 10, false, null);
|
||||
verifyReceiveRoundRobin(10, 0, 1);
|
||||
verifyNotReceive(0, 1);
|
||||
|
||||
//half of the messages should be sent over bridges to the last broker in the chain
|
||||
//as there is a consumer on that last broker
|
||||
verifyClusterMetrics(0, "cluster0-1", 5, 5);
|
||||
verifyClusterMetrics(1, "cluster1-2", 5, 5);
|
||||
verifyClusterMetrics(2, "cluster2-3", 5, 5);
|
||||
verifyClusterMetrics(3, "cluster3-4", 5, 5);
|
||||
verifyClusterMetrics(4, "cluster4-X", 0, 0);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -198,6 +198,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
|
|||
|
||||
addConsumer(1, 0, "queue0", null);
|
||||
verifyNotReceive(1);
|
||||
|
||||
//Should be 0 as no messages were sent to the second broker
|
||||
verifyClusterMetrics(0, "cluster1", 0, 0);
|
||||
|
||||
//Should be 0 as no messages were sent to the first broker
|
||||
verifyClusterMetrics(1, "clusterX", 0, 0);
|
||||
|
||||
//0 messages were sent across the bridge to the second broker
|
||||
verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 0, 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -224,6 +233,15 @@ public class OnewayTwoNodeClusterTest extends ClusterTestBase {
|
|||
send(0, "queues.testaddress", 10, false, null);
|
||||
verifyReceiveRoundRobin(10, 0, 1);
|
||||
verifyNotReceive(0, 1);
|
||||
|
||||
//half of the messages should be sent over bridge, other half was consumed by local consumer
|
||||
verifyClusterMetrics(0, "cluster1", 5, 5);
|
||||
|
||||
//Should be 0 as no messages were sent to the first broker
|
||||
verifyClusterMetrics(1, "clusterX", 0, 0);
|
||||
|
||||
//5 messages were sent across the bridge to the second broker
|
||||
verifyBridgeMetrics(0, "cluster1", servers[1].getClusterManager().getNodeId(), 5, 5);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
|||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
|
@ -64,6 +65,11 @@ public class BridgeControlTest extends ManagementTestBase {
|
|||
Assert.assertEquals(bridgeConfig.getRetryIntervalMultiplier(), bridgeControl.getRetryIntervalMultiplier(), 0.000001);
|
||||
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), bridgeControl.getReconnectAttempts());
|
||||
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), bridgeControl.isUseDuplicateDetection());
|
||||
Map<String, Object> bridgeMetrics = bridgeControl.getMetrics();
|
||||
Assert.assertEquals(0L, bridgeControl.getMessagesPendingAcknowledgement());
|
||||
Assert.assertEquals(0L, bridgeControl.getMessagesAcknowledged());
|
||||
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
|
||||
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
|
||||
|
||||
String[] connectorPairData = bridgeControl.getStaticConnectors();
|
||||
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), connectorPairData[0]);
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
|||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeMetrics;
|
||||
import org.apache.activemq.artemis.utils.RandomUtil;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
|
@ -69,6 +70,13 @@ public class BridgeControlUsingCoreTest extends ManagementTestBase {
|
|||
Assert.assertEquals(bridgeConfig.getReconnectAttempts(), proxy.retrieveAttributeValue("reconnectAttempts", Integer.class));
|
||||
Assert.assertEquals(bridgeConfig.isUseDuplicateDetection(), proxy.retrieveAttributeValue("useDuplicateDetection", Boolean.class));
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> bridgeMetrics = (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
|
||||
Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class));
|
||||
Assert.assertEquals(0L, proxy.retrieveAttributeValue("messagesAcknowledged", Long.class));
|
||||
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
|
||||
Assert.assertEquals(0L, bridgeMetrics.get(BridgeMetrics.MESSAGES_ACKNOWLEDGED_KEY));
|
||||
|
||||
Object[] data = (Object[]) proxy.retrieveAttributeValue("staticConnectors");
|
||||
Assert.assertEquals(bridgeConfig.getStaticConnectors().get(0), data[0]);
|
||||
|
||||
|
|
|
@ -16,14 +16,15 @@
|
|||
*/
|
||||
package org.apache.activemq.artemis.tests.integration.management;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerFactory;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.json.JsonArray;
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.MBeanServerFactory;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
|
||||
import org.apache.activemq.artemis.api.core.JsonUtil;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
|
@ -40,6 +41,7 @@ import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnectorFactory;
|
|||
import org.apache.activemq.artemis.core.remoting.impl.invm.TransportConstants;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServers;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionMetrics;
|
||||
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
|
||||
import org.apache.activemq.artemis.core.server.management.Notification;
|
||||
import org.apache.activemq.artemis.tests.integration.SimpleNotificationService;
|
||||
|
@ -82,6 +84,12 @@ public class ClusterConnectionControlTest extends ManagementTestBase {
|
|||
Assert.assertEquals(clusterConnectionConfig1.isDuplicateDetection(), clusterConnectionControl.isDuplicateDetection());
|
||||
Assert.assertEquals(clusterConnectionConfig1.getMessageLoadBalancingType().getType(), clusterConnectionControl.getMessageLoadBalancingType());
|
||||
Assert.assertEquals(clusterConnectionConfig1.getMaxHops(), clusterConnectionControl.getMaxHops());
|
||||
Assert.assertEquals(0L, clusterConnectionControl.getMessagesPendingAcknowledgement());
|
||||
Assert.assertEquals(0L, clusterConnectionControl.getMessagesAcknowledged());
|
||||
Map<String, Object> clusterMetrics = clusterConnectionControl.getMetrics();
|
||||
Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_PENDING_ACKNOWLEDGEMENT_KEY));
|
||||
Assert.assertEquals(0L, clusterMetrics.get(ClusterConnectionMetrics.MESSAGES_ACKNOWLEDGED_KEY));
|
||||
Assert.assertNull(clusterConnectionControl.getBridgeMetrics("bad"));
|
||||
|
||||
Object[] connectors = clusterConnectionControl.getStaticConnectors();
|
||||
Assert.assertEquals(1, connectors.length);
|
||||
|
|
|
@ -98,6 +98,28 @@ public class ClusterConnectionControlUsingCoreTest extends ClusterConnectionCont
|
|||
return (String) proxy.retrieveAttributeValue("nodeID");
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesPendingAcknowledgement() {
|
||||
return (Long) proxy.retrieveAttributeValue("messagesPendingAcknowledgement", Long.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMessagesAcknowledged() {
|
||||
return (Long) proxy.retrieveAttributeValue("messagesAcknowledged", Long.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Map<String, Object> getMetrics() {
|
||||
return (Map<String, Object>) proxy.retrieveAttributeValue("metrics", Map.class);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public Map<String, Object> getBridgeMetrics(String nodeId) throws Exception {
|
||||
return (Map<String, Object>) proxy.invokeOperation("getBridgeMetrics", nodeId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isStarted() {
|
||||
return (Boolean) proxy.retrieveAttributeValue("started", Boolean.class);
|
||||
|
|
Loading…
Reference in New Issue