diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index f0c32d4212..f747917a63 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -38,6 +38,7 @@ import org.apache.activemq.memory.UsageManager; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import sun.security.x509.IssuerAlternativeNameExtension; import java.util.concurrent.ConcurrentHashMap; @@ -60,6 +61,7 @@ abstract public class AbstractRegion implements Region { protected final TaskRunnerFactory taskRunnerFactory; protected final Object destinationsMutex = new Object(); protected final Map consumerChangeMutexMap = new HashMap(); + protected boolean started = false; public AbstractRegion(RegionBroker broker,DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { if (broker == null) { @@ -76,9 +78,15 @@ abstract public class AbstractRegion implements Region { } public void start() throws Exception { + started = true; + for (Iterator i = destinations.values().iterator();i.hasNext();) { + Destination dest = (Destination)i.next(); + dest.start(); + } } public void stop() throws Exception { + started = false; for (Iterator i = destinations.values().iterator();i.hasNext();) { Destination dest = (Destination)i.next(); dest.stop(); @@ -102,7 +110,7 @@ abstract public class AbstractRegion implements Region { if (destinationInterceptor != null) { dest = destinationInterceptor.intercept(dest); } - + dest.start(); destinations.put(destination, dest); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index fbbcf1f26d..15dac01740 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -77,7 +77,7 @@ public class DestinationFactoryImpl extends DestinationFactory { if (destination.isQueue()) { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; - return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) { + return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()) { public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { // Only consumers on the same connection can consume from @@ -90,7 +90,7 @@ public class DestinationFactoryImpl extends DestinationFactory { }; } else { MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); - Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory); + Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory,broker.getTempDataStore()); configureQueue(queue, destination); queue.initialize(); return queue; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index d83b471158..89cb524e82 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -19,22 +19,21 @@ package org.apache.activemq.broker.region; import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.ConcurrentHashMap; import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; -import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.util.SubscriptionKey; -import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class DurableTopicSubscription extends PrefetchSubscription { - + static private final Log log=LogFactory.getLog(PrefetchSubscription.class); private final ConcurrentHashMap redeliveredMessages = new ConcurrentHashMap(); private final ConcurrentHashMap destinations = new ConcurrentHashMap(); private final SubscriptionKey subscriptionKey; @@ -72,6 +71,7 @@ public class DurableTopicSubscription extends PrefetchSubscription { } public void activate(ConnectionContext context, ConsumerInfo info) throws Exception { + log.debug("Deactivating " + this); if( !active ) { this.active = true; this.context = context; @@ -96,7 +96,8 @@ public class DurableTopicSubscription extends PrefetchSubscription { } } - synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { + synchronized public void deactivate(boolean keepDurableSubsActive) throws Exception { + active=false; synchronized(pending){ pending.stop(); @@ -197,9 +198,12 @@ public class DurableTopicSubscription extends PrefetchSubscription { "DurableTopicSubscription:" + " consumer="+info.getConsumerId()+ ", destinations="+destinations.size()+ - ", dispatched="+dispatched.size()+ - ", delivered="+this.prefetchExtension+ - ", pending="+getPendingQueueSize(); + ", total="+enqueueCounter+ + ", pending="+getPendingQueueSize()+ + ", dispatched="+dispatchCounter+ + ", inflight="+dispatched.size()+ + ", prefetchExtension="+this.prefetchExtension; + } public String getClientId() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 853953d561..b8a05ba768 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -327,6 +327,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ return (dispatched.size()-prefetchExtension) >= (info.getPrefetchSize() *.9); } + public int countBeforeFull() { + return info.getPrefetchSize() + prefetchExtension - dispatched.size(); + } + public int getPendingQueueSize(){ synchronized(pending) { return pending.size(); @@ -396,28 +400,38 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ List toDispatch=null; synchronized(pending){ try{ - pending.reset(); - while(pending.hasNext()&&!isFull()){ - MessageReference node=pending.next(); - pending.remove(); - // Message may have been sitting in the pending list a while - // waiting for the consumer to ak the message. - if(node!=QueueMessageReference.NULL_MESSAGE&&node.isExpired()){ - continue; // just drop it. + int numberToDispatch=countBeforeFull(); + if(numberToDispatch>0){ + int count=0; + pending.reset(); + while(pending.hasNext()&&!isFull()&&count0){ + if((force||!consumers.isEmpty())&&toPageIn>0){ try{ dispatchValve.increment(); int count=0; @@ -877,9 +912,15 @@ public class Queue implements Destination, Task { while(messages.hasNext()&&count 0 && (message.getBrokerPath() == null || message.getBrokerPath().length == 0)) { //timestamp not been disabled and has not passed through a network message.setTimestamp(System.currentTimeMillis()); @@ -541,7 +542,7 @@ public class RegionBroker implements Broker { } public boolean isStopped(){ - return stopped; + return !started; } public Set getDurableDestinations(){