Updated support for configurable Cursor types from the Destination Policy Map

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@500862 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-01-28 19:39:02 +00:00
parent f0d9464da0
commit 0c0be94a31
29 changed files with 495 additions and 242 deletions

View File

@ -254,20 +254,6 @@ public interface Broker extends Region, Service {
* @param adminConnectionContext
*/
public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext);
/**
* @return the pendingDurableSubscriberPolicy
*/
public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy();
/**
* @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy);
/**
* @return the broker's temp data store
* @throws Exception
*/
public Store getTempDataStore();
}

View File

@ -234,15 +234,6 @@ public class BrokerFilter implements Broker {
next.setAdminConnectionContext(adminConnectionContext);
}
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return next.getPendingDurableSubscriberPolicy();
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
public Store getTempDataStore() {
return next.getTempDataStore();
}

View File

@ -55,7 +55,6 @@ import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.broker.region.virtual.VirtualTopic;
@ -149,8 +148,6 @@ public class BrokerService implements Service, Serializable {
private Store tempDataStore;
private int persistenceThreadPriority = Thread.MAX_PRIORITY;
private boolean useLocalHostBrokerName = false;
//private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new StorePendingDurableSubscriberMessageStoragePolicy();
@ -1008,24 +1005,7 @@ public class BrokerService implements Service, Serializable {
public void setPersistenceThreadPriority(int persistenceThreadPriority){
this.persistenceThreadPriority=persistenceThreadPriority;
}
/**
* @return the pendingDurableSubscriberPolicy
*/
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
return this.pendingDurableSubscriberPolicy;
}
/**
* @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
if (broker != null) {
broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
}
/**
* @return the useLocalHostBrokerName
*/
@ -1296,7 +1276,6 @@ public class BrokerService implements Service, Serializable {
regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
regionBroker.setBrokerName(getBrokerName());
regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy());
return regionBroker;
}

View File

@ -232,12 +232,6 @@ public class EmptyBroker implements Broker {
return null;
}
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return null;
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
}
public Store getTempDataStore() {
return null;

View File

@ -231,15 +231,7 @@ public class ErrorBroker implements Broker {
public Response messagePull(ConnectionContext context, MessagePull pull) {
throw new BrokerStoppedException(this.message);
}
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
throw new BrokerStoppedException(this.message);
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
throw new BrokerStoppedException(this.message);
}
public Store getTempDataStore() {
throw new BrokerStoppedException(this.message);
}

View File

@ -246,14 +246,6 @@ public class MutableBrokerFilter implements Broker {
return getNext().messagePull(context, pull);
}
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() {
return getNext().getPendingDurableSubscriberPolicy();
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) {
getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy);
}
public Store getTempDataStore() {
return getNext().getTempDataStore();
}

View File

@ -24,10 +24,12 @@ import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -40,8 +42,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
private final boolean keepDurableSubsActive;
private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException {
super(broker,context,info,cursor);
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException {
super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize()));
this.keepDurableSubsActive = keepDurableSubsActive;
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
}
@ -70,7 +72,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
dispatchMatched();
}
public void activate(ConnectionContext context, ConsumerInfo info) throws Exception {
public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo info) throws Exception {
log.debug("Deactivating " + this);
if( !active ) {
this.active = true;
@ -83,6 +85,7 @@ public class DurableTopicSubscription extends PrefetchSubscription {
}
}
synchronized(pending) {
pending.setUsageManager(memoryManager);
pending.start();
}
//If nothing was in the persistent store, then try to use the recovery policy.

View File

@ -49,7 +49,7 @@ import org.apache.commons.logging.LogFactory;
abstract public class PrefetchSubscription extends AbstractSubscription{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
final protected PendingMessageCursor pending;
protected PendingMessageCursor pending;
final protected LinkedList dispatched=new LinkedList();
protected int prefetchExtension=0;
protected long enqueueCounter;
@ -342,6 +342,17 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
public boolean isRecoveryRequired(){
return pending.isRecoveryRequired();
}
public PendingMessageCursor getPending(){
return this.pending;
}
public void setPending(PendingMessageCursor pending){
this.pending=pending;
}
/**
* optimize message consumer prefetch if the consumer supports it
@ -506,4 +517,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
protected void acknowledge(ConnectionContext context,final MessageAck ack,final MessageReference node)
throws IOException{
}
}

View File

@ -22,10 +22,9 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreQueueCursor;
@ -59,8 +58,6 @@ import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* The Queue is a List of MessageEntry objects that are dispatched to matching
* subscriptions.

View File

@ -95,7 +95,7 @@ public class RegionBroker implements Broker {
private ConnectionContext adminConnectionContext;
protected DestinationFactory destinationFactory;
protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap();
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy();
public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
@ -588,15 +588,4 @@ public class RegionBroker implements Broker {
public Store getTempDataStore() {
return brokerService.getTempDataStore();
}
/**
* @return the pendingDurableSubscriberPolicy
*/
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
return this.pendingDurableSubscriberPolicy;
}
public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){
this.pendingDurableSubscriberPolicy=durableSubscriberCursor;
}
}

View File

@ -1,51 +1,70 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region;
import javax.jms.JMSException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
*
* @version $Revision: 1.7 $
*/
public class TempTopicRegion extends AbstractRegion {
public class TempTopicRegion extends AbstractRegion{
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
super(broker,destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
private static final Log log=LogFactory.getLog(TempTopicRegion.class);
public TempTopicRegion(RegionBroker broker,DestinationStatistics destinationStatistics,UsageManager memoryManager,
TaskRunnerFactory taskRunnerFactory,DestinationFactory destinationFactory){
super(broker,destinationStatistics,memoryManager,taskRunnerFactory,destinationFactory);
setAutoCreateDestinations(false);
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
if( info.isDurable() ) {
protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
if(info.isDurable()){
throw new JMSException("A durable subscription cannot be created for a temporary topic.");
} else {
return new TopicSubscription(broker,context, info, this.memoryManager);
}
try{
TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination=info.getDestination();
if(destination!=null&&broker.getDestinationPolicy()!=null){
PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
if(entry!=null){
entry.configure(broker,memoryManager,answer);
}
}
answer.init();
return answer;
}catch(Exception e){
log.error("Failed to create TopicSubscription ",e);
JMSException jmsEx=new JMSException("Couldn't create TopicSubscription");
jmsEx.setLinkedException(e);
throw jmsEx;
}
}
public String toString() {
return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="+memoryManager.getPercentUsage()+"%";
public String toString(){
return "TempTopicRegion: destinations="+destinations.size()+", subscriptions="+subscriptions.size()+", memory="
+memoryManager.getPercentUsage()+"%";
}
}

View File

@ -104,7 +104,7 @@ public class TopicRegion extends AbstractRegion {
+" subscriberName: "+key.getSubscriptionName());
}
}
sub.activate(context,info);
sub.activate(memoryManager,context,info);
return sub;
}else{
return super.addConsumer(context,info);
@ -208,38 +208,45 @@ public class TopicRegion extends AbstractRegion {
}
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws JMSException {
if (info.isDurable()) {
if (AdvisorySupport.isAdvisoryTopic(info.getDestination())){
protected Subscription createSubscription(ConnectionContext context,ConsumerInfo info) throws JMSException{
if(info.isDurable()){
if(AdvisorySupport.isAdvisoryTopic(info.getDestination())){
throw new JMSException("Cannot create a durable subscription for an advisory Topic");
}
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
if(sub==null){
PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor(
context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),
info.getPrefetchSize());
cursor.setUsageManager(memoryManager);
sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor);
sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive);
ActiveMQDestination destination=info.getDestination();
if(destination!=null&&broker.getDestinationPolicy()!=null){
PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
if(entry!=null){
entry.configure(broker,memoryManager,sub);
}
}
durableSubscriptions.put(key,sub);
}
else {
}else{
throw new JMSException("That durable subscription is already active.");
}
return sub;
}
else {
TopicSubscription answer = new TopicSubscription(broker,context, info, memoryManager);
try{
TopicSubscription answer=new TopicSubscription(broker,context,info,memoryManager);
// lets configure the subscription depending on the destination
ActiveMQDestination destination = info.getDestination();
if (destination != null && broker.getDestinationPolicy() != null) {
PolicyEntry entry = broker.getDestinationPolicy().getEntryFor(destination);
if (entry != null) {
entry.configure(answer);
ActiveMQDestination destination=info.getDestination();
if(destination!=null&&broker.getDestinationPolicy()!=null){
PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);
if(entry!=null){
entry.configure(broker,memoryManager,answer);
}
}
answer.init();
return answer;
}catch(Exception e){
log.error("Failed to create TopicSubscription ",e);
JMSException jmsEx=new JMSException("Couldn't create TopicSubscription");
jmsEx.setLinkedException(e);
throw jmsEx;
}
}

View File

@ -22,6 +22,7 @@ import javax.jms.JMSException;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.policy.MessageEvictionStrategy;
import org.apache.activemq.broker.region.policy.OldestMessageEvictionStrategy;
import org.apache.activemq.command.ConsumerControl;
@ -41,7 +42,7 @@ public class TopicSubscription extends AbstractSubscription{
private static final Log log=LogFactory.getLog(TopicSubscription.class);
private static final AtomicLong cursorNameCounter=new AtomicLong(0);
final protected FilePendingMessageCursor matched;
protected PendingMessageCursor matched;
final protected UsageManager usageManager;
protected AtomicLong dispatched=new AtomicLong();
protected AtomicLong delivered=new AtomicLong();
@ -56,17 +57,21 @@ public class TopicSubscription extends AbstractSubscription{
private int memoryUsageHighWaterMark=95;
public TopicSubscription(Broker broker,ConnectionContext context,ConsumerInfo info,UsageManager usageManager)
throws InvalidSelectorException{
throws Exception{
super(broker,context,info);
this.usageManager=usageManager;
String matchedName="TopicSubscription:"+cursorNameCounter.getAndIncrement()+"["+info.getConsumerId().toString()
+"]";
this.matched=new FilePendingMessageCursor(matchedName,broker.getTempDataStore());
}
public void init() throws Exception {
this.matched.setUsageManager(usageManager);
this.matched.start();
}
public void add(MessageReference node) throws InterruptedException,IOException{
public void add(MessageReference node) throws Exception{
enqueueCounter.incrementAndGet();
node.incrementReferenceCount();
if(!isFull()&&!isSlaveBroker()){
@ -309,6 +314,20 @@ public class TopicSubscription extends AbstractSubscription{
public UsageManager getUsageManager(){
return this.usageManager;
}
/**
* @return the matched
*/
public PendingMessageCursor getMatched(){
return this.matched;
}
/**
* @param matched the matched to set
*/
public void setMatched(PendingMessageCursor matched){
this.matched=matched;
}
/**
* inform the MessageConsumer on the client to change it's prefetch
@ -402,7 +421,14 @@ public class TopicSubscription extends AbstractSubscription{
public void destroy(){
synchronized(matchedListMutex){
matched.destroy();
try{
matched.destroy();
}catch(Exception e){
log.warn("Failed to destroy cursor",e);
}
}
}
}

View File

@ -14,6 +14,7 @@
package org.apache.activemq.broker.region.cursors;
import java.util.LinkedList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
@ -150,4 +151,21 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor{
public UsageManager getUsageManager(){
return this.usageManager;
}
/**
* destroy the cursor
* @throws Exception
*/
public void destroy() throws Exception {
stop();
}
/**
* Page in a restricted number of messages
* @param maxItems
* @return a list of paged in messages
*/
public LinkedList pageInList(int maxItems) {
throw new RuntimeException("Not supported");
}
}

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.Message;
@ -39,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener{
static private final Log log=LogFactory.getLog(FilePendingMessageCursor.class);
static private final AtomicLong nameCount = new AtomicLong();
private Store store;
private String name;
private LinkedList memoryList=new LinkedList();
@ -54,7 +56,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
* @param store
*/
public FilePendingMessageCursor(String name,Store store){
this.name=name;
this.name=nameCount.incrementAndGet() + "_"+name;
this.store=store;
}

View File

@ -14,6 +14,7 @@
package org.apache.activemq.broker.region.cursors;
import java.io.IOException;
import java.util.LinkedList;
import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
@ -189,5 +190,18 @@ public interface PendingMessageCursor extends Service{
*/
public boolean hasMessagesBufferedToDeliver();
/**
* destroy the cursor
* @throws Exception
*/
public void destroy() throws Exception;
/**
* Page in a restricted number of messages
* @param maxItems
* @return a list of paged in messages
*/
public LinkedList pageInList(int maxItems);
}

View File

@ -103,4 +103,13 @@ public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
}
}
}
/**
* Page in a restricted number of messages
* @param maxItems
* @return a list of paged in messages
*/
public LinkedList pageInList(int maxItems) {
return list;
}
}

View File

@ -23,7 +23,7 @@ import org.apache.activemq.kaha.Store;
/**
* Creates a FilePendingMessageCursor
* *
* @org.apache.xbean.XBean element="fileCursor" description="Pending messages paged in from file"
* @org.apache.xbean.XBean element="fileQueueCursor" description="Pending messages paged in from file"
*
* @version $Revision$
*/

View File

@ -0,0 +1,41 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a PendIngMessageCursor for Durable subscribers
* *
* @org.apache.xbean.XBean element="fileCursor" description="Pending messages for durable subscribers
* held in temporary files"
*
* @version $Revision$
*/
public class FilePendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
/**
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
return new FilePendingMessageCursor("PendingCursor:" + name,tmpStorage);
}
}

View File

@ -0,0 +1,38 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Abstraction to allow different policies for holding messages awaiting dispatch to active clients
*
* @version $Revision$
*/
public interface PendingSubscriberMessageStoragePolicy{
/**
* Retrieve the configured pending message storage cursor;
*
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return the Pending Message cursor
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,
int maxBatchSize);
}

View File

@ -1,22 +1,21 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
@ -25,21 +24,21 @@ import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Represents an entry in a {@link PolicyMap} for assigning policies to a
* specific destination or a hierarchical wildcard area of destinations.
* Represents an entry in a {@link PolicyMap} for assigning policies to a specific destination or a hierarchical
* wildcard area of destinations.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.1 $
*/
public class PolicyEntry extends DestinationMapEntry {
public class PolicyEntry extends DestinationMapEntry{
private static final Log log = LogFactory.getLog(PolicyEntry.class);
private static final Log log=LogFactory.getLog(PolicyEntry.class);
private DispatchPolicy dispatchPolicy;
private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
private boolean sendAdvisoryIfNoConsumers;
@ -48,135 +47,149 @@ public class PolicyEntry extends DestinationMapEntry {
private MessageEvictionStrategy messageEvictionStrategy;
private long memoryLimit;
private MessageGroupMapFactory messageGroupMapFactory;
private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy;
public void configure(Queue queue, Store tmpStore) {
if (dispatchPolicy != null) {
private PendingQueueMessageStoragePolicy pendingQueuePolicy;
private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy;
private PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy;
public void configure(Queue queue,Store tmpStore){
if(dispatchPolicy!=null){
queue.setDispatchPolicy(dispatchPolicy);
}
if (deadLetterStrategy != null) {
if(deadLetterStrategy!=null){
queue.setDeadLetterStrategy(deadLetterStrategy);
}
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if( memoryLimit>0 ) {
if(memoryLimit>0){
queue.getUsageManager().setLimit(memoryLimit);
}
if (pendingQueueMessageStoragePolicy != null) {
PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(queue,tmpStore);
if(pendingQueuePolicy!=null){
PendingMessageCursor messages=pendingQueuePolicy.getQueuePendingMessageCursor(queue,tmpStore);
queue.setMessages(messages);
}
}
public void configure(Topic topic) {
if (dispatchPolicy != null) {
public void configure(Topic topic){
if(dispatchPolicy!=null){
topic.setDispatchPolicy(dispatchPolicy);
}
if (deadLetterStrategy != null) {
if(deadLetterStrategy!=null){
topic.setDeadLetterStrategy(deadLetterStrategy);
}
if (subscriptionRecoveryPolicy != null) {
if(subscriptionRecoveryPolicy!=null){
topic.setSubscriptionRecoveryPolicy(subscriptionRecoveryPolicy.copy());
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if( memoryLimit>0 ) {
if(memoryLimit>0){
topic.getUsageManager().setLimit(memoryLimit);
}
}
public void configure(TopicSubscription subscription) {
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
if (consumerLimit > 0) {
if (value < 0 || consumerLimit < value) {
value = consumerLimit;
public void configure(Broker broker,UsageManager memoryManager,TopicSubscription subscription){
if(pendingMessageLimitStrategy!=null){
int value=pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit=subscription.getInfo().getMaximumPendingMessageLimit();
if(consumerLimit>0){
if(value<0||consumerLimit<value){
value=consumerLimit;
}
}
if (value >= 0) {
if (log.isDebugEnabled()) {
log.debug("Setting the maximumPendingMessages size to: " + value + " for consumer: " + subscription.getInfo().getConsumerId());
if(value>=0){
if(log.isDebugEnabled()){
log.debug("Setting the maximumPendingMessages size to: "+value+" for consumer: "
+subscription.getInfo().getConsumerId());
}
subscription.setMaximumPendingMessages(value);
}
}
if (messageEvictionStrategy != null) {
if(messageEvictionStrategy!=null){
subscription.setMessageEvictionStrategy(messageEvictionStrategy);
}
if (pendingSubscriberPolicy!=null) {
String name = subscription.getContext().getClientId() + "_" + subscription.getConsumerInfo().getConsumerId();
int maxBatchSize = subscription.getConsumerInfo().getPrefetchSize();
subscription.setMatched(pendingSubscriberPolicy.getSubscriberPendingMessageCursor(name,broker.getTempDataStore(),maxBatchSize));
}
}
public void configure(Broker broker,UsageManager memoryManager,DurableTopicSubscription sub){
String clientId=sub.getClientId();
String subName=sub.getSubscriptionName();
int prefetch=sub.getPrefetchSize();
if(pendingDurableSubscriberPolicy!=null){
PendingMessageCursor cursor=pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId,
subName,broker.getTempDataStore(),prefetch);
cursor.setUsageManager(memoryManager);
sub.setPending(cursor);
}
}
// Properties
// -------------------------------------------------------------------------
public DispatchPolicy getDispatchPolicy() {
public DispatchPolicy getDispatchPolicy(){
return dispatchPolicy;
}
public void setDispatchPolicy(DispatchPolicy policy) {
this.dispatchPolicy = policy;
public void setDispatchPolicy(DispatchPolicy policy){
this.dispatchPolicy=policy;
}
public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy() {
public SubscriptionRecoveryPolicy getSubscriptionRecoveryPolicy(){
return subscriptionRecoveryPolicy;
}
public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy) {
this.subscriptionRecoveryPolicy = subscriptionRecoveryPolicy;
public void setSubscriptionRecoveryPolicy(SubscriptionRecoveryPolicy subscriptionRecoveryPolicy){
this.subscriptionRecoveryPolicy=subscriptionRecoveryPolicy;
}
public boolean isSendAdvisoryIfNoConsumers() {
public boolean isSendAdvisoryIfNoConsumers(){
return sendAdvisoryIfNoConsumers;
}
/**
* Sends an advisory message if a non-persistent message is sent and there
* are no active consumers
* Sends an advisory message if a non-persistent message is sent and there are no active consumers
*/
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers) {
this.sendAdvisoryIfNoConsumers = sendAdvisoryIfNoConsumers;
public void setSendAdvisoryIfNoConsumers(boolean sendAdvisoryIfNoConsumers){
this.sendAdvisoryIfNoConsumers=sendAdvisoryIfNoConsumers;
}
public DeadLetterStrategy getDeadLetterStrategy() {
public DeadLetterStrategy getDeadLetterStrategy(){
return deadLetterStrategy;
}
/**
* Sets the policy used to determine which dead letter queue destination
* should be used
* Sets the policy used to determine which dead letter queue destination should be used
*/
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy) {
this.deadLetterStrategy = deadLetterStrategy;
public void setDeadLetterStrategy(DeadLetterStrategy deadLetterStrategy){
this.deadLetterStrategy=deadLetterStrategy;
}
public PendingMessageLimitStrategy getPendingMessageLimitStrategy() {
public PendingMessageLimitStrategy getPendingMessageLimitStrategy(){
return pendingMessageLimitStrategy;
}
/**
* Sets the strategy to calculate the maximum number of messages that are
* allowed to be pending on consumers (in addition to their prefetch sizes).
* Sets the strategy to calculate the maximum number of messages that are allowed to be pending on consumers (in
* addition to their prefetch sizes).
*
* Once the limit is reached, non-durable topics can then start discarding
* old messages. This allows us to keep dispatching messages to slow
* consumers while not blocking fast consumers and discarding the messages
* oldest first.
* Once the limit is reached, non-durable topics can then start discarding old messages. This allows us to keep
* dispatching messages to slow consumers while not blocking fast consumers and discarding the messages oldest
* first.
*/
public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy) {
this.pendingMessageLimitStrategy = pendingMessageLimitStrategy;
public void setPendingMessageLimitStrategy(PendingMessageLimitStrategy pendingMessageLimitStrategy){
this.pendingMessageLimitStrategy=pendingMessageLimitStrategy;
}
public MessageEvictionStrategy getMessageEvictionStrategy() {
public MessageEvictionStrategy getMessageEvictionStrategy(){
return messageEvictionStrategy;
}
/**
* Sets the eviction strategy used to decide which message to evict when the
* slow consumer needs to discard messages
* Sets the eviction strategy used to decide which message to evict when the slow consumer needs to discard messages
*/
public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
this.messageEvictionStrategy = messageEvictionStrategy;
public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy){
this.messageEvictionStrategy=messageEvictionStrategy;
}
public long getMemoryLimit() {
public long getMemoryLimit(){
return memoryLimit;
}
@ -184,40 +197,72 @@ public class PolicyEntry extends DestinationMapEntry {
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setMemoryLimit(long memoryLimit) {
this.memoryLimit = memoryLimit;
public void setMemoryLimit(long memoryLimit){
this.memoryLimit=memoryLimit;
}
public MessageGroupMapFactory getMessageGroupMapFactory() {
if (messageGroupMapFactory == null) {
messageGroupMapFactory = new MessageGroupHashBucketFactory();
public MessageGroupMapFactory getMessageGroupMapFactory(){
if(messageGroupMapFactory==null){
messageGroupMapFactory=new MessageGroupHashBucketFactory();
}
return messageGroupMapFactory;
}
/**
* Sets the factory used to create new instances of {MessageGroupMap} used to implement the
* <a href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
* Sets the factory used to create new instances of {MessageGroupMap} used to implement the <a
* href="http://incubator.apache.org/activemq/message-groups.html">Message Groups</a> functionality.
*/
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory) {
this.messageGroupMapFactory = messageGroupMapFactory;
public void setMessageGroupMapFactory(MessageGroupMapFactory messageGroupMapFactory){
this.messageGroupMapFactory=messageGroupMapFactory;
}
/**
* @return the pendingQueueMessageStoragePolicy
* @return the pendingDurableSubscriberPolicy
*/
public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){
return this.pendingQueueMessageStoragePolicy;
public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){
return this.pendingDurableSubscriberPolicy;
}
/**
* @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set
* @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set
*/
public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){
this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy;
public void setPendingDurableSubscriberPolicy(
PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){
this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy;
}
/**
* @return the pendingQueuePolicy
*/
public PendingQueueMessageStoragePolicy getPendingQueuePolicy(){
return this.pendingQueuePolicy;
}
/**
* @param pendingQueuePolicy the pendingQueuePolicy to set
*/
public void setPendingQueuePolicy(PendingQueueMessageStoragePolicy pendingQueuePolicy){
this.pendingQueuePolicy=pendingQueuePolicy;
}
/**
* @return the pendingSubscriberPolicy
*/
public PendingSubscriberMessageStoragePolicy getPendingSubscriberPolicy(){
return this.pendingSubscriberPolicy;
}
/**
* @param pendingSubscriberPolicy the pendingSubscriberPolicy to set
*/
public void setPendingSubscriberPolicy(PendingSubscriberMessageStoragePolicy pendingSubscriberPolicy){
this.pendingSubscriberPolicy=pendingSubscriberPolicy;
}
}

View File

@ -21,7 +21,7 @@ import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor
* *
* @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
* @org.apache.xbean.XBean element="vmDurableCursor" description="Pending messages held in the JVM"
*
* @version $Revision$
*/

View File

@ -22,7 +22,7 @@ import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor
* *
* @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
* @org.apache.xbean.XBean element="vmQueueCursor" description="Pending messages held in the JVM"
*
* @version $Revision$
*/

View File

@ -0,0 +1,40 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.broker.region.policy;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor;
import org.apache.activemq.kaha.Store;
/**
* Creates a VMPendingMessageCursor
* *
* @org.apache.xbean.XBean element="vmCursor" description="Pending messages held in the JVM"
*
* @version $Revision$
*/
public class VMPendingSubscriberMessageStoragePolicy implements PendingSubscriberMessageStoragePolicy{
/**
* @param name
* @param tmpStorage
* @param maxBatchSize
* @return a Cursor
* @see org.apache.activemq.broker.region.policy.PendingSubscriberMessageStoragePolicy#getSubscriberPendingMessageCursor(java.lang.String, org.apache.activemq.kaha.Store, int)
*/
public PendingMessageCursor getSubscriberPendingMessageCursor(String name,Store tmpStorage,int maxBatchSize){
return new VMPendingMessageCursor();
}
}

View File

@ -22,7 +22,6 @@ import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy;
/**
* @version $Revision: 1.3 $
@ -50,7 +49,6 @@ public class CursorDurableTest extends CursorSupport{
protected void configureBroker(BrokerService answer) throws Exception{
answer.setDeleteAllMessagesOnStartup(true);
answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy());
answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true);
}

View File

@ -53,7 +53,7 @@ public class CursorQueueStoreTest extends CursorSupport{
protected void configureBroker(BrokerService answer) throws Exception{
PolicyEntry policy = new PolicyEntry();
policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);

View File

@ -38,7 +38,7 @@ public class KahaQueueStoreTest extends CursorQueueStoreTest{
KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest"));
answer.setPersistenceAdapter(adaptor);
PolicyEntry policy = new PolicyEntry();
policy.setPendingQueueMessageStoragePolicy(new StorePendingQueueMessageStoragePolicy());
policy.setPendingQueuePolicy(new StorePendingQueueMessageStoragePolicy());
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
answer.setDestinationPolicy(pMap);

View File

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Copyright 2005-2006 The Apache Software Foundation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- this file can only be parsed using the xbean-spring library -->
<!-- START SNIPPET: xbean -->
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker persistent="false" xmlns="http://activemq.org/config/1.0">
<!-- lets define the dispatch policy -->
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic="org.apache.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy topicPrefix="Test.DLQ." />
</deadLetterStrategy>
<pendingSubscriberPolicy>
<vmCursor />
</pendingSubscriberPolicy>
</policyEntry>
<policyEntry queue="org.apache.>">
<dispatchPolicy>
<strictOrderDispatchPolicy />
</dispatchPolicy>
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="Test.DLQ."/>
</deadLetterStrategy>
<pendingQueuePolicy>
<vmQueueCursor />
</pendingQueuePolicy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
</broker>
</beans>
<!-- END SNIPPET: xbean -->

View File

@ -18,14 +18,14 @@
<beans>
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
<broker brokerName="slowConsumerBroker" persistent="true" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
<broker brokerName="slowConsumerBroker" useJmx="false" persistent="false" useShutdownHook="false" xmlns="http://activemq.org/config/1.0">
<transportConnectors>
<transportConnector uri="tcp://localhost:61616"/>
</transportConnectors>
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">">
<policyEntry topic="blob">
<!-- lets force old messages to be discarded for slow consumers -->
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="10"/>