https://issues.apache.org/activemq/browse/AMQ-855 allow prefetch==0 to work with receive(timeout) and receiveNoWait()

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@439442 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-09-01 19:52:18 +00:00
parent e16114f99b
commit a19bfd4c9a
5 changed files with 100 additions and 41 deletions

View File

@ -381,6 +381,8 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} else { } else {
return null; return null;
} }
} else if ( md.getMessage()==null ) {
return null;
} else if (md.getMessage().isExpired()) { } else if (md.getMessage().isExpired()) {
if (log.isDebugEnabled()) { if (log.isDebugEnabled()) {
log.debug("Received expired message: " + md); log.debug("Received expired message: " + md);
@ -415,9 +417,10 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* this message consumer is concurrently closed * this message consumer is concurrently closed
*/ */
public Message receive() throws JMSException { public Message receive() throws JMSException {
sendPullCommand();
checkClosed(); checkClosed();
checkMessageListener(); checkMessageListener();
sendPullCommand(-1);
MessageDispatch md = dequeue(-1); MessageDispatch md = dequeue(-1);
if (md == null) if (md == null)
return null; return null;
@ -454,13 +457,12 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* expires, and the call blocks indefinitely. * expires, and the call blocks indefinitely.
* *
* @param timeout * @param timeout
* the timeout value (in milliseconds) * the timeout value (in milliseconds), a time out of zero never expires.
* @return the next message produced for this message consumer, or null if * @return the next message produced for this message consumer, or null if
* the timeout expires or this message consumer is concurrently * the timeout expires or this message consumer is concurrently
* closed * closed
*/ */
public Message receive(long timeout) throws JMSException { public Message receive(long timeout) throws JMSException {
sendPullCommand();
checkClosed(); checkClosed();
checkMessageListener(); checkMessageListener();
if (timeout == 0) { if (timeout == 0) {
@ -468,8 +470,16 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
} }
sendPullCommand(timeout);
while (timeout > 0) { while (timeout > 0) {
MessageDispatch md = dequeue(timeout);
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
md = dequeue(timeout);
}
if (md == null) if (md == null)
return null; return null;
@ -492,7 +502,15 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
public Message receiveNoWait() throws JMSException { public Message receiveNoWait() throws JMSException {
checkClosed(); checkClosed();
checkMessageListener(); checkMessageListener();
MessageDispatch md = dequeue(0); sendPullCommand(-1);
MessageDispatch md;
if (info.getPrefetchSize() == 0) {
md = dequeue(-1); // We let the broker let us know when we timeout.
} else {
md = dequeue(0);
}
if (md == null) if (md == null)
return null; return null;
@ -598,10 +616,11 @@ public class ActiveMQMessageConsumer implements MessageAvailableConsumer, StatsC
* we are about to receive * we are about to receive
* *
*/ */
protected void sendPullCommand() throws JMSException { protected void sendPullCommand(long timeout) throws JMSException {
if (info.getPrefetchSize() == 0) { if (info.getPrefetchSize() == 0) {
MessagePull messagePull = new MessagePull(); MessagePull messagePull = new MessagePull();
messagePull.configure(info); messagePull.configure(info);
messagePull.setTimeout(timeout);
session.asyncSendPacket(messagePull); session.asyncSendPacket(messagePull);
} }
} }

View File

@ -25,7 +25,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
/** /**
* Only used by the {@link QueueMessageReference#END_OF_BROWSE_MARKER} * Only used by the {@link QueueMessageReference#NULL_MESSAGE}
*/ */
final class EndOfBrowseMarkerQueueMessageReference implements final class EndOfBrowseMarkerQueueMessageReference implements
QueueMessageReference { QueueMessageReference {

View File

@ -37,6 +37,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull; import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response; import org.apache.activemq.command.Response;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -68,16 +69,51 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
/** /**
* Allows a message to be pulled on demand by a client * Allows a message to be pulled on demand by a client
*/ */
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception { synchronized public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
if (getPrefetchSize() == 0) { // The slave should not deliver pull messages. TODO: when the slave becomes a master,
// He should send a NULL message to all the consumers to 'wake them up' in case
// they were waiting for a message.
if (getPrefetchSize() == 0 && !isSlaveBroker()) {
prefetchExtension++; prefetchExtension++;
dispatchMatched();
// TODO it might be nice one day to actually return the message itself final long dispatchCounterBeforePull = dispatchCounter;
dispatchMatched();
// If there was nothing dispatched.. we may need to setup a timeout.
if( dispatchCounterBeforePull == dispatchCounter ) {
// imediate timeout used by receiveNoWait()
if( pull.getTimeout() == -1 ) {
// Send a NULL message.
add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched();
}
if( pull.getTimeout() > 0 ) {
Scheduler.executeAfterDelay(new Runnable(){
public void run() {
pullTimeout(dispatchCounterBeforePull);
}
}, pull.getTimeout());
}
}
} }
return null; return null;
} }
/**
* Occurs when a pull times out. If nothing has been dispatched
* since the timeout was setup, then send the NULL message.
*/
synchronized private void pullTimeout(long dispatchCounterBeforePull) {
if( dispatchCounterBeforePull == dispatchCounter ) {
try {
add(QueueMessageReference.NULL_MESSAGE);
dispatchMatched();
} catch (Exception e) {
context.getConnection().serviceException(e);
}
}
}
synchronized public void add(MessageReference node) throws Exception{ synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++; enqueueCounter++;
if(!isFull()){ if(!isFull()){
@ -311,9 +347,17 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
// Make sure we can dispatch a message. // Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){ if(canDispatch(node)&&!isSlaveBroker()){
dispatchCounter++;
MessageDispatch md=createMessageDispatch(node,message); MessageDispatch md=createMessageDispatch(node,message);
dispatched.addLast(node);
// NULL messages don't count... they don't get Acked.
if( node != QueueMessageReference.NULL_MESSAGE ) {
dispatchCounter++;
dispatched.addLast(node);
} else {
prefetchExtension=Math.max(0,prefetchExtension-1);
}
if(info.isDispatchAsync()){ if(info.isDispatchAsync()){
md.setConsumer(new Runnable(){ md.setConsumer(new Runnable(){
public void run(){ public void run(){
@ -335,8 +379,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
synchronized protected void onDispatch(final MessageReference node,final Message message){ synchronized protected void onDispatch(final MessageReference node,final Message message){
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); if( node != QueueMessageReference.NULL_MESSAGE ) {
context.getConnection().getStatistics().onMessageDequeue(message); node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message);
}
try{ try{
dispatchMatched(); dispatchMatched();
}catch(IOException e){ }catch(IOException e){
@ -365,12 +411,20 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
* @return * @return
*/ */
protected MessageDispatch createMessageDispatch(MessageReference node,Message message){ protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
MessageDispatch md=new MessageDispatch(); if( node == QueueMessageReference.NULL_MESSAGE ) {
md.setConsumerId(info.getConsumerId()); MessageDispatch md = new MessageDispatch();
md.setDestination(node.getRegionDestination().getActiveMQDestination()); md.setMessage(null);
md.setMessage(message); md.setConsumerId( info.getConsumerId() );
md.setRedeliveryCounter(node.getRedeliveryCounter()); md.setDestination( null );
return md; return md;
} else {
MessageDispatch md=new MessageDispatch();
md.setConsumerId(info.getConsumerId());
md.setDestination(node.getRegionDestination().getActiveMQDestination());
md.setMessage(message);
md.setRedeliveryCounter(node.getRedeliveryCounter());
return md;
}
} }
/** /**

View File

@ -17,16 +17,14 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import javax.jms.InvalidSelectorException;
import java.io.IOException; import java.io.IOException;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
public class QueueBrowserSubscription extends QueueSubscription { public class QueueBrowserSubscription extends QueueSubscription {
@ -53,19 +51,7 @@ public class QueueBrowserSubscription extends QueueSubscription {
public void browseDone() throws Exception { public void browseDone() throws Exception {
browseDone = true; browseDone = true;
add(QueueMessageReference.END_OF_BROWSE_MARKER); add(QueueMessageReference.NULL_MESSAGE);
}
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) {
if( node == QueueMessageReference.END_OF_BROWSE_MARKER ) {
MessageDispatch md = new MessageDispatch();
md.setMessage(null);
md.setConsumerId( info.getConsumerId() );
md.setDestination( null );
return md;
} else {
return super.createMessageDispatch(node, message);
}
} }
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {

View File

@ -25,7 +25,7 @@ package org.apache.activemq.broker.region;
*/ */
public interface QueueMessageReference extends MessageReference { public interface QueueMessageReference extends MessageReference {
public static final QueueMessageReference END_OF_BROWSE_MARKER = new EndOfBrowseMarkerQueueMessageReference(); public static final QueueMessageReference NULL_MESSAGE = new EndOfBrowseMarkerQueueMessageReference();
public boolean isAcked(); public boolean isAcked();