mirror of https://github.com/apache/activemq.git
Adding NetworkBridgeStatistics and also a received count for bridges when they are in duplex mode.
This commit is contained in:
parent
2b84cd60ba
commit
10c998b0bc
|
@ -31,43 +31,58 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
|
|||
this.bridge = bridge;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() throws Exception {
|
||||
bridge.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() throws Exception {
|
||||
bridge.stop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLocalAddress() {
|
||||
return bridge.getLocalAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteAddress() {
|
||||
return bridge.getRemoteAddress();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getRemoteBrokerName() {
|
||||
return bridge.getRemoteBrokerName();
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public String getRemoteBrokerId() {
|
||||
return bridge.getRemoteBrokerId();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getLocalBrokerName() {
|
||||
return bridge.getLocalBrokerName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEnqueueCounter() {
|
||||
return bridge.getEnqueueCounter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getDequeueCounter() {
|
||||
return bridge.getDequeueCounter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getReceivedCounter() {
|
||||
return bridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCreatedByDuplex() {
|
||||
return createByDuplex;
|
||||
}
|
||||
|
@ -76,6 +91,7 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
|
|||
this.createByDuplex = createByDuplex;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetStats(){
|
||||
bridge.resetStats();
|
||||
for (NetworkDestinationView networkDestinationView:networkDestinationViewList){
|
||||
|
|
|
@ -34,6 +34,8 @@ public interface NetworkBridgeViewMBean extends Service {
|
|||
|
||||
long getDequeueCounter();
|
||||
|
||||
long getReceivedCounter();
|
||||
|
||||
boolean isCreatedByDuplex();
|
||||
|
||||
void resetStats();
|
||||
|
|
|
@ -34,7 +34,6 @@ import java.util.concurrent.Future;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import javax.management.ObjectName;
|
||||
|
||||
|
@ -139,8 +138,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
|
||||
protected BrokerId remoteBrokerId;
|
||||
|
||||
final AtomicLong enqueueCounter = new AtomicLong();
|
||||
final AtomicLong dequeueCounter = new AtomicLong();
|
||||
protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
|
||||
|
||||
private NetworkBridgeListener networkBridgeListener;
|
||||
private boolean createdByDuplex;
|
||||
|
@ -181,6 +179,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
throw new IllegalArgumentException("BrokerService is null on " + this);
|
||||
}
|
||||
|
||||
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
|
||||
|
||||
if (isDuplex()) {
|
||||
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
|
||||
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
|
||||
|
@ -640,6 +640,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
Response reply = resp.getResult();
|
||||
reply.setCorrelationId(correlationId);
|
||||
remoteBroker.oneway(reply);
|
||||
//increment counter when messages are received in duplex mode
|
||||
networkBridgeStatistics.getReceivedCount().increment();
|
||||
} catch (IOException error) {
|
||||
LOG.error("Exception: {} on duplex forward of: {}", error, message);
|
||||
serviceRemoteException(error);
|
||||
|
@ -648,6 +650,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
});
|
||||
} else {
|
||||
duplexInboundLocalBroker.oneway(message);
|
||||
networkBridgeStatistics.getReceivedCount().increment();
|
||||
}
|
||||
serviceInboundMessage(message);
|
||||
} else {
|
||||
|
@ -968,7 +971,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
try {
|
||||
if (command.isMessageDispatch()) {
|
||||
safeWaitUntilStarted();
|
||||
enqueueCounter.incrementAndGet();
|
||||
networkBridgeStatistics.getEnqueues().increment();
|
||||
final MessageDispatch md = (MessageDispatch) command;
|
||||
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
||||
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
||||
|
@ -1016,7 +1019,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
serviceLocalException(md, er.getException());
|
||||
} else {
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
dequeueCounter.incrementAndGet();
|
||||
networkBridgeStatistics.getDequeues().increment();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
serviceLocalException(md, e);
|
||||
|
@ -1033,7 +1036,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
try {
|
||||
remoteBroker.oneway(message);
|
||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||
dequeueCounter.incrementAndGet();
|
||||
networkBridgeStatistics.getDequeues().increment();
|
||||
} finally {
|
||||
sub.decrementOutstandingResponses();
|
||||
}
|
||||
|
@ -1559,12 +1562,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
@Override
|
||||
public long getDequeueCounter() {
|
||||
return dequeueCounter.get();
|
||||
return networkBridgeStatistics.getDequeues().getCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getEnqueueCounter() {
|
||||
return enqueueCounter.get();
|
||||
return networkBridgeStatistics.getEnqueues().getCount();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
|
||||
return networkBridgeStatistics;
|
||||
}
|
||||
|
||||
protected boolean isDuplex() {
|
||||
|
@ -1594,8 +1602,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||
|
||||
@Override
|
||||
public void resetStats() {
|
||||
enqueueCounter.set(0);
|
||||
dequeueCounter.set(0);
|
||||
networkBridgeStatistics.reset();
|
||||
}
|
||||
|
||||
/*
|
||||
|
|
|
@ -78,6 +78,11 @@ public interface NetworkBridge extends Service {
|
|||
*/
|
||||
long getDequeueCounter();
|
||||
|
||||
/**
|
||||
* @return the statistics for this NetworkBridge
|
||||
*/
|
||||
NetworkBridgeStatistics getNetworkBridgeStatistics();
|
||||
|
||||
/**
|
||||
* @param objectName
|
||||
* The ObjectName assigned to this bridge in the MBean server.
|
||||
|
|
|
@ -0,0 +1,102 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.network;
|
||||
|
||||
import org.apache.activemq.management.CountStatisticImpl;
|
||||
import org.apache.activemq.management.StatsImpl;
|
||||
|
||||
/**
|
||||
* The Statistics for a NewtorkBridge.
|
||||
*/
|
||||
public class NetworkBridgeStatistics extends StatsImpl {
|
||||
|
||||
protected CountStatisticImpl enqueues;
|
||||
protected CountStatisticImpl dequeues;
|
||||
protected CountStatisticImpl receivedCount;
|
||||
|
||||
public NetworkBridgeStatistics() {
|
||||
enqueues = new CountStatisticImpl("enqueues", "The current number of enqueues this bridge has, which is the number of potential messages to be forwarded.");
|
||||
dequeues = new CountStatisticImpl("dequeues", "The current number of dequeues this bridge has, which is the number of messages received by the remote broker.");
|
||||
receivedCount = new CountStatisticImpl("receivedCount", "The number of messages that have been received by the NetworkBridge from the remote broker. Only applies for Duplex bridges.");
|
||||
|
||||
addStatistic("enqueues", enqueues);
|
||||
addStatistic("dequeues", dequeues);
|
||||
addStatistic("receivedCount", receivedCount);
|
||||
}
|
||||
|
||||
/**
|
||||
* The current number of enqueues this bridge has, which is the number of potential messages to be forwarded
|
||||
* Messages may not be forwarded if there is no subscription
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public CountStatisticImpl getEnqueues() {
|
||||
return enqueues;
|
||||
}
|
||||
|
||||
/**
|
||||
* The current number of dequeues this bridge has, which is the number of
|
||||
* messages actually sent to and received by the remote broker.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public CountStatisticImpl getDequeues() {
|
||||
return dequeues;
|
||||
}
|
||||
|
||||
/**
|
||||
* The number of messages that have been received by the NetworkBridge from the remote broker.
|
||||
* Only applies for Duplex bridges.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public CountStatisticImpl getReceivedCount() {
|
||||
return receivedCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void reset() {
|
||||
if (this.isDoReset()) {
|
||||
super.reset();
|
||||
enqueues.reset();
|
||||
dequeues.reset();
|
||||
receivedCount.reset();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setEnabled(boolean enabled) {
|
||||
super.setEnabled(enabled);
|
||||
enqueues.setEnabled(enabled);
|
||||
dequeues.setEnabled(enabled);
|
||||
receivedCount.setEnabled(enabled);
|
||||
}
|
||||
|
||||
public void setParent(NetworkBridgeStatistics parent) {
|
||||
if (parent != null) {
|
||||
enqueues.setParent(parent.enqueues);
|
||||
dequeues.setParent(parent.dequeues);
|
||||
receivedCount.setParent(parent.receivedCount);
|
||||
} else {
|
||||
enqueues.setParent(null);
|
||||
dequeues.setParent(null);
|
||||
receivedCount.setParent(null);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.command.DiscoveryEvent;
|
|||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||
import org.apache.activemq.network.NetworkBridge;
|
||||
import org.apache.activemq.network.NetworkBridgeListener;
|
||||
import org.apache.activemq.network.NetworkBridgeStatistics;
|
||||
import org.apache.activemq.network.NetworkConnector;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.Transport;
|
||||
|
@ -56,6 +57,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
|||
* Since these tests involve wait conditions, protect against indefinite
|
||||
* waits (due to unanticipated issues).
|
||||
*/
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
setAutoFail(true);
|
||||
setMaxTestTime(MAX_TEST_TIME);
|
||||
|
@ -327,6 +329,11 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
|||
return next.getDequeueCounter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
|
||||
return next.getNetworkBridgeStatistics();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMbeanObjectName(ObjectName objectName) {
|
||||
next.setMbeanObjectName(objectName);
|
||||
|
@ -337,6 +344,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
|||
return next.getMbeanObjectName();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void resetStats(){
|
||||
next.resetStats();
|
||||
}
|
||||
|
|
|
@ -86,4 +86,19 @@ public class DuplexNetworkTest extends SimpleNetworkTest {
|
|||
return id;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
|
||||
|
||||
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||
|
||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
|
||||
expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
|
||||
}
|
||||
}));
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -96,6 +96,8 @@ public class SimpleNetworkTest {
|
|||
}
|
||||
// ensure no more messages received
|
||||
assertNull(consumer1.receive(1000));
|
||||
|
||||
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
|
@ -128,6 +130,8 @@ public class SimpleNetworkTest {
|
|||
assertNotNull(result);
|
||||
LOG.info(result.getText());
|
||||
}
|
||||
|
||||
assertNetworkBridgeStatistics(MESSAGE_COUNT, MESSAGE_COUNT);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
|
@ -136,13 +140,15 @@ public class SimpleNetworkTest {
|
|||
MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
|
||||
MessageProducer includedProducer = localSession.createProducer(included);
|
||||
MessageProducer excludedProducer = localSession.createProducer(excluded);
|
||||
// allow for consumer infos to perculate arround
|
||||
// allow for consumer infos to perculate around
|
||||
Thread.sleep(2000);
|
||||
Message test = localSession.createTextMessage("test");
|
||||
includedProducer.send(test);
|
||||
excludedProducer.send(test);
|
||||
assertNull(excludedConsumer.receive(1000));
|
||||
assertNotNull(includedConsumer.receive(1000));
|
||||
|
||||
assertNetworkBridgeStatistics(1, 0);
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
|
@ -163,6 +169,8 @@ public class SimpleNetworkTest {
|
|||
// ensure no more messages received
|
||||
assertNull(consumer1.receive(1000));
|
||||
assertNull(consumer2.receive(1000));
|
||||
|
||||
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
|
||||
}
|
||||
|
||||
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
|
||||
|
@ -319,4 +327,20 @@ public class SimpleNetworkTest {
|
|||
protected BrokerService createRemoteBroker() throws Exception {
|
||||
return createBroker(getRemoteBrokerURI());
|
||||
}
|
||||
|
||||
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
|
||||
|
||||
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||
final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||
|
||||
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
|
||||
0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
|
||||
expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
|
||||
0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue