fix for AMQ-1902 introduced by fix for AMQ-1866

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@687677 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2008-08-21 10:01:10 +00:00
parent fc58b6f6a5
commit 3c32abd791
4 changed files with 57 additions and 22 deletions

View File

@ -20,6 +20,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@ -1166,19 +1167,24 @@ public class Queue extends BaseDestination implements Task {
// list anything that does not actually get dispatched.
if (list != null && !list.isEmpty()) {
// System.out.println(getName()+": dispatching from paged in: "+list.size());
pagedInPendingDispatch.addAll(doActualDispatch(list));
if (pagedInPendingDispatch.isEmpty()) {
pagedInPendingDispatch.addAll(doActualDispatch(list));
} else {
pagedInPendingDispatch.addAll(list);
}
// System.out.println(getName()+": new pending list2: "+pagedInPendingDispatch.size());
}
} finally {
dispatchLock.unlock();
}
}
/**
* @return list of messages that could get dispatched to consumers if they were not full.
*/
private List<QueueMessageReference> doActualDispatch(List<QueueMessageReference> list) throws Exception {
List<QueueMessageReference> rc = new ArrayList<QueueMessageReference>(list.size());
Set<Subscription> fullConsumers = new HashSet<Subscription>(this.consumers.size());
List<Subscription> consumers;
synchronized (this.consumers) {
@ -1190,13 +1196,18 @@ public class Queue extends BaseDestination implements Task {
int interestCount=0;
for (Subscription s : consumers) {
if (dispatchSelector.canSelect(s, node)) {
if (!s.isFull()) {
// Dispatch it.
s.add(node);
// System.out.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
target = s;
break;
}
if (!fullConsumers.contains(s)) {
if (!s.isFull()) {
// Dispatch it.
s.add(node);
//System.err.println(getName()+" Dispatched to "+s.getConsumerInfo().getConsumerId()+", "+node.getMessageId());
target = s;
break;
} else {
// no further dispatch of list to a full consumer to avoid out of order message receipt
fullConsumers.add(s);
}
}
interestCount++;
}
}

View File

@ -23,6 +23,8 @@ import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import junit.framework.Test;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
@ -62,4 +64,12 @@ public class CursorQueueStoreTest extends CursorSupport {
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}
public static Test suite() {
return suite(CursorQueueStoreTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@ -31,19 +32,24 @@ import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.Test;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerTest;
import org.apache.activemq.broker.region.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* @version $Revision: 1.3 $
*/
public abstract class CursorSupport extends TestCase {
public abstract class CursorSupport extends CombinationTestSupport {
protected static final int MESSAGE_COUNT = 500;
protected static final int PREFETCH_SIZE = 50;
public int MESSAGE_COUNT = 500;
public int PREFETCH_SIZE = 50;
private static final Log LOG = LogFactory.getLog(CursorSupport.class);
protected BrokerService broker;
@ -55,7 +61,7 @@ public abstract class CursorSupport extends TestCase {
protected abstract void configureBroker(BrokerService answer) throws Exception;
public void XtestSendFirstThenConsume() throws Exception {
public void testSendFirstThenConsume() throws Exception {
ConnectionFactory factory = createConnectionFactory();
Connection consumerConnection = getConsumerConnection(factory);
MessageConsumer consumer = getConsumer(consumerConnection);
@ -85,7 +91,15 @@ public abstract class CursorSupport extends TestCase {
consumerConnection.close();
}
public void testSendWhilstConaume() throws Exception {
public void initCombosForTestSendWhilstConsume() {
addCombinationValues("MESSAGE_COUNT", new Object[] {Integer.valueOf(400),
Integer.valueOf(500)});
addCombinationValues("PREFETCH_SIZE", new Object[] {Integer.valueOf(100),
Integer.valueOf(50)});
}
public void testSendWhilstConsume() throws Exception {
ConnectionFactory factory = createConnectionFactory();
Connection consumerConnection = getConsumerConnection(factory);
// create durable subs
@ -150,7 +164,7 @@ public abstract class CursorSupport extends TestCase {
assertEquals("This should be the same at pos " + i + " in the list", sent.getJMSMessageID(), consumed.getJMSMessageID());
}
}
protected Connection getConsumerConnection(ConnectionFactory fac) throws JMSException {
Connection connection = fac.createConnection();
connection.setClientID("testConsumer");

View File

@ -21,13 +21,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -89,19 +89,16 @@ public class AMQ1866 extends TestCase {
brokerService.stop();
}
// Failing
public void testConsumerSlowDownPrefetch0() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=0";
doTestConsumerSlowDown();
}
// Failing
public void testConsumerSlowDownPrefetch10() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=10";
doTestConsumerSlowDown();
}
// Passing
public void testConsumerSlowDownDefaultPrefetch() throws Exception {
ACTIVEMQ_BROKER_URI = "tcp://localhost:61616";
doTestConsumerSlowDown();
@ -137,15 +134,18 @@ public class AMQ1866 extends TestCase {
threads.add(c2);
c2.start();
int totalReceived = 0;
for ( int i=0; i < 30; i++) {
Thread.sleep(1000);
long c1Counter = c1.counter.getAndSet(0);
long c2Counter = c2.counter.getAndSet(0);
System.out.println("c1: "+c1Counter+", c2: "+c2Counter);
totalReceived += c1Counter;
totalReceived += c2Counter;
// Once message have been flowing for a few seconds, start asserting that c2 always gets messages. It should be receiving about 100 / sec
if( i > 3 ) {
assertTrue("Consumer 2 should be receiving new messages every second.", c2Counter > 0);
if( i > 10 ) {
assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
}
}
}
@ -204,8 +204,8 @@ public class AMQ1866 extends TestCase {
} else {
sleepingTime = 1;
}
Thread.sleep(sleepingTime);
counter.incrementAndGet();
Thread.sleep(sleepingTime);
}
}