diff --git a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java index 7e25c2feee..d6fa90c9fc 100755 --- a/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java +++ b/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java @@ -381,6 +381,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC } else { return null; } + } else if ( md.getMessage()==null ) { + return null; } else if (md.getMessage().isExpired()) { if (log.isDebugEnabled()) { log.debug("Received expired message: " + md); @@ -415,9 +417,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * this message consumer is concurrently closed */ public Message receive() throws JMSException { - sendPullCommand(); checkClosed(); checkMessageListener(); + + sendPullCommand(-1); MessageDispatch md = dequeue(-1); if (md == null) return null; @@ -454,22 +457,29 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * expires, and the call blocks indefinitely. * * @param timeout - * the timeout value (in milliseconds) + * the timeout value (in milliseconds), a time out of zero never expires. * @return the next message produced for this message consumer, or null if * the timeout expires or this message consumer is concurrently * closed */ public Message receive(long timeout) throws JMSException { - sendPullCommand(); checkClosed(); checkMessageListener(); if (timeout == 0) { return this.receive(); } - + + sendPullCommand(timeout); while (timeout > 0) { - MessageDispatch md = dequeue(timeout); + + MessageDispatch md; + if (info.getPrefetchSize() == 0) { + md = dequeue(-1); // We let the broker let us know when we timeout. + } else { + md = dequeue(timeout); + } + if (md == null) return null; @@ -492,7 +502,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC public Message receiveNoWait() throws JMSException { checkClosed(); checkMessageListener(); - MessageDispatch md = dequeue(0); + sendPullCommand(-1); + + MessageDispatch md; + if (info.getPrefetchSize() == 0) { + md = dequeue(-1); // We let the broker let us know when we timeout. + } else { + md = dequeue(0); + } + if (md == null) return null; @@ -598,10 +616,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC * we are about to receive * */ - protected void sendPullCommand() throws JMSException { + protected void sendPullCommand(long timeout) throws JMSException { if (info.getPrefetchSize() == 0) { MessagePull messagePull = new MessagePull(); messagePull.configure(info); + messagePull.setTimeout(timeout); session.asyncSendPacket(messagePull); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java index 14d5425414..9a5d2730b6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/EndOfBrowseMarkerQueueMessageReference.java @@ -25,7 +25,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; /** - * Only used by the {@link QueueMessageReference#END_OF_BROWSE_MARKER} + * Only used by the {@link QueueMessageReference#NULL_MESSAGE} */ final class EndOfBrowseMarkerQueueMessageReference implements QueueMessageReference { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 5cd6d6eace..d4b4905dcd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -37,6 +37,7 @@ import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.Response; +import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; @@ -58,7 +59,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ long enqueueCounter; long dispatchCounter; long dequeueCounter; - + public PrefetchSubscription(Broker broker,ConnectionContext context,ConsumerInfo info) throws InvalidSelectorException{ super(broker,context,info); @@ -68,16 +69,51 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ /** * Allows a message to be pulled on demand by a client */ - public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { - if (getPrefetchSize() == 0) { + synchronized public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { + // The slave should not deliver pull messages. TODO: when the slave becomes a master, + // He should send a NULL message to all the consumers to 'wake them up' in case + // they were waiting for a message. + if (getPrefetchSize() == 0 && !isSlaveBroker()) { prefetchExtension++; - dispatchMatched(); - // TODO it might be nice one day to actually return the message itself + final long dispatchCounterBeforePull = dispatchCounter; + dispatchMatched(); + + // If there was nothing dispatched.. we may need to setup a timeout. + if( dispatchCounterBeforePull == dispatchCounter ) { + // imediate timeout used by receiveNoWait() + if( pull.getTimeout() == -1 ) { + // Send a NULL message. + add(QueueMessageReference.NULL_MESSAGE); + dispatchMatched(); + } + if( pull.getTimeout() > 0 ) { + Scheduler.executeAfterDelay(new Runnable(){ + public void run() { + pullTimeout(dispatchCounterBeforePull); + } + }, pull.getTimeout()); + } + } } return null; } + /** + * Occurs when a pull times out. If nothing has been dispatched + * since the timeout was setup, then send the NULL message. + */ + synchronized private void pullTimeout(long dispatchCounterBeforePull) { + if( dispatchCounterBeforePull == dispatchCounter ) { + try { + add(QueueMessageReference.NULL_MESSAGE); + dispatchMatched(); + } catch (Exception e) { + context.getConnection().serviceException(e); + } + } + } + synchronized public void add(MessageReference node) throws Exception{ enqueueCounter++; if(!isFull()){ @@ -311,9 +347,17 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } // Make sure we can dispatch a message. if(canDispatch(node)&&!isSlaveBroker()){ - dispatchCounter++; + MessageDispatch md=createMessageDispatch(node,message); - dispatched.addLast(node); + + // NULL messages don't count... they don't get Acked. + if( node != QueueMessageReference.NULL_MESSAGE ) { + dispatchCounter++; + dispatched.addLast(node); + } else { + prefetchExtension=Math.max(0,prefetchExtension-1); + } + if(info.isDispatchAsync()){ md.setConsumer(new Runnable(){ public void run(){ @@ -335,8 +379,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ synchronized protected void onDispatch(final MessageReference node,final Message message){ if(node.getRegionDestination()!=null){ - node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); - context.getConnection().getStatistics().onMessageDequeue(message); + if( node != QueueMessageReference.NULL_MESSAGE ) { + node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); + context.getConnection().getStatistics().onMessageDequeue(message); + } try{ dispatchMatched(); }catch(IOException e){ @@ -365,12 +411,20 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ * @return */ protected MessageDispatch createMessageDispatch(MessageReference node,Message message){ - MessageDispatch md=new MessageDispatch(); - md.setConsumerId(info.getConsumerId()); - md.setDestination(node.getRegionDestination().getActiveMQDestination()); - md.setMessage(message); - md.setRedeliveryCounter(node.getRedeliveryCounter()); - return md; + if( node == QueueMessageReference.NULL_MESSAGE ) { + MessageDispatch md = new MessageDispatch(); + md.setMessage(null); + md.setConsumerId( info.getConsumerId() ); + md.setDestination( null ); + return md; + } else { + MessageDispatch md=new MessageDispatch(); + md.setConsumerId(info.getConsumerId()); + md.setDestination(node.getRegionDestination().getActiveMQDestination()); + md.setMessage(message); + md.setRedeliveryCounter(node.getRedeliveryCounter()); + return md; + } } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index 70dbab1849..7062984582 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -17,16 +17,14 @@ */ package org.apache.activemq.broker.region; -import javax.jms.InvalidSelectorException; - import java.io.IOException; +import javax.jms.InvalidSelectorException; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.filter.MessageEvaluationContext; public class QueueBrowserSubscription extends QueueSubscription { @@ -53,19 +51,7 @@ public class QueueBrowserSubscription extends QueueSubscription { public void browseDone() throws Exception { browseDone = true; - add(QueueMessageReference.END_OF_BROWSE_MARKER); - } - - protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { - if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) { - MessageDispatch md = new MessageDispatch(); - md.setMessage(null); - md.setConsumerId( info.getConsumerId() ); - md.setDestination( null ); - return md; - } else { - return super.createMessageDispatch(node, message); - } + add(QueueMessageReference.NULL_MESSAGE); } public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java index 5910913412..dad84fb918 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueMessageReference.java @@ -25,7 +25,7 @@ package org.apache.activemq.broker.region; */ public interface QueueMessageReference extends MessageReference { - public static final QueueMessageReference END_OF_BROWSE_MARKER = new EndOfBrowseMarkerQueueMessageReference(); + public static final QueueMessageReference NULL_MESSAGE = new EndOfBrowseMarkerQueueMessageReference(); public boolean isAcked();