Don't use hardcoded ports.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1443642 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2013-02-07 18:20:32 +00:00
parent 4743a20be3
commit 600f209e4c
3 changed files with 170 additions and 156 deletions

View File

@ -17,12 +17,7 @@
package org.apache.bugs;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import java.util.concurrent.CountDownLatch;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@ -33,17 +28,20 @@ import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
public class AMQ1730Test extends TestCase {
private static final Logger log = LoggerFactory.getLogger(AMQ1730Test.class);
private static final String JMSX_DELIVERY_COUNT = "JMSXDeliveryCount";
BrokerService brokerService;
private static final int MESSAGE_COUNT = 250;
@ -55,6 +53,7 @@ public class AMQ1730Test extends TestCase {
@Override
protected void setUp() throws Exception {
super.setUp();
brokerService = new BrokerService();
brokerService.addConnector("tcp://localhost:0");
brokerService.setUseJmx(false);
@ -65,6 +64,7 @@ public class AMQ1730Test extends TestCase {
protected void tearDown() throws Exception {
super.tearDown();
brokerService.stop();
brokerService.waitUntilStopped();
}
public void testRedelivery() throws Exception {
@ -109,7 +109,7 @@ public class AMQ1730Test extends TestCase {
messageListenerContainer.setSessionTransacted(false);
messageListenerContainer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (!(message instanceof TextMessage)) {
throw new RuntimeException();
@ -159,7 +159,5 @@ public class AMQ1730Test extends TestCase {
T get() {
return value;
}
}
}

View File

@ -35,11 +35,11 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
//import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
//import org.apache.activemq.pool.PooledConnectionFactory;
public class AMQ2754Test extends TestCase {
@ -47,110 +47,117 @@ public class AMQ2754Test extends TestCase {
BrokerService brokerService1 = null;
BrokerService brokerService2 = null;
String broker1Uri;
String broker2Uri;
final int total = 100;
final CountDownLatch latch = new CountDownLatch(total);
final boolean conduitSubscriptions = true;
try {
{
brokerService1 = new BrokerService();
brokerService1.setBrokerName("consumer");
brokerService1.setUseJmx(false);
brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService1.addConnector("tcp://0.0.0.0:61616");
brokerService1.start();
}
{
brokerService2 = new BrokerService();
brokerService2.setBrokerName("producer");
brokerService2.setUseJmx(false);
brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService2.addConnector("tcp://0.0.0.0:51515");
NetworkConnector network2 = brokerService2.addNetworkConnector("static:(tcp://localhost:61616)");
network2.setName("network1");
network2.setDynamicOnly(true);
network2.setConduitSubscriptions(conduitSubscriptions);
network2.setNetworkTTL(3);
network2.setPrefetchSize(1);
brokerService2.start();
}
ExecutorService pool = Executors.newSingleThreadExecutor();
ActiveMQConnectionFactory connectionFactory1 =
new ActiveMQConnectionFactory("failover:(tcp://localhost:61616)");
connectionFactory1.setWatchTopicAdvisories(false);
final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory1);
container.setMaxConcurrentConsumers(10);
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
container.setDestination(new ActiveMQQueue("testingqueue"));
container.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
latch.countDown();
{
brokerService1 = new BrokerService();
brokerService1.setBrokerName("consumer");
brokerService1.setUseJmx(false);
brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker1Uri = brokerService1.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
brokerService1.start();
}
});
container.setMaxMessagesPerTask(1);
container.afterPropertiesSet();
container.start();
pool.submit(new Callable<Object>() {
public Object call() throws Exception {
try {
final int batch = 10;
ActiveMQConnectionFactory connectionFactory2 =
new ActiveMQConnectionFactory("failover:(tcp://localhost:51515)");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
connectionFactory2.setWatchTopicAdvisories(false);
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
for(int b = 0; b < batch; b++) {
for(int i = 0; i < (total / batch); i++) {
final String id = ":batch=" + b + "i=" + i;
template.send(queue, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setText("Hello World!" + id);
return message;
}
});
}
// give spring time to scale back again
while(container.getActiveConsumerCount() > 1) {
System.out.println("active consumer count:" + container.getActiveConsumerCount());
System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
Thread.sleep(1000);
}
}
//pooledConnectionFactory.stop();
} catch(Throwable t) {
t.printStackTrace();
{
brokerService2 = new BrokerService();
brokerService2.setBrokerName("producer");
brokerService2.setUseJmx(false);
brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker2Uri = brokerService2.addConnector("tcp://0.0.0.0:0").getPublishableConnectString();
NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")");
network2.setName("network1");
network2.setDynamicOnly(true);
network2.setConduitSubscriptions(conduitSubscriptions);
network2.setNetworkTTL(3);
network2.setPrefetchSize(1);
brokerService2.start();
}
ExecutorService pool = Executors.newSingleThreadExecutor();
ActiveMQConnectionFactory connectionFactory1 =
new ActiveMQConnectionFactory("failover:("+broker1Uri+")");
connectionFactory1.setWatchTopicAdvisories(false);
final DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory1);
container.setMaxConcurrentConsumers(10);
container.setSessionAcknowledgeMode(Session.AUTO_ACKNOWLEDGE);
container.setCacheLevel(DefaultMessageListenerContainer.CACHE_CONSUMER);
container.setDestination(new ActiveMQQueue("testingqueue"));
container.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
latch.countDown();
}
return null;
});
container.setMaxMessagesPerTask(1);
container.afterPropertiesSet();
container.start();
final String finalBroker2Uri = broker2Uri;
pool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
final int batch = 10;
ActiveMQConnectionFactory connectionFactory2 =
new ActiveMQConnectionFactory("failover:("+finalBroker2Uri+")");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(connectionFactory2);
connectionFactory2.setWatchTopicAdvisories(false);
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
ActiveMQQueue queue = new ActiveMQQueue("testingqueue");
for(int b = 0; b < batch; b++) {
for(int i = 0; i < (total / batch); i++) {
final String id = ":batch=" + b + "i=" + i;
template.send(queue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setText("Hello World!" + id);
return message;
}
});
}
// give spring time to scale back again
while(container.getActiveConsumerCount() > 1) {
System.out.println("active consumer count:" + container.getActiveConsumerCount());
System.out.println("concurrent consumer count: " + container.getConcurrentConsumers());
Thread.sleep(1000);
}
}
//pooledConnectionFactory.stop();
} catch(Throwable t) {
t.printStackTrace();
}
return null;
}
});
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
int count = 0;
// give it 20 seconds
while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
System.out.println("count " + latch.getCount());
}
});
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
int count = 0;
// give it 20 seconds
while(!latch.await(1, TimeUnit.SECONDS) && count++ < 20) {
System.out.println("count " + latch.getCount());
}
container.destroy();
container.destroy();
} finally {
try { if(brokerService1 != null) {
try { if(brokerService1 != null) {
brokerService1.stop();
}} catch(Throwable t) { t.printStackTrace(); }
try { if(brokerService2 != null) {
try { if(brokerService2 != null) {
brokerService2.stop();
}} catch(Throwable t) { t.printStackTrace(); }
}
@ -158,7 +165,5 @@ public class AMQ2754Test extends TestCase {
if(latch.getCount() > 0) {
fail("latch should have gone down to 0 but was " + latch.getCount());
}
}
}
}

View File

@ -39,9 +39,9 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.junit.Test;
import org.springframework.jms.connection.CachingConnectionFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
@ -61,48 +61,49 @@ public class LoadBalanceTest {
final AtomicInteger broker1Count = new AtomicInteger(0);
final AtomicInteger broker2Count = new AtomicInteger(0);
final CountDownLatch startProducer = new CountDownLatch(1);
String broker1Uri;
String broker2Uri;
try {
{
brokerService1 = new BrokerService();
brokerService1.setBrokerName("one");
brokerService1.setUseJmx(false);
brokerService1
.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService1.addConnector("nio://0.0.0.0:61616");
final NetworkConnector network1 = brokerService1
.addNetworkConnector("static:(tcp://localhost:51515)");
network1.setName("network1");
network1.setDynamicOnly(true);
network1.setNetworkTTL(3);
network1.setPrefetchSize(networkBridgePrefetch);
network1.setConduitSubscriptions(false);
network1.setDecreaseNetworkConsumerPriority(false);
network1.setDispatchAsync(false);
brokerService1.start();
}
{
brokerService2 = new BrokerService();
brokerService2.setBrokerName("two");
brokerService2.setUseJmx(false);
brokerService2
.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService2.addConnector("nio://0.0.0.0:51515");
final NetworkConnector network2 = brokerService2
.addNetworkConnector("static:(tcp://localhost:61616)");
network2.setName("network1");
network2.setDynamicOnly(true);
network2.setNetworkTTL(3);
network2.setPrefetchSize(networkBridgePrefetch);
network2.setConduitSubscriptions(false);
network2.setDecreaseNetworkConsumerPriority(false);
network2.setDispatchAsync(false);
brokerService2.start();
}
brokerService1 = new BrokerService();
brokerService1.setBrokerName("one");
brokerService1.setUseJmx(false);
brokerService1.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker1Uri = brokerService1.addConnector("nio://0.0.0.0:0").getPublishableConnectString();
brokerService2 = new BrokerService();
brokerService2.setBrokerName("two");
brokerService2.setUseJmx(false);
brokerService2.setPersistenceAdapter(new MemoryPersistenceAdapter());
broker2Uri = brokerService2.addConnector("nio://0.0.0.0:0").getPublishableConnectString();
final NetworkConnector network1 = brokerService1.addNetworkConnector("static:("+broker2Uri+")");
network1.setName("network1");
network1.setDynamicOnly(true);
network1.setNetworkTTL(3);
network1.setPrefetchSize(networkBridgePrefetch);
network1.setConduitSubscriptions(false);
network1.setDecreaseNetworkConsumerPriority(false);
network1.setDispatchAsync(false);
final NetworkConnector network2 = brokerService2.addNetworkConnector("static:("+broker1Uri+")");
network2.setName("network1");
network2.setDynamicOnly(true);
network2.setNetworkTTL(3);
network2.setPrefetchSize(networkBridgePrefetch);
network2.setConduitSubscriptions(false);
network2.setDecreaseNetworkConsumerPriority(false);
network2.setDispatchAsync(false);
brokerService1.start();
brokerService2.start();
final ExecutorService pool = Executors.newSingleThreadExecutor();
final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory(
"vm://one");
final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(
connectionFactory1);
final ActiveMQConnectionFactory connectionFactory1 = new ActiveMQConnectionFactory("vm://one");
final SingleConnectionFactory singleConnectionFactory1 = new SingleConnectionFactory(connectionFactory1);
singleConnectionFactory1.setReconnectOnException(true);
final DefaultMessageListenerContainer container1 = new DefaultMessageListenerContainer();
container1.setConnectionFactory(singleConnectionFactory1);
@ -110,6 +111,7 @@ public class LoadBalanceTest {
container1.setDestination(new ActiveMQQueue("testingqueue"));
container1.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
@ -118,6 +120,7 @@ public class LoadBalanceTest {
container1.start();
pool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
@ -133,6 +136,7 @@ public class LoadBalanceTest {
"testingqueue"));
container2.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
@ -151,6 +155,7 @@ public class LoadBalanceTest {
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
@Override
public Message createMessage(
final Session session)
throws JMSException {
@ -197,6 +202,7 @@ public class LoadBalanceTest {
try {
if (brokerService1 != null) {
brokerService1.stop();
brokerService1.waitUntilStopped();
}
} catch (final Throwable t) {
t.printStackTrace();
@ -204,12 +210,13 @@ public class LoadBalanceTest {
try {
if (brokerService2 != null) {
brokerService2.stop();
brokerService2.waitUntilStopped();
}
} catch (final Throwable t) {
t.printStackTrace();
}
}
if (broker1Count.get() < 25 || broker2Count.get() < 25) {
fail("Each broker should have gotten at least 25 messages but instead broker1 got "
+ broker1Count.get()
@ -240,6 +247,7 @@ public class LoadBalanceTest {
container1.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container1.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message message) {
broker1Count.incrementAndGet();
}
@ -248,6 +256,7 @@ public class LoadBalanceTest {
container1.start();
pool.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
System.setProperty("lbt.brokerName", "two");
final ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(
@ -261,16 +270,17 @@ public class LoadBalanceTest {
container2.setDestination(new ActiveMQQueue(TESTING_QUEUE));
container2.setMessageListener(new MessageListener() {
@Override
public void onMessage(final Message message) {
broker2Count.incrementAndGet();
}
});
container2.afterPropertiesSet();
container2.start();
assertTrue("wait for start signal", startProducer.await(20, TimeUnit.SECONDS));
final CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(
singleConnectionFactory2);
final JmsTemplate template = new JmsTemplate(
@ -279,6 +289,7 @@ public class LoadBalanceTest {
for (int i = 0; i < total; i++) {
template.send(queue, new MessageCreator() {
@Override
public Message createMessage(final Session session)
throws JMSException {
final TextMessage message = session
@ -291,14 +302,14 @@ public class LoadBalanceTest {
return null;
}
});
// give network a chance to build, needs advisories
waitForBridgeFormation();
startProducer.countDown();
pool.shutdown();
pool.awaitTermination(10, TimeUnit.SECONDS);
LOG.info("broker1Count " + broker1Count.get() + ", broker2Count " + broker2Count.get());
int count = 0;