mirror of https://github.com/apache/activemq.git
Added test case to verify that the prefetch buffer works as expected for prefetch==1
http://www.nabble.com/Trouble-with-prefetch-buffer.-tf2029800.html#a5599978 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@428022 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
9c92f186a5
commit
98f3545303
|
@ -32,6 +32,7 @@ import junit.framework.Test;
|
||||||
import org.apache.activemq.ActiveMQConnection;
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
import org.apache.activemq.ActiveMQMessageConsumer;
|
import org.apache.activemq.ActiveMQMessageConsumer;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
|
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
|
||||||
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
|
||||||
|
@ -486,7 +487,48 @@ public class JMSConsumerTest extends JmsTestSupport {
|
||||||
assertNull(consumer.receiveNoWait());
|
assertNull(consumer.receiveNoWait());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
public void initCombosForTestPrefetch1MessageNotDispatched() {
|
||||||
|
addCombinationValues("deliveryMode", new Object[] { new Integer(DeliveryMode.NON_PERSISTENT),
|
||||||
|
new Integer(DeliveryMode.PERSISTENT) });
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPrefetch1MessageNotDispatched() throws Exception {
|
||||||
|
|
||||||
|
// Set prefetch to 1
|
||||||
|
connection.getPrefetchPolicy().setAll(1);
|
||||||
|
connection.start();
|
||||||
|
|
||||||
|
Session session = connection.createSession(true, 0);
|
||||||
|
destination = new ActiveMQQueue("TEST");
|
||||||
|
MessageConsumer consumer = session.createConsumer(destination);
|
||||||
|
|
||||||
|
// Send 2 messages to the destination.
|
||||||
|
sendMessages(session, destination, 2);
|
||||||
|
session.commit();
|
||||||
|
|
||||||
|
// Only pick up the first message.
|
||||||
|
Message message1 = message1 = consumer.receive(1000);
|
||||||
|
assertNotNull(message1);
|
||||||
|
|
||||||
|
// Don't acknowledge yet. This should keep our prefetch full.
|
||||||
|
// Since prefetch is still full, the 2nd message should get dispatched to
|
||||||
|
// another consumer.. lets create the 2nd consumer test that it does make sure it does.
|
||||||
|
ActiveMQConnection connection2 = (ActiveMQConnection) factory.createConnection();
|
||||||
|
connections.add(connection2);
|
||||||
|
Session session2 = connection2.createSession(true, 0);
|
||||||
|
MessageConsumer consumer2 = session2.createConsumer(destination);
|
||||||
|
|
||||||
|
// Only pick up the 2nd messages.
|
||||||
|
Message message2 = consumer.receive(1000);
|
||||||
|
assertNotNull(message2);
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
session2.commit();
|
||||||
|
|
||||||
|
assertNull(consumer.receiveNoWait());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public void initCombosForTestDontStart() {
|
public void initCombosForTestDontStart() {
|
||||||
addCombinationValues("deliveryMode", new Object[] { new Integer(DeliveryMode.NON_PERSISTENT), });
|
addCombinationValues("deliveryMode", new Object[] { new Integer(DeliveryMode.NON_PERSISTENT), });
|
||||||
addCombinationValues("destinationType", new Object[] { new Byte(ActiveMQDestination.QUEUE_TYPE),
|
addCombinationValues("destinationType", new Object[] { new Byte(ActiveMQDestination.QUEUE_TYPE),
|
||||||
|
|
Loading…
Reference in New Issue