add test to veriry loadbalance across network, effect of prefetch on vm client vs nework consumer, allowed prefetch to be configured via xml property (ie string in schema) for xml config

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@950515 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-06-02 12:08:58 +00:00
parent 3271401883
commit 3c8df79eca
4 changed files with 375 additions and 0 deletions

View File

@ -185,6 +185,7 @@ public class NetworkBridgeConfiguration {
/**
* @param prefetchSize the prefetchSize to set
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryIntPropertyEditor"
*/
public void setPrefetchSize(int prefetchSize) {
this.prefetchSize = prefetchSize;

View File

@ -0,0 +1,304 @@
package org.apache.bugs;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerRegistry;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class LoadBalanceTest {
private static final Log LOG = LogFactory.getLog(LoadBalanceTest.class);
private static final String TESTING_QUEUE = "testingqueue";
private static int networkBridgePrefetch = 1000;
@Test
public void does_load_balance_between_consumers() throws Exception {
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
final int total = 100;
final AtomicInteger broker1Count = new AtomicInteger(0);
final AtomicInteger broker2Count = new AtomicInteger(0);
try {
{
brokerService1 = new BrokerService();
brokerService1.setBrokerName("one");
brokerService1.setUseJmx(false);
brokerService1
.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService1.addConnector("nio://0.0.0.0:61616");
final NetworkConnector network1 = brokerService1
.addNetworkConnector("static:(tcp://localhost:51515)");
network1.setName("network1");
network1.setDynamicOnly(true);
network1.setNetworkTTL(3);
network1.setPrefetchSize(networkBridgePrefetch);
network1.setConduitSubscriptions(false);
network1.setDecreaseNetworkConsumerPriority(false);
brokerService1.start();
}
{
brokerService2 = new BrokerService();
brokerService2.setBrokerName("two");
brokerService2.setUseJmx(false);
brokerService2
.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService2.addConnector("nio://0.0.0.0:51515");
final NetworkConnector network2 = brokerService2
.addNetworkConnector("static:(tcp://localhost:61616)");
network2.setName("network1");
network2.setDynamicOnly(true);
network2.setNetworkTTL(3);
network2.setPrefetchSize(networkBridgePrefetch);
network2.setConduitSubscriptions(false);
network2.setDecreaseNetworkConsumerPriority(false);
brokerService2.start();
}
final ExecutorService pool = Executors.newSingleThreadExecutor();
final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
"vm://one");
final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
connectionFactory1);
singleConnectionFactory1.setReconnectOnException(true);
final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
container1.setConnectionFactory(singleConnectionFactory1);
container1.setMaxConcurrentConsumers(1);
container1.setDestination(new ActiveMQQueue("testingqueue"));
container1.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
});
container1.afterPropertiesSet();
container1.start();
pool.submit(new Callable<Object>() {
public Object call() throws Exception {
try {
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
"vm://two");
final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory(
connectionFactory2);
singleConnectionFactory2.setReconnectOnException(true);
final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer();
container2
.setConnectionFactory(singleConnectionFactory2);
container2.setMaxConcurrentConsumers(1);
container2.setDestination(new ActiveMQQueue(
"testingqueue"));
container2.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
});
container2.afterPropertiesSet();
container2.start();
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
singleConnectionFactory2);
final JmsTemplate template = new JmsTemplate(
cachingConnectionFactory);
final ActiveMQQueue queue = new ActiveMQQueue(
"testingqueue");
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
public Message createMessage(
final Session session)
throws JMSException {
final TextMessage message = session
.createTextMessage();
message.setText("Hello World!");
return message;
}
});
}
// give spring time to scale back again
while (container2.getActiveConsumerCount() > 1) {
System.out.println("active consumer count: "
+ container2.getActiveConsumerCount());
System.out.println("concurrent consumer count: "
+ container2.getConcurrentConsumers());
Thread.sleep(1000);
}
cachingConnectionFactory.destroy();
container2.destroy();
} catch (final Throwable t) {
t.printStackTrace();
}
return null;
}
});
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
int count = 0;
// give it 10 seconds
while (count++ < 10
&& broker1Count.get() + broker2Count.get() != total) {
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
Thread.sleep(1000);
}
container1.destroy();
} finally {
try {
if (brokerService1 != null) {
brokerService1.stop();
}
} catch (final Throwable t) {
t.printStackTrace();
}
try {
if (brokerService2 != null) {
brokerService2.stop();
}
} catch (final Throwable t) {
t.printStackTrace();
}
}
if (broker1Count.get() < 25 || broker2Count.get() < 25) {
fail("Each broker should have gotten at least 25 messages but instead broker1 got "
+ broker1Count.get()
+ " and broker2 got "
+ broker2Count.get());
}
}
@Test
public void does_xml_multicast_load_balance_between_consumers() throws Exception {
final int total = 100;
final AtomicInteger broker1Count = new AtomicInteger(0);
final AtomicInteger broker2Count = new AtomicInteger(0);
final ExecutorService pool = Executors.newSingleThreadExecutor();
final CountDownLatch startProducer = new CountDownLatch(1);
final String xmlConfig = getClass().getPackage().getName().replace('.','/') + "/loadbalancetest.xml";
System.setProperty("lbt.networkBridgePrefetch", String.valueOf(networkBridgePrefetch));
System.setProperty("lbt.brokerName", "one");
final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
"vm://one?brokerConfig=xbean:" + xmlConfig);
final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
connectionFactory1);
singleConnectionFactory1.setReconnectOnException(true);
final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
container1.setConnectionFactory(singleConnectionFactory1);
container1.setMaxConcurrentConsumers(1);
container1.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container1.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
});
container1.afterPropertiesSet();
container1.start();
pool.submit(new Callable<Object>() {
public Object call() throws Exception {
System.setProperty("lbt.brokerName", "two");
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
"vm://two?brokerConfig=xbean:" + xmlConfig);
final SingleConnectionFactory singleConnectionFactory2 = new SingleConnectionFactory(
connectionFactory2);
singleConnectionFactory2.setReconnectOnException(true);
final DefaultMessageListenerContainer container2 = new DefaultMessageListenerContainer();
container2.setConnectionFactory(singleConnectionFactory2);
container2.setMaxConcurrentConsumers(1);
container2.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container2.setMessageListener(new MessageListener() {
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
});
container2.afterPropertiesSet();
container2.start();
assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS));
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
singleConnectionFactory2);
final JmsTemplate template = new JmsTemplate(
cachingConnectionFactory);
final ActiveMQQueue queue = new ActiveMQQueue(TESTING_QUEUE);
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
public Message createMessage(final Session session)
throws JMSException {
final TextMessage message = session
.createTextMessage();
message.setText("Hello World!");
return message;
}
});
}
return null;
}
});
// give network a chance to build, needs advisories
waitForBridgeFormation(10000);
startProducer.countDown();
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
int count = 0;
// give it 10 seconds
while (count++ < 10 && broker1Count.get() + broker2Count.get() != total) {
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
Thread.sleep(1000);
}
if (broker1Count.get() < 25 || broker2Count.get() < 25) {
fail("Each broker should have gotten at least 25 messages but instead broker1 got "
+ broker1Count.get()
+ " and broker2 got "
+ broker2Count.get());
}
}
// need to ensure broker bridge is alive before starting the consumer
// peeking at the internals will give us this info
private void waitForBridgeFormation(long delay) throws Exception {
long done = System.currentTimeMillis() + delay;
while (done > System.currentTimeMillis()) {
BrokerService broker = BrokerRegistry.getInstance().lookup("two");
if (broker != null && !broker.getNetworkConnectors().isEmpty()) {
if (!broker.getNetworkConnectors().get(0).activeBridges().isEmpty()) {
return;
}
}
Thread.sleep(1000);
}
}
}

View File

@ -0,0 +1,36 @@
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
#
# The logging properties used during tests..
#
log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq=DEBUG
#log4j.logger.org.apache.activemq.broker.region=TRACE
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
# File appender
log4j.appender.out=org.apache.log4j.FileAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n
log4j.appender.out.file=target/activemq-test.log
log4j.appender.out.append=true

View File

@ -0,0 +1,34 @@
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- Allows us to use system properties as variables in this configuration file -->
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="systemPropertiesModeName">
<value>SYSTEM_PROPERTIES_MODE_OVERRIDE</value>
</property>
</bean>
<!-- Configures a broker with the system's hostname as its name with jmx but no persistence or adivsory support -->
<amq:broker brokerName="${lbt.brokerName}" useJmx="false" persistent="false" advisorySupport="true" useLocalHostBrokerName="false">
<amq:networkConnectors>
<amq:networkConnector conduitSubscriptions="false" decreaseNetworkConsumerPriority="false" networkTTL="3"
dynamicOnly="true" uri="multicast://239.255.2.25:6155" name="network" prefetchSize="${lbt.networkBridgePrefetch}">
<amq:excludedDestinations>
<amq:topic physicalName=">"/>
</amq:excludedDestinations>
<amq:staticallyIncludedDestinations>
<amq:queue physicalName=">"/>
</amq:staticallyIncludedDestinations>
</amq:networkConnector>
</amq:networkConnectors>
<amq:transportConnectors>
<amq:transportConnector name="transport" uri="nio://0.0.0.0:0" discoveryUri="multicast://239.255.2.25:6155"/>
</amq:transportConnectors>
</amq:broker>
</beans>