added some early support for AMQ-855 to allow pure pull based consumption - adding a MessagePull command so that a client can pull messages on demand

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@430445 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
James Strachan 2006-08-10 17:17:19 +00:00
parent c3fae7767d
commit 9d671b70aa
16 changed files with 188 additions and 2 deletions

View File

@ -44,6 +44,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -384,6 +385,10 @@ public abstract class AbstractConnection implements Service, Connection, Task, C
return null; return null;
} }
public Response processMessagePull(MessagePull pull) throws Exception {
return broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(), pull);
}
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{ public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
broker.processDispatchNotification(notification); broker.processDispatchNotification(notification);
return null; return null;

View File

@ -28,7 +28,9 @@ import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DestinationInfo; import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;

View File

@ -29,8 +29,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
@ -70,6 +72,10 @@ public class BrokerFilter implements Broker {
next.acknowledge(context, ack); next.acknowledge(context, ack);
} }
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return next.messagePull(context, pull);
}
public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception { public void addConnection(ConnectionContext context, ConnectionInfo info) throws Exception {
next.addConnection(context, info); next.addConnection(context, info);
} }

View File

@ -29,8 +29,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
@ -223,4 +225,9 @@ public class EmptyBroker implements Broker {
public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { public void setAdminConnectionContext(ConnectionContext adminConnectionContext) {
} }
public Response messagePull(ConnectionContext context, MessagePull pull) {
return null;
}
} }

View File

@ -33,8 +33,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
@ -224,4 +226,8 @@ public class ErrorBroker implements Broker {
throw new BrokerStoppedException(this.message); throw new BrokerStoppedException(this.message);
} }
public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
} }

View File

@ -29,8 +29,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
@ -238,4 +240,8 @@ public class MutableBrokerFilter implements Broker {
getNext().setAdminConnectionContext(adminConnectionContext); getNext().setAdminConnectionContext(adminConnectionContext);
} }
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
return getNext().messagePull(context, pull);
}
} }

View File

@ -30,7 +30,9 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationMap; import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -254,12 +256,17 @@ abstract public class AbstractRegion implements Region {
} }
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId()); Subscription sub = (Subscription) subscriptions.get(ack.getConsumerId());
if( sub==null ) if( sub==null )
throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId()); throw new IllegalArgumentException("The subscription does not exist: "+ack.getConsumerId());
sub.acknowledge(context, ack); sub.acknowledge(context, ack);
}
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
Subscription sub = (Subscription) subscriptions.get(pull.getConsumerId());
if( sub==null )
throw new IllegalArgumentException("The subscription does not exist: "+pull.getConsumerId());
return sub.pullMessage(context, pull);
} }
protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception { protected Destination lookup(ConnectionContext context, ActiveMQDestination destination) throws Exception {

View File

@ -35,6 +35,8 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.util.BrokerSupport; import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -62,6 +64,20 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
super(broker,context,info); super(broker,context,info);
} }
/**
* Allows a message to be pulled on demand by a client
*/
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
if (getPrefetchSize() == 0) {
prefetchExtension++;
dispatchMatched();
// TODO it might be nice one day to actually return the message itself
}
return null;
}
synchronized public void add(MessageReference node) throws Exception{ synchronized public void add(MessageReference node) throws Exception{
enqueueCounter++; enqueueCounter++;
if(!isFull()){ if(!isFull()){

View File

@ -24,7 +24,9 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -108,6 +110,11 @@ public interface Region extends Service {
*/ */
public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception; public void acknowledge(ConnectionContext context, MessageAck ack) throws Exception;
/**
* Allows a consumer to pull a message from a queue
*/
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception;
/** /**
* Process a notification of a dispatch - used by a Slave Broker * Process a notification of a dispatch - used by a Slave Broker
* @param messageDispatchNotification * @param messageDispatchNotification

View File

@ -36,8 +36,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo; import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId; import org.apache.activemq.command.TransactionId;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
@ -400,6 +402,26 @@ public class RegionBroker implements Broker {
} }
} }
public Response messagePull(ConnectionContext context, MessagePull pull) throws Exception {
ActiveMQDestination destination = pull.getDestination();
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.messagePull(context, pull);
case ActiveMQDestination.TOPIC_TYPE:
return topicRegion.messagePull(context, pull);
case ActiveMQDestination.TEMP_QUEUE_TYPE:
return tempQueueRegion.messagePull(context, pull);
case ActiveMQDestination.TEMP_TOPIC_TYPE:
return tempTopicRegion.messagePull(context, pull);
default:
throw createUnknownDestinationTypeException(destination);
}
}
public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception { public TransactionId[] getPreparedTransactions(ConnectionContext context) throws Exception {
throw new IllegalAccessException("Transaction operation not implemented by this broker."); throw new IllegalAccessException("Transaction operation not implemented by this broker.");
} }

View File

@ -24,6 +24,8 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
@ -50,6 +52,12 @@ public interface Subscription {
*/ */
void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception; void acknowledge(ConnectionContext context, final MessageAck ack) throws Exception;
/**
* Allows a consumer to pull a message on demand
*/
Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception;
/** /**
* Is the subscription interested in the message? * Is the subscription interested in the message?
* @param node * @param node

View File

@ -36,6 +36,8 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -181,6 +183,11 @@ public class TopicSubscription extends AbstractSubscription{
throw new JMSException("Invalid acknowledgment: "+ack); throw new JMSException("Invalid acknowledgment: "+ack);
} }
public Response pullMessage(ConnectionContext context, MessagePull pull) throws Exception {
// not supported for topics
return null;
}
public int getPendingQueueSize(){ public int getPendingQueueSize(){
return matched(); return matched();
} }

View File

@ -57,6 +57,7 @@ public interface CommandTypes {
// and the server. // and the server.
// //
/////////////////////////////////////////////////// ///////////////////////////////////////////////////
byte MESSAGE_PULL = 20;
byte MESSAGE_DISPATCH = 21; byte MESSAGE_DISPATCH = 21;
byte MESSAGE_ACK = 22; byte MESSAGE_ACK = 22;

View File

@ -0,0 +1,78 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.command;
import org.apache.activemq.state.CommandVisitor;
/**
* Used to pull messages on demand.
*
* @openwire:marshaller code="20"
*
* @version $Revision$
*/
public class MessagePull extends BaseCommand {
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_PULL;
protected ConsumerId consumerId;
protected ActiveMQDestination destination;
protected long timeout;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
}
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processMessagePull(this);
}
/**
* @openwire:property version=1 cache=true
*/
public ConsumerId getConsumerId() {
return consumerId;
}
public void setConsumerId(ConsumerId consumerId) {
this.consumerId = consumerId;
}
/**
* @openwire:property version=1 cache=true
*/
public ActiveMQDestination getDestination() {
return destination;
}
public void setDestination(ActiveMQDestination destination) {
this.destination = destination;
}
/**
* @openwire:property version=1
*/
public long getTimeout() {
return timeout;
}
public void setTimeout(long timeout) {
this.timeout = timeout;
}
}

View File

@ -28,6 +28,7 @@ import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -56,6 +57,7 @@ public interface CommandVisitor {
Response processMessage(Message send) throws Exception; Response processMessage(Message send) throws Exception;
Response processMessageAck(MessageAck ack) throws Exception; Response processMessageAck(MessageAck ack) throws Exception;
Response processMessagePull(MessagePull pull) throws Exception;
Response processBeginTransaction(TransactionInfo info) throws Exception; Response processBeginTransaction(TransactionInfo info) throws Exception;
Response processPrepareTransaction(TransactionInfo info) throws Exception; Response processPrepareTransaction(TransactionInfo info) throws Exception;

View File

@ -32,6 +32,7 @@ import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatchNotification; import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.ProducerId; import org.apache.activemq.command.ProducerId;
import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo; import org.apache.activemq.command.RemoveSubscriptionInfo;
@ -278,6 +279,10 @@ public class ConnectionStateTracker implements CommandVisitor {
public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{ public Response processMessageDispatchNotification(MessageDispatchNotification notification) throws Exception{
return null; return null;
} }
public Response processMessagePull(MessagePull pull) throws Exception {
return null;
}
public boolean isRestoreConsumers() { public boolean isRestoreConsumers() {
return restoreConsumers; return restoreConsumers;
@ -302,4 +307,5 @@ public class ConnectionStateTracker implements CommandVisitor {
public void setRestoreSessions(boolean restoreSessions) { public void setRestoreSessions(boolean restoreSessions) {
this.restoreSessions = restoreSessions; this.restoreSessions = restoreSessions;
} }
} }