mirror of
https://github.com/apache/activemq.git
synced 2025-02-09 03:25:33 +00:00
Adding NetworkBridgeStatistics and also a received count for bridges when they are in duplex mode. (cherry picked from commit 10c998b0bc9728276a738ed24d20c6fc82c6365a)
This commit is contained in:
parent
1d9fdbcbea
commit
087e5da254
@ -31,43 +31,58 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
|
|||||||
this.bridge = bridge;
|
this.bridge = bridge;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
bridge.start();
|
bridge.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
bridge.stop();
|
bridge.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getLocalAddress() {
|
public String getLocalAddress() {
|
||||||
return bridge.getLocalAddress();
|
return bridge.getLocalAddress();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getRemoteAddress() {
|
public String getRemoteAddress() {
|
||||||
return bridge.getRemoteAddress();
|
return bridge.getRemoteAddress();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getRemoteBrokerName() {
|
public String getRemoteBrokerName() {
|
||||||
return bridge.getRemoteBrokerName();
|
return bridge.getRemoteBrokerName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getRemoteBrokerId() {
|
public String getRemoteBrokerId() {
|
||||||
return bridge.getRemoteBrokerId();
|
return bridge.getRemoteBrokerId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public String getLocalBrokerName() {
|
public String getLocalBrokerName() {
|
||||||
return bridge.getLocalBrokerName();
|
return bridge.getLocalBrokerName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getEnqueueCounter() {
|
public long getEnqueueCounter() {
|
||||||
return bridge.getEnqueueCounter();
|
return bridge.getEnqueueCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public long getDequeueCounter() {
|
public long getDequeueCounter() {
|
||||||
return bridge.getDequeueCounter();
|
return bridge.getDequeueCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getReceivedCounter() {
|
||||||
|
return bridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isCreatedByDuplex() {
|
public boolean isCreatedByDuplex() {
|
||||||
return createByDuplex;
|
return createByDuplex;
|
||||||
}
|
}
|
||||||
@ -76,6 +91,7 @@ public class NetworkBridgeView implements NetworkBridgeViewMBean {
|
|||||||
this.createByDuplex = createByDuplex;
|
this.createByDuplex = createByDuplex;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void resetStats(){
|
public void resetStats(){
|
||||||
bridge.resetStats();
|
bridge.resetStats();
|
||||||
for (NetworkDestinationView networkDestinationView:networkDestinationViewList){
|
for (NetworkDestinationView networkDestinationView:networkDestinationViewList){
|
||||||
|
@ -34,6 +34,8 @@ public interface NetworkBridgeViewMBean extends Service {
|
|||||||
|
|
||||||
long getDequeueCounter();
|
long getDequeueCounter();
|
||||||
|
|
||||||
|
long getReceivedCounter();
|
||||||
|
|
||||||
boolean isCreatedByDuplex();
|
boolean isCreatedByDuplex();
|
||||||
|
|
||||||
void resetStats();
|
void resetStats();
|
||||||
|
@ -34,7 +34,6 @@ import java.util.concurrent.Future;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
|
||||||
|
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
|
||||||
@ -139,8 +138,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
|
protected final BrokerId remoteBrokerPath[] = new BrokerId[]{null};
|
||||||
protected BrokerId remoteBrokerId;
|
protected BrokerId remoteBrokerId;
|
||||||
|
|
||||||
final AtomicLong enqueueCounter = new AtomicLong();
|
protected final NetworkBridgeStatistics networkBridgeStatistics = new NetworkBridgeStatistics();
|
||||||
final AtomicLong dequeueCounter = new AtomicLong();
|
|
||||||
|
|
||||||
private NetworkBridgeListener networkBridgeListener;
|
private NetworkBridgeListener networkBridgeListener;
|
||||||
private boolean createdByDuplex;
|
private boolean createdByDuplex;
|
||||||
@ -181,6 +179,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
throw new IllegalArgumentException("BrokerService is null on " + this);
|
throw new IllegalArgumentException("BrokerService is null on " + this);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
networkBridgeStatistics.setEnabled(brokerService.isEnableStatistics());
|
||||||
|
|
||||||
if (isDuplex()) {
|
if (isDuplex()) {
|
||||||
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
|
duplexInboundLocalBroker = NetworkBridgeFactory.createLocalTransport(brokerService.getBroker());
|
||||||
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
|
duplexInboundLocalBroker.setTransportListener(new DefaultTransportListener() {
|
||||||
@ -640,6 +640,8 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
Response reply = resp.getResult();
|
Response reply = resp.getResult();
|
||||||
reply.setCorrelationId(correlationId);
|
reply.setCorrelationId(correlationId);
|
||||||
remoteBroker.oneway(reply);
|
remoteBroker.oneway(reply);
|
||||||
|
//increment counter when messages are received in duplex mode
|
||||||
|
networkBridgeStatistics.getReceivedCount().increment();
|
||||||
} catch (IOException error) {
|
} catch (IOException error) {
|
||||||
LOG.error("Exception: {} on duplex forward of: {}", error, message);
|
LOG.error("Exception: {} on duplex forward of: {}", error, message);
|
||||||
serviceRemoteException(error);
|
serviceRemoteException(error);
|
||||||
@ -648,6 +650,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
duplexInboundLocalBroker.oneway(message);
|
duplexInboundLocalBroker.oneway(message);
|
||||||
|
networkBridgeStatistics.getReceivedCount().increment();
|
||||||
}
|
}
|
||||||
serviceInboundMessage(message);
|
serviceInboundMessage(message);
|
||||||
} else {
|
} else {
|
||||||
@ -968,7 +971,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
try {
|
try {
|
||||||
if (command.isMessageDispatch()) {
|
if (command.isMessageDispatch()) {
|
||||||
safeWaitUntilStarted();
|
safeWaitUntilStarted();
|
||||||
enqueueCounter.incrementAndGet();
|
networkBridgeStatistics.getEnqueues().increment();
|
||||||
final MessageDispatch md = (MessageDispatch) command;
|
final MessageDispatch md = (MessageDispatch) command;
|
||||||
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
final DemandSubscription sub = subscriptionMapByLocalId.get(md.getConsumerId());
|
||||||
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
if (sub != null && md.getMessage() != null && sub.incrementOutstandingResponses()) {
|
||||||
@ -1016,7 +1019,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
serviceLocalException(md, er.getException());
|
serviceLocalException(md, er.getException());
|
||||||
} else {
|
} else {
|
||||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||||
dequeueCounter.incrementAndGet();
|
networkBridgeStatistics.getDequeues().increment();
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
serviceLocalException(md, e);
|
serviceLocalException(md, e);
|
||||||
@ -1033,7 +1036,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
try {
|
try {
|
||||||
remoteBroker.oneway(message);
|
remoteBroker.oneway(message);
|
||||||
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
localBroker.oneway(new MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
|
||||||
dequeueCounter.incrementAndGet();
|
networkBridgeStatistics.getDequeues().increment();
|
||||||
} finally {
|
} finally {
|
||||||
sub.decrementOutstandingResponses();
|
sub.decrementOutstandingResponses();
|
||||||
}
|
}
|
||||||
@ -1559,12 +1562,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getDequeueCounter() {
|
public long getDequeueCounter() {
|
||||||
return dequeueCounter.get();
|
return networkBridgeStatistics.getDequeues().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getEnqueueCounter() {
|
public long getEnqueueCounter() {
|
||||||
return enqueueCounter.get();
|
return networkBridgeStatistics.getEnqueues().getCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
|
||||||
|
return networkBridgeStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean isDuplex() {
|
protected boolean isDuplex() {
|
||||||
@ -1594,8 +1602,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void resetStats() {
|
public void resetStats() {
|
||||||
enqueueCounter.set(0);
|
networkBridgeStatistics.reset();
|
||||||
dequeueCounter.set(0);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -78,6 +78,11 @@ public interface NetworkBridge extends Service {
|
|||||||
*/
|
*/
|
||||||
long getDequeueCounter();
|
long getDequeueCounter();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the statistics for this NetworkBridge
|
||||||
|
*/
|
||||||
|
NetworkBridgeStatistics getNetworkBridgeStatistics();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param objectName
|
* @param objectName
|
||||||
* The ObjectName assigned to this bridge in the MBean server.
|
* The ObjectName assigned to this bridge in the MBean server.
|
||||||
|
@ -0,0 +1,102 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.network;
|
||||||
|
|
||||||
|
import org.apache.activemq.management.CountStatisticImpl;
|
||||||
|
import org.apache.activemq.management.StatsImpl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The Statistics for a NewtorkBridge.
|
||||||
|
*/
|
||||||
|
public class NetworkBridgeStatistics extends StatsImpl {
|
||||||
|
|
||||||
|
protected CountStatisticImpl enqueues;
|
||||||
|
protected CountStatisticImpl dequeues;
|
||||||
|
protected CountStatisticImpl receivedCount;
|
||||||
|
|
||||||
|
public NetworkBridgeStatistics() {
|
||||||
|
enqueues = new CountStatisticImpl("enqueues", "The current number of enqueues this bridge has, which is the number of potential messages to be forwarded.");
|
||||||
|
dequeues = new CountStatisticImpl("dequeues", "The current number of dequeues this bridge has, which is the number of messages received by the remote broker.");
|
||||||
|
receivedCount = new CountStatisticImpl("receivedCount", "The number of messages that have been received by the NetworkBridge from the remote broker. Only applies for Duplex bridges.");
|
||||||
|
|
||||||
|
addStatistic("enqueues", enqueues);
|
||||||
|
addStatistic("dequeues", dequeues);
|
||||||
|
addStatistic("receivedCount", receivedCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current number of enqueues this bridge has, which is the number of potential messages to be forwarded
|
||||||
|
* Messages may not be forwarded if there is no subscription
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public CountStatisticImpl getEnqueues() {
|
||||||
|
return enqueues;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The current number of dequeues this bridge has, which is the number of
|
||||||
|
* messages actually sent to and received by the remote broker.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public CountStatisticImpl getDequeues() {
|
||||||
|
return dequeues;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The number of messages that have been received by the NetworkBridge from the remote broker.
|
||||||
|
* Only applies for Duplex bridges.
|
||||||
|
*
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
public CountStatisticImpl getReceivedCount() {
|
||||||
|
return receivedCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void reset() {
|
||||||
|
if (this.isDoReset()) {
|
||||||
|
super.reset();
|
||||||
|
enqueues.reset();
|
||||||
|
dequeues.reset();
|
||||||
|
receivedCount.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setEnabled(boolean enabled) {
|
||||||
|
super.setEnabled(enabled);
|
||||||
|
enqueues.setEnabled(enabled);
|
||||||
|
dequeues.setEnabled(enabled);
|
||||||
|
receivedCount.setEnabled(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParent(NetworkBridgeStatistics parent) {
|
||||||
|
if (parent != null) {
|
||||||
|
enqueues.setParent(parent.enqueues);
|
||||||
|
dequeues.setParent(parent.dequeues);
|
||||||
|
receivedCount.setParent(parent.receivedCount);
|
||||||
|
} else {
|
||||||
|
enqueues.setParent(null);
|
||||||
|
dequeues.setParent(null);
|
||||||
|
receivedCount.setParent(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -35,6 +35,7 @@ import org.apache.activemq.command.DiscoveryEvent;
|
|||||||
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
import org.apache.activemq.network.DiscoveryNetworkConnector;
|
||||||
import org.apache.activemq.network.NetworkBridge;
|
import org.apache.activemq.network.NetworkBridge;
|
||||||
import org.apache.activemq.network.NetworkBridgeListener;
|
import org.apache.activemq.network.NetworkBridgeListener;
|
||||||
|
import org.apache.activemq.network.NetworkBridgeStatistics;
|
||||||
import org.apache.activemq.network.NetworkConnector;
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||||
import org.apache.activemq.transport.Transport;
|
import org.apache.activemq.transport.Transport;
|
||||||
@ -56,6 +57,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
|||||||
* Since these tests involve wait conditions, protect against indefinite
|
* Since these tests involve wait conditions, protect against indefinite
|
||||||
* waits (due to unanticipated issues).
|
* waits (due to unanticipated issues).
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
setAutoFail(true);
|
setAutoFail(true);
|
||||||
setMaxTestTime(MAX_TEST_TIME);
|
setMaxTestTime(MAX_TEST_TIME);
|
||||||
@ -327,6 +329,11 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
|||||||
return next.getDequeueCounter();
|
return next.getDequeueCounter();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public NetworkBridgeStatistics getNetworkBridgeStatistics() {
|
||||||
|
return next.getNetworkBridgeStatistics();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setMbeanObjectName(ObjectName objectName) {
|
public void setMbeanObjectName(ObjectName objectName) {
|
||||||
next.setMbeanObjectName(objectName);
|
next.setMbeanObjectName(objectName);
|
||||||
@ -337,6 +344,7 @@ public class AMQ4160Test extends JmsMultipleBrokersTestSupport {
|
|||||||
return next.getMbeanObjectName();
|
return next.getMbeanObjectName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void resetStats(){
|
public void resetStats(){
|
||||||
next.resetStats();
|
next.resetStats();
|
||||||
}
|
}
|
||||||
|
@ -86,4 +86,19 @@ public class DuplexNetworkTest extends SimpleNetworkTest {
|
|||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
|
||||||
|
|
||||||
|
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
|
||||||
|
expectedRemoteSent == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -96,6 +96,8 @@ public class SimpleNetworkTest {
|
|||||||
}
|
}
|
||||||
// ensure no more messages received
|
// ensure no more messages received
|
||||||
assertNull(consumer1.receive(1000));
|
assertNull(consumer1.receive(1000));
|
||||||
|
|
||||||
|
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
@ -128,6 +130,8 @@ public class SimpleNetworkTest {
|
|||||||
assertNotNull(result);
|
assertNotNull(result);
|
||||||
LOG.info(result.getText());
|
LOG.info(result.getText());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assertNetworkBridgeStatistics(MESSAGE_COUNT, MESSAGE_COUNT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
@ -136,13 +140,15 @@ public class SimpleNetworkTest {
|
|||||||
MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
|
MessageConsumer excludedConsumer = remoteSession.createConsumer(excluded);
|
||||||
MessageProducer includedProducer = localSession.createProducer(included);
|
MessageProducer includedProducer = localSession.createProducer(included);
|
||||||
MessageProducer excludedProducer = localSession.createProducer(excluded);
|
MessageProducer excludedProducer = localSession.createProducer(excluded);
|
||||||
// allow for consumer infos to perculate arround
|
// allow for consumer infos to perculate around
|
||||||
Thread.sleep(2000);
|
Thread.sleep(2000);
|
||||||
Message test = localSession.createTextMessage("test");
|
Message test = localSession.createTextMessage("test");
|
||||||
includedProducer.send(test);
|
includedProducer.send(test);
|
||||||
excludedProducer.send(test);
|
excludedProducer.send(test);
|
||||||
assertNull(excludedConsumer.receive(1000));
|
assertNull(excludedConsumer.receive(1000));
|
||||||
assertNotNull(includedConsumer.receive(1000));
|
assertNotNull(includedConsumer.receive(1000));
|
||||||
|
|
||||||
|
assertNetworkBridgeStatistics(1, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 60 * 1000)
|
@Test(timeout = 60 * 1000)
|
||||||
@ -163,6 +169,8 @@ public class SimpleNetworkTest {
|
|||||||
// ensure no more messages received
|
// ensure no more messages received
|
||||||
assertNull(consumer1.receive(1000));
|
assertNull(consumer1.receive(1000));
|
||||||
assertNull(consumer2.receive(1000));
|
assertNull(consumer2.receive(1000));
|
||||||
|
|
||||||
|
assertNetworkBridgeStatistics(MESSAGE_COUNT, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
|
private void waitForConsumerRegistration(final BrokerService brokerService, final int min, final ActiveMQDestination destination) throws Exception {
|
||||||
@ -319,4 +327,20 @@ public class SimpleNetworkTest {
|
|||||||
protected BrokerService createRemoteBroker() throws Exception {
|
protected BrokerService createRemoteBroker() throws Exception {
|
||||||
return createBroker(getRemoteBrokerURI());
|
return createBroker(getRemoteBrokerURI());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
|
||||||
|
|
||||||
|
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||||
|
final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
|
||||||
|
|
||||||
|
assertTrue(Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
|
||||||
|
0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
|
||||||
|
expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
|
||||||
|
0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user