try to deliver messages if there's enough space

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@509552 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-20 13:22:27 +00:00
parent 3a2caf427d
commit 8de60cf980
2 changed files with 115 additions and 105 deletions

View File

@ -1,20 +1,17 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* contributor license agreements. See the NOTICE file distributed with * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* this work for additional information regarding copyright ownership. * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* The ASF licenses this file to You under the Apache License, Version 2.0 * License. You may obtain a copy of the License at
* (the "License"); you may not use this file except in compliance with *
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* distributed under the License is distributed on an "AS IS" BASIS, * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * specific language governing permissions and limitations under the License.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import java.io.IOException; import java.io.IOException;
@ -23,92 +20,98 @@ import java.util.concurrent.ConcurrentHashMap;
import javax.jms.InvalidSelectorException; import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.cursors.PendingMessageCursor;
import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
public class DurableTopicSubscription extends PrefetchSubscription { public class DurableTopicSubscription extends PrefetchSubscription implements UsageListener{
static private final Log log=LogFactory.getLog(PrefetchSubscription.class); static private final Log log=LogFactory.getLog(PrefetchSubscription.class);
private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); private final ConcurrentHashMap redeliveredMessages=new ConcurrentHashMap();
private final ConcurrentHashMap destinations = new ConcurrentHashMap(); private final ConcurrentHashMap destinations=new ConcurrentHashMap();
private final SubscriptionKey subscriptionKey; private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive; private final boolean keepDurableSubsActive;
private final UsageManager usageManager;
private boolean active=false; private boolean active=false;
public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { public DurableTopicSubscription(Broker broker,UsageManager usageManager,ConnectionContext context,
super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),broker.getTempDataStore(),info.getPrefetchSize())); ConsumerInfo info,boolean keepDurableSubsActive) throws InvalidSelectorException{
this.keepDurableSubsActive = keepDurableSubsActive; super(broker,context,info,new StoreDurableSubscriberCursor(context.getClientId(),info.getSubscriptionName(),
subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubscriptionName()); broker.getTempDataStore(),info.getPrefetchSize()));
} this.usageManager=usageManager;
this.keepDurableSubsActive=keepDurableSubsActive;
synchronized public boolean isActive() { subscriptionKey=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
return active;
}
protected boolean isFull() {
return !active || super.isFull();
}
synchronized public void gc() {
} }
public synchronized void add(ConnectionContext context, Destination destination) throws Exception { synchronized public boolean isActive(){
super.add(context, destination); return active;
destinations.put(destination.getActiveMQDestination(), destination); }
if( active || keepDurableSubsActive ) {
Topic topic = (Topic) destination; protected boolean isFull(){
topic.activate(context, this); return !active||super.isFull();
if (pending.isEmpty(topic)) { }
topic.recoverRetroactiveMessages(context, this);
synchronized public void gc(){
}
public synchronized void add(ConnectionContext context,Destination destination) throws Exception{
super.add(context,destination);
destinations.put(destination.getActiveMQDestination(),destination);
if(active||keepDurableSubsActive){
Topic topic=(Topic)destination;
topic.activate(context,this);
if(pending.isEmpty(topic)){
topic.recoverRetroactiveMessages(context,this);
} }
} }
dispatchMatched(); dispatchMatched();
} }
public void activate(UsageManager memoryManager,ConnectionContext context, ConsumerInfo info) throws Exception { public void activate(UsageManager memoryManager,ConnectionContext context,ConsumerInfo info) throws Exception{
log.debug("Deactivating " + this); log.debug("Deactivating "+this);
if( !active ) { if(!active){
this.active = true; this.active=true;
this.context = context; this.context=context;
this.info = info; this.info=info;
if( !keepDurableSubsActive ) { if(!keepDurableSubsActive){
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { for(Iterator iter=destinations.values().iterator();iter.hasNext();){
Topic topic = (Topic) iter.next(); Topic topic=(Topic)iter.next();
topic.activate(context, this); topic.activate(context,this);
} }
} }
synchronized(pending) { synchronized(pending){
pending.setUsageManager(memoryManager); pending.setUsageManager(memoryManager);
pending.start(); pending.start();
} }
//If nothing was in the persistent store, then try to use the recovery policy. // If nothing was in the persistent store, then try to use the recovery policy.
if (pending.isEmpty()) { if(pending.isEmpty()){
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { for(Iterator iter=destinations.values().iterator();iter.hasNext();){
Topic topic = (Topic) iter.next(); Topic topic=(Topic)iter.next();
topic.recoverRetroactiveMessages(context, this); topic.recoverRetroactiveMessages(context,this);
} }
} }
dispatchMatched(); dispatchMatched();
this.usageManager.addUsageListener(this);
} }
} }
synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception{
active=false; active=false;
this.usageManager.removeUsageListener(this);
synchronized(pending){ synchronized(pending){
pending.stop(); pending.stop();
} }
if( !keepDurableSubsActive ) { if(!keepDurableSubsActive){
for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { for(Iterator iter=destinations.values().iterator();iter.hasNext();){
Topic topic = (Topic) iter.next(); Topic topic=(Topic)iter.next();
topic.deactivate(context, this); topic.deactivate(context,this);
} }
} }
synchronized(dispatched){ synchronized(dispatched){
@ -131,7 +134,6 @@ public class DurableTopicSubscription extends PrefetchSubscription {
iter.remove(); iter.remove();
} }
} }
if(!keepDurableSubsActive){ if(!keepDurableSubsActive){
synchronized(pending){ synchronized(pending){
try{ try{
@ -149,74 +151,68 @@ public class DurableTopicSubscription extends PrefetchSubscription {
prefetchExtension=0; prefetchExtension=0;
} }
protected MessageDispatch createMessageDispatch(MessageReference node, Message message) { protected MessageDispatch createMessageDispatch(MessageReference node,Message message){
MessageDispatch md = super.createMessageDispatch(node, message); MessageDispatch md=super.createMessageDispatch(node,message);
Integer count = (Integer) redeliveredMessages.get(node.getMessageId()); Integer count=(Integer)redeliveredMessages.get(node.getMessageId());
if( count !=null ) { if(count!=null){
md.setRedeliveryCounter(count.intValue()); md.setRedeliveryCounter(count.intValue());
} }
return md; return md;
} }
public void add(MessageReference node) throws Exception { public void add(MessageReference node) throws Exception{
if( !active && !keepDurableSubsActive ) { if(!active&&!keepDurableSubsActive){
return; return;
} }
node.incrementReferenceCount(); node.incrementReferenceCount();
super.add(node); super.add(node);
} }
protected void doAddRecoveredMessage(MessageReference message) throws Exception { protected void doAddRecoveredMessage(MessageReference message) throws Exception{
pending.addRecoveredMessage(message); pending.addRecoveredMessage(message);
} }
public int getPendingQueueSize() { public int getPendingQueueSize(){
if( active || keepDurableSubsActive ) { if(active||keepDurableSubsActive){
return super.getPendingQueueSize(); return super.getPendingQueueSize();
} }
//TODO: need to get from store // TODO: need to get from store
return 0; return 0;
} }
public void setSelector(String selector) throws InvalidSelectorException { public void setSelector(String selector) throws InvalidSelectorException{
throw new UnsupportedOperationException("You cannot dynamically change the selector for durable topic subscriptions"); throw new UnsupportedOperationException(
"You cannot dynamically change the selector for durable topic subscriptions");
} }
protected boolean canDispatch(MessageReference node) { protected boolean canDispatch(MessageReference node){
return active; return active;
} }
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { protected void acknowledge(ConnectionContext context,MessageAck ack,MessageReference node) throws IOException{
node.getRegionDestination().acknowledge(context, this, ack, node); node.getRegionDestination().acknowledge(context,this,ack,node);
redeliveredMessages.remove(node.getMessageId()); redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount(); node.decrementReferenceCount();
} }
public String getSubscriptionName() { public String getSubscriptionName(){
return subscriptionKey.getSubscriptionName(); return subscriptionKey.getSubscriptionName();
} }
public String toString() { public String toString(){
return return "DurableTopicSubscription:"+" consumer="+info.getConsumerId()+", destinations="+destinations.size()
"DurableTopicSubscription:" + +", total="+enqueueCounter+", pending="+getPendingQueueSize()+", dispatched="+dispatchCounter
" consumer="+info.getConsumerId()+ +", inflight="+dispatched.size()+", prefetchExtension="+this.prefetchExtension;
", destinations="+destinations.size()+
", total="+enqueueCounter+
", pending="+getPendingQueueSize()+
", dispatched="+dispatchCounter+
", inflight="+dispatched.size()+
", prefetchExtension="+this.prefetchExtension;
} }
public String getClientId() { public String getClientId(){
return subscriptionKey.getClientId(); return subscriptionKey.getClientId();
} }
public SubscriptionKey getSubscriptionKey() { public SubscriptionKey getSubscriptionKey(){
return subscriptionKey; return subscriptionKey;
} }
/** /**
* Release any references that we are holding. * Release any references that we are holding.
*/ */
@ -239,7 +235,21 @@ public class DurableTopicSubscription extends PrefetchSubscription {
} }
dispatched.clear(); dispatched.clear();
} }
/**
* @param memoryManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int,
* int)
*/
public void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if(oldPercentUsage>newPercentUsage&&oldPercentUsage>=90){
try{
dispatchMatched();
}catch(IOException e){
log.warn("problem calling dispatchMatched",e);
}
}
}
} }

View File

@ -216,7 +216,7 @@ public class TopicRegion extends AbstractRegion {
SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName()); SubscriptionKey key=new SubscriptionKey(context.getClientId(),info.getSubscriptionName());
DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key); DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key);
if(sub==null){ if(sub==null){
sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive); sub=new DurableTopicSubscription(broker,memoryManager,context,info,keepDurableSubsActive);
ActiveMQDestination destination=info.getDestination(); ActiveMQDestination destination=info.getDestination();
if(destination!=null&&broker.getDestinationPolicy()!=null){ if(destination!=null&&broker.getDestinationPolicy()!=null){
PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination); PolicyEntry entry=broker.getDestinationPolicy().getEntryFor(destination);