[AMQ-9437] AdvancedDestination statistics networkEnqueue and networkDequeue counters

This commit is contained in:
Matt Pavlovich 2024-02-23 08:51:17 -06:00
parent 216d73fcb5
commit b44074f5f7
No known key found for this signature in database
18 changed files with 501 additions and 5 deletions

View File

@ -190,9 +190,16 @@ public class TransactionBroker extends BrokerFilter {
dest.clearPendingMessages(opCount);
dest.getDestinationStatistics().getEnqueues().add(opCount);
dest.getDestinationStatistics().getMessages().add(opCount);
if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
dest.getDestinationStatistics().getNetworkEnqueues().add(opCount);
}
LOG.debug("cleared pending from afterCommit: {}", destination);
} else {
dest.getDestinationStatistics().getDequeues().add(opCount);
if(dest.isAdvancedNetworkStatisticsEnabled() && transactionBroker.context != null && transactionBroker.context.isNetworkConnection()) {
dest.getDestinationStatistics().getNetworkDequeues().add(opCount);
}
}
}
}

View File

@ -599,4 +599,25 @@ public class DestinationView implements DestinationViewMBean {
public long getMaxUncommittedExceededCount() {
return destination.getDestinationStatistics().getMaxUncommittedExceededCount().getCount();
}
@Override
public boolean isAdvancedNetworkStatisticsEnabled() {
return destination.isAdvancedNetworkStatisticsEnabled();
}
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
destination.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}
@Override
public long getNetworkEnqueues() {
return destination.getDestinationStatistics().getNetworkEnqueues().getCount();
}
@Override
public long getNetworkDequeues() {
return destination.getDestinationStatistics().getNetworkDequeues().getCount();
}
}

View File

@ -481,4 +481,16 @@ public interface DestinationViewMBean {
@MBeanInfo("Number of times the max uncommitted limit has been exceed for this destination")
long getMaxUncommittedExceededCount();
@MBeanInfo("Query Advanced Network Statistics flag")
boolean isAdvancedNetworkStatisticsEnabled();
@MBeanInfo("Toggle Advanced Network Statistics flag")
void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
@MBeanInfo("Number of messages sent to the destination via network connection")
long getNetworkEnqueues();
@MBeanInfo("Number of messages acknowledged from the destination via network connection")
long getNetworkDequeues();
}

View File

@ -110,6 +110,8 @@ public abstract class BaseDestination implements Destination {
protected final Scheduler scheduler;
private boolean disposed = false;
private boolean doOptimzeMessageStorage = true;
private boolean advancedNetworkStatisticsEnabled = false;
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@ -868,6 +870,15 @@ public abstract class BaseDestination implements Destination {
this.optimizeMessageStoreInFlightLimit = optimizeMessageStoreInFlightLimit;
}
@Override
public boolean isAdvancedNetworkStatisticsEnabled() {
return this.advancedNetworkStatisticsEnabled;
}
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}
@Override
public abstract List<Subscription> getConsumers();

View File

@ -258,4 +258,10 @@ public interface Destination extends Service, Task, Message.MessageDestination {
boolean isSendDuplicateFromStoreToDLQ();
void setSendDuplicateFromStoreToDLQ(boolean sendDuplicateFromStoreToDLQ);
// [AMQ-9437]
boolean isAdvancedNetworkStatisticsEnabled();
void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled);
}

View File

@ -409,6 +409,16 @@ public class DestinationFilter implements Destination {
next.setSendDuplicateFromStoreToDLQ(sendDuplicateFromStoreToDLQ);
}
@Override
public boolean isAdvancedNetworkStatisticsEnabled() {
return next.isAdvancedNetworkStatisticsEnabled();
}
@Override
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
next.setAdvancedNetworkStatisticsEnabled(advancedNetworkStatisticsEnabled);
}
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws Exception {
if (next instanceof DestinationFilter) {
DestinationFilter filter = (DestinationFilter) next;

View File

@ -46,6 +46,10 @@ public class DestinationStatistics extends StatsImpl {
protected SizeStatisticImpl messageSize;
protected CountStatisticImpl maxUncommittedExceededCount;
// [AMQ-9437] Advanced Statistics are optionally enabled
protected CountStatisticImpl networkEnqueues;
protected CountStatisticImpl networkDequeues;
public DestinationStatistics() {
enqueues = new CountStatisticImpl("enqueues", "The number of messages that have been sent to the destination");
@ -68,6 +72,10 @@ public class DestinationStatistics extends StatsImpl {
blockedTime = new TimeStatisticImpl("blockedTime","amount of time messages are blocked for flow control");
messageSize = new SizeStatisticImpl("messageSize","Size of messages passing through the destination");
maxUncommittedExceededCount = new CountStatisticImpl("maxUncommittedExceededCount", "number of times maxUncommittedCount has been exceeded");
networkEnqueues = new CountStatisticImpl("networkEnqueues", "The number of messages that have been sent to the destination via network connection");
networkDequeues = new CountStatisticImpl("networkDequeues", "The number of messages that have been acknowledged from the destination via network connection");
addStatistic("enqueues", enqueues);
addStatistic("dispatched", dispatched);
addStatistic("dequeues", dequeues);
@ -83,6 +91,9 @@ public class DestinationStatistics extends StatsImpl {
addStatistic("blockedTime",blockedTime);
addStatistic("messageSize",messageSize);
addStatistic("maxUncommittedExceededCount", maxUncommittedExceededCount);
addStatistic("networkEnqueues", networkEnqueues);
addStatistic("networkDequeues", networkDequeues);
}
public CountStatisticImpl getEnqueues() {
@ -151,6 +162,14 @@ public class DestinationStatistics extends StatsImpl {
return this.maxUncommittedExceededCount;
}
public CountStatisticImpl getNetworkEnqueues() {
return networkEnqueues;
}
public CountStatisticImpl getNetworkDequeues() {
return networkDequeues;
}
public void reset() {
if (this.isDoReset()) {
super.reset();
@ -165,6 +184,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.reset();
messageSize.reset();
maxUncommittedExceededCount.reset();
networkEnqueues.reset();
networkDequeues.reset();
}
}
@ -187,6 +208,9 @@ public class DestinationStatistics extends StatsImpl {
messageSize.setEnabled(enabled);
maxUncommittedExceededCount.setEnabled(enabled);
// [AMQ-9437] Advanced Statistics
networkEnqueues.setEnabled(enabled);
networkDequeues.setEnabled(enabled);
}
public void setParent(DestinationStatistics parent) {
@ -207,6 +231,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.setParent(parent.blockedTime);
messageSize.setParent(parent.messageSize);
maxUncommittedExceededCount.setParent(parent.maxUncommittedExceededCount);
networkEnqueues.setParent(parent.networkEnqueues);
networkDequeues.setParent(parent.networkDequeues);
} else {
enqueues.setParent(null);
dispatched.setParent(null);
@ -224,6 +250,8 @@ public class DestinationStatistics extends StatsImpl {
blockedTime.setParent(null);
messageSize.setParent(null);
maxUncommittedExceededCount.setParent(null);
networkEnqueues.setParent(null);
networkDequeues.setParent(null);
}
}

View File

@ -371,6 +371,9 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
if(((Destination)node.getRegionDestination()).isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getNetworkDequeues().add(ack.getMessageCount());
}
}
}

View File

@ -1873,7 +1873,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
// This sends the ack the the journal..
if (!ack.isInTransaction()) {
acknowledge(context, sub, ack, reference);
dropMessage(reference);
dropMessage(context, reference);
} else {
try {
acknowledge(context, sub, ack, reference);
@ -1882,7 +1882,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
@Override
public void afterCommit() throws Exception {
dropMessage(reference);
dropMessage(context, reference);
wakeup();
}
@ -1910,11 +1910,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
reference.setAcked(true);
}
private void dropMessage(QueueMessageReference reference) {
private void dropMessage(ConnectionContext context, QueueMessageReference reference) {
//use dropIfLive so we only process the statistics at most one time
if (reference.dropIfLive()) {
getDestinationStatistics().getDequeues().increment();
getDestinationStatistics().getMessages().decrement();
if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
getDestinationStatistics().getNetworkDequeues().increment();
}
pagedInMessagesLock.writeLock().lock();
try {
pagedInMessages.remove(reference);
@ -1969,6 +1974,11 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
destinationStatistics.getMessageSize().addSize(msg.getSize());
if(isAdvancedNetworkStatisticsEnabled() && context.getConnection() != null && context.getConnection().isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
messageDelivered(context, msg);
consumersLock.readLock().lock();
try {
@ -2115,7 +2125,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
LOG.warn("{}, duplicate message {} - {} from cursor, is cursor audit disabled or too constrained? Redirecting to dlq", this, ref.getMessageId(), ref.getMessage().getMessageId().getFutureOrSequenceLong());
if (store != null) {
ConnectionContext connectionContext = createConnectionContext();
dropMessage(ref);
dropMessage(connectionContext, ref);
if (gotToTheStore(ref.getMessage())) {
LOG.debug("Duplicate message {} from cursor, removing from store", ref.getMessage());
store.removeMessage(connectionContext, new MessageAck(ref.getMessage(), MessageAck.POISON_ACK_TYPE, 1));

View File

@ -778,6 +778,11 @@ public class Topic extends BaseDestination implements Task {
// misleading metrics.
// destinationStatistics.getMessages().increment();
destinationStatistics.getEnqueues().increment();
if(isAdvancedNetworkStatisticsEnabled() && context != null && context.isNetworkConnection()) {
destinationStatistics.getNetworkEnqueues().increment();
}
destinationStatistics.getMessageSize().addSize(message.getSize());
MessageEvaluationContext msgContext = null;

View File

@ -449,6 +449,9 @@ public class TopicSubscription extends AbstractSubscription {
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
destination.getDestinationStatistics().getNetworkDequeues().add(count);
}
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
@ -746,6 +749,9 @@ public class TopicSubscription extends AbstractSubscription {
matched.remove(message);
if (destination != null) {
destination.getDestinationStatistics().getDequeues().increment();
if(destination.isAdvancedNetworkStatisticsEnabled() && getContext() != null && getContext().isNetworkConnection()) {
destination.getDestinationStatistics().getNetworkDequeues().increment();
}
}
Destination dest = (Destination) message.getRegionDestination();
if (dest != null) {

View File

@ -106,7 +106,7 @@ public class PolicyEntry extends DestinationMapEntry {
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
private boolean advancedNetworkStatisticsEnabled = false; // [AMQ-9437]
/*
* percentage of in-flight messages above which optimize message store is disabled
*/
@ -306,6 +306,9 @@ public class PolicyEntry extends DestinationMapEntry {
if (isUpdate("sendDuplicateFromStoreToDLQ", includedProperties)) {
destination.setSendDuplicateFromStoreToDLQ(isSendDuplicateFromStoreToDLQ());
}
if (isUpdate("advancedNetworkStatisticsEnabled", includedProperties)) {
destination.setAdvancedNetworkStatisticsEnabled(isAdvancedNetworkStatisticsEnabled());
}
}
public void baseConfiguration(Broker broker, BaseDestination destination) {
@ -1176,4 +1179,12 @@ public class PolicyEntry extends DestinationMapEntry {
public MessageInterceptorStrategy getMessageInterceptorStrategy() {
return this.messageInterceptorStrategy;
}
public boolean isAdvancedNetworkStatisticsEnabled() {
return this.advancedNetworkStatisticsEnabled;
}
public void setAdvancedNetworkStatisticsEnabled(boolean advancedNetworkStatisticsEnabled) {
this.advancedNetworkStatisticsEnabled = advancedNetworkStatisticsEnabled;
}
}

View File

@ -57,6 +57,18 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
verifyBooleanField("AMQ.8397", "sendDuplicateFromStoreToDLQ", true);
}
@Test
public void testModAdvancedNetworkStatistics() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics");
startBroker(brokerConfig);
assertTrue("broker alive", brokerService.isStarted());
verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", false);
applyNewConfig(brokerConfig, configurationSeed + "-policy-advancedNetworkStatistics-mod", SLEEP);
verifyBooleanField("AMQ.9437", "advancedNetworkStatisticsEnabled", true);
}
@Test
public void testAddNdMod() throws Exception {
final String brokerConfig = configurationSeed + "-policy-ml-broker";
@ -121,6 +133,9 @@ public class PolicyEntryTest extends RuntimeConfigTestSupport {
session.createConsumer(session.createQueue(dest));
switch(fieldName) {
case "advancedNetworkStatisticsEnabled":
assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isAdvancedNetworkStatisticsEnabled());
break;
case "sendDuplicateFromStoreToDLQ":
assertEquals(value, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).isSendDuplicateFromStoreToDLQ());
break;

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="AMQ.9437" advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

View File

@ -0,0 +1,36 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<broker xmlns="http://activemq.apache.org/schema/core" start="false" persistent="false">
<plugins>
<runtimeConfigurationPlugin checkPeriod="1000" />
</plugins>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="AMQ.9437"/>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>

View File

@ -0,0 +1,167 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.network;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageListener;
import jakarta.jms.MessageProducer;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.springframework.context.support.AbstractApplicationContext;
@RunWith(value = Parameterized.class)
public class NetworkAdvancedStatisticsTest extends BaseNetworkTest {
@Parameterized.Parameters(name="includedDestination={0}, excludedDestination={1}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
{ new ActiveMQTopic("include.test.bar"), new ActiveMQTopic("exclude.test.bar")},
{ new ActiveMQQueue("include.test.foo"), new ActiveMQQueue("exclude.test.foo")}});
}
protected static final int MESSAGE_COUNT = 10;
protected AbstractApplicationContext context;
protected String consumerName = "durableSubs";
private final ActiveMQDestination includedDestination;
private final ActiveMQDestination excludedDestination;
public NetworkAdvancedStatisticsTest(ActiveMQDestination includedDestionation, ActiveMQDestination excludedDestination) {
this.includedDestination = includedDestionation;
this.excludedDestination = excludedDestination;
}
@Override
protected void doSetUp(boolean deleteAllMessages) throws Exception {
super.doSetUp(deleteAllMessages);
}
@Override
protected String getRemoteBrokerURI() {
return "org/apache/activemq/network/remoteBroker-advancedNetworkStatistics.xml";
}
@Override
protected String getLocalBrokerURI() {
return "org/apache/activemq/network/localBroker-advancedNetworkStatistics.xml";
}
//Added for AMQ-9437 test advancedStatistics for networkEnqueue and networkDequeue
@Test(timeout = 60 * 1000)
public void testNetworkAdvancedStatistics() throws Exception {
// create a remote durable consumer to create demand
MessageConsumer remoteConsumer;
if(includedDestination.isTopic()) {
remoteConsumer = remoteSession.createDurableSubscriber(ActiveMQTopic.class.cast(includedDestination), consumerName);
} else {
remoteConsumer = remoteSession.createConsumer(includedDestination);
remoteConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
}
});
}
Thread.sleep(1000);
MessageProducer producer = localSession.createProducer(includedDestination);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producer.send(test);
}
Thread.sleep(1000);
MessageProducer producerExcluded = localSession.createProducer(excludedDestination);
for (int i = 0; i < MESSAGE_COUNT; i++) {
Message test = localSession.createTextMessage("test-" + i);
producerExcluded.send(test);
}
Thread.sleep(1000);
//Make sure stats are correct for local -> remote
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getDequeues().getCount());
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(MESSAGE_COUNT, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
assertEquals(0, localBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(MESSAGE_COUNT, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(includedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
// Make sure stats do not increment for local-only
assertEquals(MESSAGE_COUNT, localBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
assertEquals(0, localBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getDequeues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getForwards().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkEnqueues().getCount());
assertEquals(0, remoteBroker.getDestination(excludedDestination).getDestinationStatistics().getNetworkDequeues().getCount());
if(includedDestination.isTopic()) {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return localBroker.getSystemUsage().getMemoryUsage().getUsage() == 0;
}
}, 10000, 500));
} else {
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
// The number of message that remain is due to the exclude queue
return localBroker.getAdminView().getTotalMessageCount() == MESSAGE_COUNT;
}
}, 10000, 500));
}
remoteConsumer.close();
}
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
final NetworkBridge remoteBridge = remoteBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
assertTrue(Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return expectedLocalSent == localBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
0 == localBridge.getNetworkBridgeStatistics().getReceivedCount().getCount() &&
expectedRemoteSent == remoteBridge.getNetworkBridgeStatistics().getDequeues().getCount() &&
0 == remoteBridge.getNetworkBridgeStatistics().getReceivedCount().getCount();
}
}));
}
}

View File

@ -0,0 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="localBroker" start="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="ActiveMQ.Advisory.>" />
<policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61617)"
dynamicOnly = "false"
conduitSubscriptions = "true"
decreaseNetworkConsumerPriority = "false"
name="networkConnector">
<dynamicallyIncludedDestinations>
<queue physicalName="include.test.foo"/>
<topic physicalName="include.test.bar"/>
</dynamicallyIncludedDestinations>
<excludedDestinations>
<queue physicalName="exclude.test.foo"/>
<topic physicalName="exclude.test.bar"/>
</excludedDestinations>
</networkConnector>
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
</broker>
</beans>

View File

@ -0,0 +1,49 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="remoteBroker" start="false" useJmx="false" persistent="true" useShutdownHook="false" monitorConnectionSplits="false" xmlns="http://activemq.apache.org/schema/core">
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry queue="include.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="ActiveMQ.Advisory.>" />
<policyEntry topic="exclude.>" advancedNetworkStatisticsEnabled="true"/>
<policyEntry topic="include.>" advancedNetworkStatisticsEnabled="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
<networkConnectors>
<networkConnector uri="static:(tcp://localhost:61616)" />
</networkConnectors>
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
</transportConnectors>
</broker>
</beans>