From a7891c3dca7b598b4664574bc88e0802e16df6ef Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Sun, 3 Sep 2006 17:09:51 +0000 Subject: [PATCH] Patch applied from Vadim: https://issues.apache.org/activemq/browse/AMQ-855 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439804 13f79535-47bb-0310-9956-ffa450edef68 --- ...ference.java => NullMessageReference.java} | 9 +++-- .../broker/region/QueueMessageReference.java | 2 +- .../activemq/ZeroPrefetchConsumerTest.java | 36 ++++++++++++++----- 3 files changed, 32 insertions(+), 15 deletions(-) rename activemq-core/src/main/java/org/apache/activemq/broker/region/{EndOfBrowseMarkerQueueMessageReference.java => NullMessageReference.java} (91%) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java similarity index 91% rename from activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java rename to activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java index 9a5d2730b6..86bfce4e69 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/NullMessageReference.java @@ -27,7 +27,7 @@ import org.apache.activemq.command.MessageId; /** * Only used by the {@link QueueMessageReference#NULL_MESSAGE} */ -final class EndOfBrowseMarkerQueueMessageReference implements +final class NullMessageReference implements QueueMessageReference { private ActiveMQMessage message = new ActiveMQMessage(); @@ -50,7 +50,7 @@ final class EndOfBrowseMarkerQueueMessageReference implements } public boolean lock(LockOwner subscription) { - throw new RuntimeException("not implemented"); + return true; } public void setAcked(boolean b) { @@ -58,7 +58,6 @@ final class EndOfBrowseMarkerQueueMessageReference implements } public void unlock() { - throw new RuntimeException("not implemented"); } public int decrementReferenceCount() { @@ -70,11 +69,11 @@ final class EndOfBrowseMarkerQueueMessageReference implements } public String getGroupID() { - throw new RuntimeException("not implemented"); + return null; } public int getGroupSequence() { - throw new RuntimeException("not implemented"); + return 0; } public Message getMessage() throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java index dad84fb918..3fa7ee5d83 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java @@ -25,7 +25,7 @@ package org.apache.activemq.broker.region; */ public interface QueueMessageReference extends MessageReference { - public static final QueueMessageReference NULL_MESSAGE = new EndOfBrowseMarkerQueueMessageReference(); + public static final QueueMessageReference NULL_MESSAGE = new NullMessageReference(); public boolean isAcked(); diff --git a/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java index 2208ebfefe..8f15ad40fb 100644 --- a/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -21,14 +21,7 @@ import org.apache.activemq.spring.SpringConsumer; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import javax.jms.Connection; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Queue; -import javax.jms.Session; +import javax.jms.*; /** * @@ -60,11 +53,36 @@ public class ZeroPrefetchConsumerTest extends EmbeddedBrokerTestSupport { MessageProducer producer = session.createProducer(queue); producer.send(session.createTextMessage("Hello World!")); - + // now lets receive it MessageConsumer consumer = session.createConsumer(queue); Message answer = consumer.receive(5000); assertNotNull("Should have received a message!", answer); + // check if method will return at all and will return a null + answer = consumer.receive(1000); + assertNull("Should have not received a message!", answer); + answer = consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + public void testIdleConsumer() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + //noinspection UNUSED_SYMBOL + MessageConsumer idleConsumer = session.createConsumer(queue); + TextMessage answer = (TextMessage) consumer.receive(5000); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + // this call would return null if prefetchSize > 0 + answer = (TextMessage) consumer.receive(5000); + assertEquals("Should have not received a message!", answer.getText(), "Msg2"); + answer = (TextMessage) consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); } protected void setUp() throws Exception {