diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 2273e7087d..7e25c2feee 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -34,6 +34,7 @@ import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessagePull; import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; @@ -323,6 +324,9 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC */ public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); + if (info.getPrefetchSize() == 0) { + throw new JMSException("Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); + } this.messageListener = listener; if (listener != null) { boolean wasRunning = session.isRunning(); @@ -411,6 +415,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * this message consumer is concurrently closed */ public Message receive() throws JMSException { + sendPullCommand(); checkClosed(); checkMessageListener(); MessageDispatch md = dequeue(-1); @@ -455,6 +460,7 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * closed */ public Message receive(long timeout) throws JMSException { + sendPullCommand(); checkClosed(); checkMessageListener(); if (timeout == 0) { @@ -586,6 +592,19 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC throw new IllegalStateException("The Consumer is closed"); } } + + /** + * If we have a zero prefetch specified then send a pull command to the broker to pull a message + * we are about to receive + * + */ + protected void sendPullCommand() throws JMSException { + if (info.getPrefetchSize() == 0) { + MessagePull messagePull = new MessagePull(); + messagePull.configure(info); + session.asyncSendPacket(messagePull); + } + } protected void checkMessageListener() throws JMSException { session.checkMessageListener(); diff --git a/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java b/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java index 92f04172ce..2f2d668c4b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java +++ b/activemq-core/src/main/java/org/apache/activemq/MessageDispatchChannel.java @@ -163,6 +163,8 @@ public class MessageDispatchChannel { } public String toString() { - return list.toString(); + synchronized(mutex) { + return list.toString(); + } } } diff --git a/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java b/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java index 44e91420cb..5671a7b174 100644 --- a/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java +++ b/activemq-core/src/main/java/org/apache/activemq/command/MessagePull.java @@ -42,6 +42,14 @@ public class MessagePull extends BaseCommand { return visitor.processMessagePull(this); } + /** + * Configures a message pull from the consumer information + */ + public void configure(ConsumerInfo info) { + setConsumerId(info.getConsumerId()); + setDestination(info.getDestination()); + } + /** * @openwire:property version=1 cache=true */ diff --git a/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java b/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java new file mode 100644 index 0000000000..aa1473dfe3 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/ZeroPrefetchConsumerTest.java @@ -0,0 +1,88 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.spring.SpringConsumer; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +/** + * + * @version $Revision$ + */ +public class ZeroPrefetchConsumerTest extends TestSupport { + + private static final Log log = LogFactory.getLog(ZeroPrefetchConsumerTest.class); + + protected Connection connection; + protected Queue queue; + + public void testCannotUseMessageListener() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + + MessageListener listener = new SpringConsumer(); + try { + consumer.setMessageListener(listener); + fail("Should have thrown JMSException as we cannot use MessageListener with zero prefetch"); + } + catch (JMSException e) { + log.info("Received expected exception : " + e); + } + } + + public void testPullConsumerWorks() throws Exception { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello World!")); + + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + Message answer = consumer.receive(5000); + assertNotNull("Should have received a message!", answer); + } + + protected void setUp() throws Exception { + topic = false; + super.setUp(); + + connection = createConnection(); + connection.start(); + queue = createQueue(); + } + + protected void tearDown() throws Exception { + connection.close(); + super.tearDown(); + } + + protected Queue createQueue() { + return new ActiveMQQueue(getClass().getName() + "." + getName() + "?consumer.prefetchSize=0"); + } + +}