diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java index 3ac5e54422..f9787e1d36 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ1866.java @@ -16,25 +16,20 @@ */ package org.apache.activemq.bugs; -import junit.framework.TestCase; - import java.util.ArrayList; -import java.util.Random; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import javax.jms.Connection; import javax.jms.Destination; -import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; +import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.ActiveMQPrefetchPolicy; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -43,8 +38,8 @@ import org.apache.commons.logging.LogFactory; * This is a test case for the issue reported at: * https://issues.apache.org/activemq/browse/AMQ-1866 * - * If you have a JMS producer sending messages to multiple consumers and - * you have a low prefetch, eventually all consumers will run as slow as + * If you have a JMS producer sending messages to multiple fast consumers and + * one slow consumer, eventually all consumers will run as slow as * the slowest consumer. */ public class AMQ1866 extends TestCase { @@ -55,9 +50,9 @@ public class AMQ1866 extends TestCase { String ACTIVEMQ_BROKER_BIND = "tcp://localhost:61616"; String ACTIVEMQ_BROKER_URI = "tcp://localhost:61616"; - String REQUEST_QUEUE = "provider.queue"; AtomicBoolean shutdown = new AtomicBoolean(); + private ActiveMQQueue destination; @Override protected void setUp() throws Exception { @@ -65,6 +60,7 @@ public class AMQ1866 extends TestCase { brokerService = new BrokerService(); brokerService.addConnector(ACTIVEMQ_BROKER_BIND); brokerService.start(); + destination = new ActiveMQQueue(getName()); } @Override @@ -96,15 +92,21 @@ public class AMQ1866 extends TestCase { } public void doTestConsumerSlowDown() throws InterruptedException { - ConsumerThread c1 = new ConsumerThread("Consumer-1"); - ConsumerThread c2 = new ConsumerThread("Consumer-2"); ProducerThread p1 = new ProducerThread("Producer-1"); - threads.add(c1); - threads.add(c2); threads.add(p1); - c1.start(); - c2.start(); p1.start(); + + // Wait a bit before starting the consumers to load up the queues a bit.. + // If the queue is loaded up it seems that the even the Default Prefetch size case fails. + Thread.sleep(10000); + + ConsumerThread c1 = new ConsumerThread("Consumer-1"); + threads.add(c1); + c1.start(); + + ConsumerThread c2 = new ConsumerThread("Consumer-2"); + threads.add(c2); + c2.start(); for ( int i=0; i < 30; i++) { Thread.sleep(1000); @@ -114,7 +116,7 @@ public class AMQ1866 extends TestCase { System.out.println("p1: "+p1Counter+", c1: "+c1Counter+", c2: "+c2Counter); // Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec - if( i > 2 ) { + if( i > 3 ) { assertTrue("Consumer 2 should be receiving new messages every second.", c2Counter > 0); } } @@ -134,19 +136,18 @@ public class AMQ1866 extends TestCase { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI); factory.setDispatchAsync(true); - Destination requestDestination = new ActiveMQQueue(REQUEST_QUEUE); connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(requestDestination); + MessageProducer producer = session.createProducer(destination); connection.start(); int i = 0; while (!shutdown.get()) { producer.send(session.createTextMessage(getName()+" Message "+(++i))); counter.incrementAndGet(); - Thread.sleep(10); + Thread.sleep(1); } } catch (Exception e) { @@ -175,15 +176,13 @@ public class AMQ1866 extends TestCase { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI); factory.setDispatchAsync(true); - Destination requestDestination = new ActiveMQQueue(REQUEST_QUEUE); connection = factory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageConsumer consumer = session.createConsumer(requestDestination); + MessageConsumer consumer = session.createConsumer(destination); connection.start(); - int i = 0; while (!shutdown.get()) { TextMessage msg = (TextMessage)consumer.receive(1000); if ( msg!=null ) { @@ -191,7 +190,7 @@ public class AMQ1866 extends TestCase { if (getName().equals("Consumer-1")) { sleepingTime = 10 * 1000; } else { - sleepingTime = 10; + sleepingTime = 1; } Thread.sleep(sleepingTime); counter.incrementAndGet();