From 8de3bd29bfc0d916f60deb1de1992bb3d1d56c8b Mon Sep 17 00:00:00 2001 From: Gary Tully Date: Wed, 20 Jan 2010 16:41:30 +0000 Subject: [PATCH] resolve https://issues.apache.org/activemq/browse/AMQ-2567 git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@901269 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/PrefetchSubscription.java | 21 +-- .../activemq/ZeroPrefetchConsumerTest.java | 165 +++++++++++++++++- 2 files changed, 171 insertions(+), 15 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 00452923c4..cb5c99b5ee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -252,20 +252,13 @@ public abstract class PrefetchSubscription extends AbstractSubscription { } index++; acknowledge(context, ack, node); - if (ack.getLastMessageId().equals(messageId)) { - - if (context.isInTransaction()) { - // extend prefetch window only if not a pulling - // consumer - if (getPrefetchSize() != 0) { - prefetchExtension = Math.max( - prefetchExtension, index ); - } - } else { - // contract prefetch if dispatch required a pull - if (getPrefetchSize() == 0) { - prefetchExtension = Math.max(0, prefetchExtension - index); - } + if (ack.getLastMessageId().equals(messageId)) { + // contract prefetch if dispatch required a pull + if (getPrefetchSize() == 0) { + prefetchExtension = Math.max(0, prefetchExtension - index); + } else if (context.isInTransaction()) { + // extend prefetch window only if not a pulling consumer + prefetchExtension = Math.max(prefetchExtension, index); } destination = node.getRegionDestination(); callDispatchMatched = true; diff --git a/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java index 7532654e6d..afe5240c5c 100644 --- a/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -155,15 +155,178 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { answer = (TextMessage)consumer2.receiveNoWait(); assertNull("Should have not received a message!", answer); } + + // https://issues.apache.org/activemq/browse/AMQ-2567 + public void testManyMessageConsumer() throws Exception { + doTestManyMessageConsumer(true); + } + public void testManyMessageConsumerNoTransaction() throws Exception { + doTestManyMessageConsumer(false); + } + + private void doTestManyMessageConsumer(boolean transacted) throws Exception { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + producer.send(session.createTextMessage("Msg3")); + producer.send(session.createTextMessage("Msg4")); + producer.send(session.createTextMessage("Msg5")); + producer.send(session.createTextMessage("Msg6")); + producer.send(session.createTextMessage("Msg7")); + producer.send(session.createTextMessage("Msg8")); + if (transacted) { + session.commit(); + } + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg3"); + if (transacted) { + session.commit(); + } + // this call would return null if prefetchSize > 0 + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg4"); + if (transacted) { + session.commit(); + } + // Now using other consumer + // this call should return the next message (Msg5) still left on the queue + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg5"); + if (transacted) { + session.commit(); + } + // Now using other consumer + // this call should return the next message (Msg5) still left on the queue + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg6"); + // read one more message without commit + // Now using other consumer + // this call should return the next message (Msg5) still left on the queue + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg7"); + if (transacted) { + session.commit(); + } + // Now using other consumer + // this call should return the next message (Msg5) still left on the queue + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg8"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + public void testManyMessageConsumerWithSend() throws Exception { + doTestManyMessageConsumerWithSend(true); + } + + public void testManyMessageConsumerWithSendNoTransaction() throws Exception { + doTestManyMessageConsumerWithSend(false); + } + + private void doTestManyMessageConsumerWithSend(boolean transacted) throws Exception { + Session session = connection.createSession(transacted, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + producer.send(session.createTextMessage("Msg3")); + producer.send(session.createTextMessage("Msg4")); + producer.send(session.createTextMessage("Msg5")); + producer.send(session.createTextMessage("Msg6")); + producer.send(session.createTextMessage("Msg7")); + producer.send(session.createTextMessage("Msg8")); + if (transacted) { + session.commit(); + } + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg3"); + if (transacted) { + session.commit(); + } + // Now using other consumer take 2 + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg4"); + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg5"); + + // ensure prefetch extension ok by sending another that could get dispatched + producer.send(session.createTextMessage("Msg9")); + if (transacted) { + session.commit(); + } + + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg6"); + // read one more message without commit + // and using other consumer + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg7"); + if (transacted) { + session.commit(); + } + + answer = (TextMessage)consumer2.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg8"); + if (transacted) { + session.commit(); + } + + answer = (TextMessage)consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg9"); + if (transacted) { + session.commit(); + } + answer = (TextMessage)consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + protected void setUp() throws Exception { - bindAddress = "tcp://localhost:61616"; + bindAddress = "tcp://localhost:0"; super.setUp(); connection = createConnection(); connection.start(); queue = createQueue(); } + + protected void startBroker() throws Exception { + super.startBroker(); + bindAddress = broker.getTransportConnectors().get(0).getConnectUri().toString(); + } protected void tearDown() throws Exception { connection.close();