diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java index 31c7539f1b..9c95bab238 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java @@ -21,6 +21,7 @@ import java.util.Set; import org.apache.activemq.Service; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Region; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -250,10 +251,19 @@ public interface Broker extends Region, Service { /** * Sets the default administration connection context used when configuring the broker on startup or via JMX + * @param adminConnectionContext */ public abstract void setAdminConnectionContext(ConnectionContext adminConnectionContext); - + /** + * @return the pendingDurableSubscriberPolicy + */ + public abstract PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(); + + /** + * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set + */ + public abstract void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy); /** * @return the broker's temp data store * @throws Exception diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java index de6888c4e3..ea44ce86f3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -232,6 +233,15 @@ public class BrokerFilter implements Broker { public void setAdminConnectionContext(ConnectionContext adminConnectionContext) { next.setAdminConnectionContext(adminConnectionContext); } + + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { + return next.getPendingDurableSubscriberPolicy(); + } + + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { + next.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy); + } + public Store getTempDataStore() { return next.getTempDataStore(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index bf510764c7..a397cf6062 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -45,7 +45,9 @@ import org.apache.activemq.broker.region.DestinationFactory; import org.apache.activemq.broker.region.DestinationFactoryImpl; import org.apache.activemq.broker.region.DestinationInterceptor; import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.virtual.VirtualDestination; import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; import org.apache.activemq.broker.region.virtual.VirtualTopic; @@ -137,6 +139,7 @@ public class BrokerService implements Service, Serializable { private ActiveMQDestination[] destinations; private Store tempDataStore; private int persistenceThreadPriority = Thread.MAX_PRIORITY; + private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy(); /** @@ -388,7 +391,13 @@ public class BrokerService implements Service, Serializable { } getBroker().start(); - + /* + if(isUseJmx()){ + // yes - this is orer dependent! + // register all destination in persistence store including inactive destinations as mbeans + this.startDestinationsInPersistenceStore(broker); + } + */ startAllConnectors(); if (isUseJmx() && masterConnector != null) { @@ -987,6 +996,23 @@ public class BrokerService implements Service, Serializable { public void setPersistenceThreadPriority(int persistenceThreadPriority){ this.persistenceThreadPriority=persistenceThreadPriority; } + + /** + * @return the pendingDurableSubscriberPolicy + */ + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){ + return this.pendingDurableSubscriberPolicy; + } + + /** + * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set + */ + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy){ + this.pendingDurableSubscriberPolicy=pendingDurableSubscriberPolicy; + if (broker != null) { + broker.setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy); + } + } // Implementation methods // ------------------------------------------------------------------------- @@ -1199,8 +1225,6 @@ public class BrokerService implements Service, Serializable { mbeanServer.registerMBean(adminView, objectName); registeredMBeanNames.add(objectName); } - //register all destination in persistence store including inactive destinations as mbeans - this.startDestinationsInPersistenceStore(broker); } @@ -1243,6 +1267,7 @@ public class BrokerService implements Service, Serializable { regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); regionBroker.setBrokerName(getBrokerName()); + regionBroker.setPendingDurableSubscriberPolicy(getPendingDurableSubscriberPolicy()); return regionBroker; } @@ -1515,8 +1540,5 @@ public class BrokerService implements Service, Serializable { broker.addDestination(adminConnectionContext, destination); } } - } - - - + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java index 5ebe6ddbfb..129701f5e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -231,6 +232,13 @@ public class EmptyBroker implements Broker { return null; } + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { + return null; + } + + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { + } + public Store getTempDataStore() { return null; } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java index 127daa81ac..12d764f4a7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java @@ -23,6 +23,7 @@ import java.util.Set; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -231,6 +232,14 @@ public class ErrorBroker implements Broker { throw new BrokerStoppedException(this.message); } + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { + throw new BrokerStoppedException(this.message); + } + + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { + throw new BrokerStoppedException(this.message); + } + public Store getTempDataStore() { throw new BrokerStoppedException(this.message); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java index e8d895e4f2..069ed5c895 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java @@ -19,6 +19,7 @@ package org.apache.activemq.broker; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -245,6 +246,14 @@ public class MutableBrokerFilter implements Broker { return getNext().messagePull(context, pull); } + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { + return getNext().getPendingDurableSubscriberPolicy(); + } + + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { + getNext().setPendingDurableSubscriberPolicy(pendingDurableSubscriberPolicy); + } + public Store getTempDataStore() { return getNext().getTempDataStore(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index b3bcd80e42..ff1055a473 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -177,7 +177,6 @@ abstract public class AbstractRegion implements Region { public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { log.debug("Adding consumer: "+info.getConsumerId()); - ActiveMQDestination destination = info.getDestination(); if (destination != null && ! destination.isPattern() && ! destination.isComposite()) { // lets auto-create the destination @@ -260,7 +259,6 @@ abstract public class AbstractRegion implements Region { } public void removeConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - log.debug("Removing consumer: "+info.getConsumerId()); Subscription sub = (Subscription) subscriptions.remove(info.getConsumerId()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index 1d99e09b35..9c899c9f8d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -78,7 +78,7 @@ public class DestinationFactoryImpl extends DestinationFactory { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination) destination; return new Queue(destination, memoryManager, null, destinationStatistics, taskRunnerFactory) { - + public void addSubscription(ConnectionContext context,Subscription sub) throws Exception { // Only consumers on the same connection can consume from // the temporary destination @@ -92,6 +92,7 @@ public class DestinationFactoryImpl extends DestinationFactory { MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue) destination); Queue queue = new Queue(destination, memoryManager, store, destinationStatistics, taskRunnerFactory); configureQueue(queue, destination); + queue.initialize(); return queue; } } else if (destination.isTemporary()){ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index a5d905a914..366faf677d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -24,6 +24,7 @@ import javax.jms.JMSException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.cursors.FilePendingMessageCursor; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.cursors.StoreDurableSubscriberCursor; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; @@ -40,10 +41,8 @@ public class DurableTopicSubscription extends PrefetchSubscription { private final boolean keepDurableSubsActive; private boolean active=false; - public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive) throws InvalidSelectorException { - //super(broker,context, info, new StoreDurableSubscriberCursor(context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(),info.getPrefetchSize())); - //super(broker,context, info, new FilePendingMessageCursor(context.getClientId() + info.getConsumerId().toString(),broker.getTempDataStore())); - super(broker,context,info); + public DurableTopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive,PendingMessageCursor cursor) throws InvalidSelectorException { + super(broker,context,info,cursor); this.keepDurableSubsActive = keepDurableSubsActive; subscriptionKey = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); } @@ -192,7 +191,6 @@ public class DurableTopicSubscription extends PrefetchSubscription { * Release any references that we are holding. */ synchronized public void destroy() { - synchronized(pending) { pending.reset(); while(pending.hasNext()) { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index 1039e9eee5..dbf728a295 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -124,8 +124,8 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ synchronized public void add(MessageReference node) throws Exception{ enqueueCounter++; - //if(!isFull()){ - if(!isFull() && pending.isEmpty() && canDispatch(node)){ + + if(!isFull() && pending.isEmpty() ){ dispatch(node); }else{ optimizePrefetch(); @@ -376,7 +376,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ if(canDispatch(node)&&!isSlaveBroker()){ MessageDispatch md=createMessageDispatch(node,message); - // NULL messages don't count... they don't get Acked. if( node != QueueMessageReference.NULL_MESSAGE ) { dispatchCounter++; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index d74bb18d1f..8e8fe3cbfe 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -20,12 +20,15 @@ package org.apache.activemq.broker.region; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; +import org.apache.activemq.broker.region.cursors.VMPendingMessageCursor; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.broker.region.group.MessageGroupSet; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; +import org.apache.activemq.broker.region.policy.PendingQueueMessageStoragePolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy; import org.apache.activemq.command.ActiveMQDestination; @@ -67,10 +70,10 @@ public class Queue implements Destination { protected final ActiveMQDestination destination; protected final List consumers = new CopyOnWriteArrayList(); - private final LinkedList messages = new LinkedList(); protected final Valve dispatchValve = new Valve(true); protected final UsageManager usageManager; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); + protected PendingMessageCursor messages = new VMPendingMessageCursor(); private LockOwner exclusiveOwner; private MessageGroupMap messageGroupOwners; @@ -100,6 +103,10 @@ public class Queue implements Destination { destinationStatistics.setParent(parentStats); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); + + } + + public void initialize() throws Exception { if (store != null) { // Restore the persistent messages. store.recover(new MessageRecoveryListener() { @@ -107,7 +114,11 @@ public class Queue implements Destination { message.setRegionDestination(Queue.this); MessageReference reference = createMessageReference(message); synchronized (messages) { - messages.add(reference); + try{ + messages.addMessageLast(reference); + }catch(Exception e){ + log.fatal("Failed to add message to cursor",e); + } } reference.decrementReferenceCount(); destinationStatistics.getMessages().increment(); @@ -158,9 +169,10 @@ public class Queue implements Destination { synchronized (messages) { // Add all the matching messages in the queue to the // subscription. - for (Iterator iter = messages.iterator(); iter.hasNext();) { + messages.reset(); + while(messages.hasNext()) { - QueueMessageReference node = (QueueMessageReference) iter.next(); + QueueMessageReference node = (QueueMessageReference) messages.next(); if (node.isDropped()) { continue; } @@ -219,8 +231,9 @@ public class Queue implements Destination { // lets copy the messages to dispatch to avoid deadlock List messagesToDispatch = new ArrayList(); synchronized (messages) { - for (Iterator iter = messages.iterator(); iter.hasNext();) { - QueueMessageReference node = (QueueMessageReference) iter.next(); + messages.reset(); + while(messages.hasNext()) { + QueueMessageReference node = (QueueMessageReference) messages.next(); if (node.isDropped()) { continue; } @@ -314,12 +327,13 @@ public class Queue implements Destination { public void gc() { synchronized (messages) { - for (Iterator iter = messages.iterator(); iter.hasNext();) { + messages.resetForGC(); + while(messages.hasNext()) { // Remove dropped messages from the queue. - QueueMessageReference node = (QueueMessageReference) iter.next(); + QueueMessageReference node = (QueueMessageReference) messages.next(); if (node.isDropped()) { garbageSize--; - iter.remove(); + messages.remove(); continue; } } @@ -456,6 +470,12 @@ public class Queue implements Destination { public void setMemoryLimit(long limit) { getUsageManager().setLimit(limit); } + public PendingMessageCursor getMessages(){ + return this.messages; + } + public void setMessages(PendingMessageCursor messages){ + this.messages=messages; + } // Implementation methods // ------------------------------------------------------------------------- @@ -470,7 +490,7 @@ public class Queue implements Destination { try { destinationStatistics.onMessageEnqueue(message); synchronized (messages) { - messages.add(node); + messages.addMessageLast(node); } synchronized (consumers) { @@ -509,12 +529,12 @@ public class Queue implements Destination { } public Message[] browse() { - ArrayList l = new ArrayList(); synchronized (messages) { - for (Iterator iter = messages.iterator(); iter.hasNext();) { + messages.reset(); + while(messages.hasNext()) { try { - MessageReference r = (MessageReference) iter.next(); + MessageReference r = (MessageReference) messages.next(); r.incrementReferenceCount(); try { Message m = r.getMessage(); @@ -536,9 +556,10 @@ public class Queue implements Destination { public Message getMessage(String messageId) { synchronized (messages) { - for (Iterator iter = messages.iterator(); iter.hasNext();) { + messages.reset(); + while(messages.hasNext()) { try { - MessageReference r = (MessageReference) iter.next(); + MessageReference r = (MessageReference) messages.next(); if (messageId.equals(r.getMessageId().toString())) { r.incrementReferenceCount(); try { @@ -563,9 +584,10 @@ public class Queue implements Destination { public void purge() { synchronized (messages) { ConnectionContext c = createConnectionContext(); - for (Iterator iter = messages.iterator(); iter.hasNext();) { + messages.reset(); + while(messages.hasNext()) { try { - QueueMessageReference r = (QueueMessageReference) iter.next(); + QueueMessageReference r = (QueueMessageReference) messages.next(); // We should only delete messages that can be locked. if (r.lock(LockOwner.HIGH_PRIORITY_LOCK_OWNER)) { @@ -623,8 +645,9 @@ public class Queue implements Destination { int counter = 0; synchronized (messages) { ConnectionContext c = createConnectionContext(); - for (Iterator iter = messages.iterator(); iter.hasNext();) { - IndirectMessageReference r = (IndirectMessageReference) iter.next(); + messages.reset(); + while(messages.hasNext()) { + IndirectMessageReference r = (IndirectMessageReference) messages.next(); if (filter.evaluate(c, r)) { // We should only delete messages that can be locked. if (lockMessage(r)) { @@ -672,8 +695,9 @@ public class Queue implements Destination { public int copyMatchingMessages(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int counter = 0; synchronized (messages) { - for (Iterator iter = messages.iterator(); iter.hasNext();) { - MessageReference r = (MessageReference) iter.next(); + messages.reset(); + while(messages.hasNext()) { + MessageReference r = (MessageReference) messages.next(); if (filter.evaluate(context, r)) { r.incrementReferenceCount(); try { @@ -721,8 +745,9 @@ public class Queue implements Destination { public int moveMatchingMessagesTo(ConnectionContext context, MessageReferenceFilter filter, ActiveMQDestination dest, int maximumMessages) throws Exception { int counter = 0; synchronized (messages) { - for (Iterator iter = messages.iterator(); iter.hasNext();) { - IndirectMessageReference r = (IndirectMessageReference) iter.next(); + messages.reset(); + while(messages.hasNext()) { + IndirectMessageReference r = (IndirectMessageReference) messages.next(); if (filter.evaluate(context, r)) { // We should only move messages that can be locked. if (lockMessage(r)) { @@ -789,5 +814,4 @@ public class Queue implements Destination { answer.getMessageEvaluationContext().setDestination(getActiveMQDestination()); return answer; } - } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 5d4cdbebb3..e6d8cd6f99 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -79,7 +79,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner String groupId = node.getGroupID(); int sequence = node.getGroupSequence(); if( groupId!=null ) { - MessageGroupMap messageGroupOwners = ((Queue)node.getRegionDestination()).getMessageGroupOwners(); // If we can own the first, then no-one else should own the rest. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index 75b336e261..d3434e1725 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -31,7 +31,9 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.Connection; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -93,6 +95,7 @@ public class RegionBroker implements Broker { private ConnectionContext adminConnectionContext; protected DestinationFactory destinationFactory; protected final ConcurrentHashMap connectionStates = new ConcurrentHashMap(); + private PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy = new VMPendingDurableSubscriberMessageStoragePolicy(); public RegionBroker(BrokerService brokerService,TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException { this.brokerService = brokerService; @@ -584,4 +587,18 @@ public class RegionBroker implements Broker { public Store getTempDataStore() { return brokerService.getTempDataStore(); } + + /** + * @return the pendingDurableSubscriberPolicy + */ + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy(){ + return this.pendingDurableSubscriberPolicy; + } + + /** + * @param pendingDurableSubscriberPolicy the pendingDurableSubscriberPolicy to set + */ + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy durableSubscriberCursor){ + this.pendingDurableSubscriberPolicy=durableSubscriberCursor; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index 6f3e1223ba..087b0b80a7 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -26,6 +26,7 @@ import javax.jms.JMSException; import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionId; @@ -61,60 +62,52 @@ public class TopicRegion extends AbstractRegion { } - public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception { - if (info.isDurable()) { - - ActiveMQDestination destination = info.getDestination(); - if( !destination.isPattern() ) { + public Subscription addConsumer(ConnectionContext context,ConsumerInfo info) throws Exception{ + if(info.isDurable()){ + ActiveMQDestination destination=info.getDestination(); + if(!destination.isPattern()){ // Make sure the destination is created. - lookup(context, destination); + lookup(context,destination); } - - String clientId = context.getClientId(); - String subcriptionName = info.getSubcriptionName(); - SubscriptionKey key = new SubscriptionKey(clientId, subcriptionName); - DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); - if (sub != null) { - - if (sub.isActive()) { - throw new JMSException("Durable consumer is in use for client: " + clientId + " and subscriptionName: " + subcriptionName); + String clientId=context.getClientId(); + String subcriptionName=info.getSubcriptionName(); + SubscriptionKey key=new SubscriptionKey(clientId,subcriptionName); + DurableTopicSubscription sub=(DurableTopicSubscription)durableSubscriptions.get(key); + if(sub!=null){ + if(sub.isActive()){ + throw new JMSException("Durable consumer is in use for client: "+clientId+" and subscriptionName: " + +subcriptionName); } - // Has the selector changed?? - if (hasDurableSubChanged(info, sub.getConsumerInfo())) { - + if(hasDurableSubChanged(info,sub.getConsumerInfo())){ // Remove the consumer first then add it. durableSubscriptions.remove(key); - for (Iterator iter = destinations.values().iterator(); iter.hasNext();) { - Topic topic = (Topic) iter.next(); - topic.deleteSubscription(context, key); + for(Iterator iter=destinations.values().iterator();iter.hasNext();){ + Topic topic=(Topic)iter.next(); + topic.deleteSubscription(context,key); } - super.removeConsumer(context, sub.getConsumerInfo()); - - super.addConsumer(context, info); - sub = (DurableTopicSubscription) durableSubscriptions.get(key); - } - else { + super.removeConsumer(context,sub.getConsumerInfo()); + super.addConsumer(context,info); + sub=(DurableTopicSubscription)durableSubscriptions.get(key); + }else{ // Change the consumer id key of the durable sub. - if( sub.getConsumerInfo().getConsumerId()!=null ) + if(sub.getConsumerInfo().getConsumerId()!=null) subscriptions.remove(sub.getConsumerInfo().getConsumerId()); - subscriptions.put(info.getConsumerId(), sub); + subscriptions.put(info.getConsumerId(),sub); + } + }else{ + super.addConsumer(context,info); + sub=(DurableTopicSubscription)durableSubscriptions.get(key); + if(sub==null){ + throw new JMSException("Cannot use the same consumerId: "+info.getConsumerId() + +" for two different durable subscriptions clientID: "+key.getClientId() + +" subscriberName: "+key.getSubscriptionName()); } } - else { - super.addConsumer(context, info); - sub = (DurableTopicSubscription) durableSubscriptions.get(key); - if (sub == null) { - throw new JMSException("Cannot use the same consumerId: " + info.getConsumerId() + " for two different durable subscriptions clientID: " - + key.getClientId() + " subscriberName: " + key.getSubscriptionName()); - } - } - - sub.activate(context, info); + sub.activate(context,info); return sub; - } - else { - return super.addConsumer(context, info); + }else{ + return super.addConsumer(context,info); } } @@ -222,9 +215,12 @@ public class TopicRegion extends AbstractRegion { } SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubcriptionName()); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); - if (sub == null) { - sub = new DurableTopicSubscription(broker,context, info, keepDurableSubsActive); - durableSubscriptions.put(key, sub); + if(sub==null){ + PendingMessageCursor cursor=broker.getPendingDurableSubscriberPolicy().getSubscriberPendingMessageCursor( + context.getClientId(),info.getSubcriptionName(),broker.getTempDataStore(), + info.getPrefetchSize()); + sub=new DurableTopicSubscription(broker,context,info,keepDurableSubsActive,cursor); + durableSubscriptions.put(key,sub); } else { throw new JMSException("That durable subscription is already active."); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java index b13d7f19e0..72862f4dea 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursor.java @@ -1,19 +1,15 @@ /** * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with this - * work for additional information regarding copyright ownership. The ASF - * licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. */ package org.apache.activemq.broker.region.cursors; @@ -24,13 +20,13 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; /** - * Abstract method holder for pending message (messages awaiting disptach to a - * consumer) cursor + * Abstract method holder for pending message (messages awaiting disptach to a consumer) cursor * * @version $Revision$ */ -public class AbstractPendingMessageCursor implements PendingMessageCursor { - protected int maxBatchSize = 100; +public class AbstractPendingMessageCursor implements PendingMessageCursor{ + + protected int maxBatchSize=100; public void start() throws Exception{ } @@ -38,12 +34,10 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { public void stop() throws Exception{ } - public void add(ConnectionContext context,Destination destination) - throws Exception{ + public void add(ConnectionContext context,Destination destination) throws Exception{ } - public void remove(ConnectionContext context,Destination destination) - throws Exception{ + public void remove(ConnectionContext context,Destination destination) throws Exception{ } public boolean isRecoveryRequired(){ @@ -80,7 +74,7 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { public int size(){ return 0; } - + public int getMaxBatchSize(){ return maxBatchSize; } @@ -91,6 +85,11 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor { protected void fillBatch() throws Exception{ } - - + + /** + * Give the cursor a hint that we are about to remove messages from memory only + */ + public void resetForGC(){ + reset(); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index d7c0e9ce8b..a441c36708 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -24,7 +24,7 @@ import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.kahadaptor.CommandMarshaller; /** * perist pending messages pending message (messages awaiting disptach to a consumer) cursor - * + * * @version $Revision$ */ public class FilePendingMessageCursor extends AbstractPendingMessageCursor{ diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java index 44f66d2873..da13b0a021 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/PendingMessageCursor.java @@ -113,4 +113,10 @@ public interface PendingMessageCursor extends Service{ * @param maxBatchSize */ public void setMaxBatchSize(int maxBatchSize); + + /** + * Give the cursor a hint that we are about to remove + * messages from memory only + */ + public void resetForGC(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java index 49bbb43071..4ea74fac91 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/StoreDurableSubscriberCursor.java @@ -127,12 +127,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor{ return false; } - public synchronized void addMessageFirst(MessageReference node) throws IOException{ - if(started){ - throw new RuntimeException("This shouldn't be called!"); - } - } - + public synchronized void addMessageLast(MessageReference node) throws Exception{ if(node!=null){ Message msg=node.getMessage(); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java index 5f529d92e5..d3490effa5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/policy/PolicyEntry.java @@ -20,6 +20,7 @@ package org.apache.activemq.broker.region.policy; import org.apache.activemq.broker.region.Queue; import org.apache.activemq.broker.region.Topic; import org.apache.activemq.broker.region.TopicSubscription; +import org.apache.activemq.broker.region.cursors.PendingMessageCursor; import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory; import org.apache.activemq.broker.region.group.MessageGroupMapFactory; import org.apache.activemq.filter.DestinationMapEntry; @@ -46,6 +47,7 @@ public class PolicyEntry extends DestinationMapEntry { private MessageEvictionStrategy messageEvictionStrategy; private long memoryLimit; private MessageGroupMapFactory messageGroupMapFactory; + private PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy; public void configure(Queue queue) { if (dispatchPolicy != null) { @@ -58,6 +60,10 @@ public class PolicyEntry extends DestinationMapEntry { if( memoryLimit>0 ) { queue.getUsageManager().setLimit(memoryLimit); } + if (pendingQueueMessageStoragePolicy != null) { + PendingMessageCursor messages = pendingQueueMessageStoragePolicy.getQueuePendingMessageCursor(); + queue.setMessages(messages); + } } public void configure(Topic topic) { @@ -74,6 +80,7 @@ public class PolicyEntry extends DestinationMapEntry { if( memoryLimit>0 ) { topic.getUsageManager().setLimit(memoryLimit); } + } public void configure(TopicSubscription subscription) { @@ -196,4 +203,20 @@ public class PolicyEntry extends DestinationMapEntry { } + /** + * @return the pendingQueueMessageStoragePolicy + */ + public PendingQueueMessageStoragePolicy getPendingQueueMessageStoragePolicy(){ + return this.pendingQueueMessageStoragePolicy; + } + + + /** + * @param pendingQueueMessageStoragePolicy the pendingQueueMessageStoragePolicy to set + */ + public void setPendingQueueMessageStoragePolicy(PendingQueueMessageStoragePolicy pendingQueueMessageStoragePolicy){ + this.pendingQueueMessageStoragePolicy=pendingQueueMessageStoragePolicy; + } + + } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java index e48d721adb..1122d968c2 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/StubBroker.java @@ -20,6 +20,7 @@ package org.apache.activemq.broker; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.Subscription; +import org.apache.activemq.broker.region.policy.PendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.BrokerId; import org.apache.activemq.command.BrokerInfo; @@ -218,6 +219,13 @@ public class StubBroker implements Broker { } public void stop() throws Exception { + } + + public PendingDurableSubscriberMessageStoragePolicy getPendingDurableSubscriberPolicy() { + return null; + } + + public void setPendingDurableSubscriberPolicy(PendingDurableSubscriberMessageStoragePolicy pendingDurableSubscriberPolicy) { } public Store getTempDataStore() { diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java index 9d8025572a..fedb6181fa 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/CursorDurableTest.java @@ -39,6 +39,7 @@ import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.StorePendingDurableSubscriberMessageStoragePolicy; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -51,7 +52,7 @@ public class CursorDurableTest extends TestCase{ protected static final Log log = LogFactory.getLog(CursorDurableTest.class); - protected static final int MESSAGE_COUNT=50; + protected static final int MESSAGE_COUNT=100; protected static final int PREFETCH_SIZE = 5; protected BrokerService broker; protected String bindAddress="tcp://localhost:60706"; @@ -138,7 +139,10 @@ public class CursorDurableTest extends TestCase{ for (int i =MESSAGE_COUNT/10; i < MESSAGE_COUNT; i++) { TextMessage msg=session.createTextMessage("test"+i); senderList.add(msg); + producer.send(msg); + + } @@ -204,11 +208,13 @@ public class CursorDurableTest extends TestCase{ BrokerService answer=new BrokerService(); configureBroker(answer); answer.setDeleteAllMessagesOnStartup(true); + answer.setPendingDurableSubscriberPolicy(new StorePendingDurableSubscriberMessageStoragePolicy()); answer.start(); return answer; } protected void configureBroker(BrokerService answer) throws Exception{ + answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java index 745dd4eeda..4c2fb8c4e4 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/region/cursors/KahaCursorDurableTest.java @@ -35,6 +35,6 @@ public class KahaCursorDurableTest extends CursorDurableTest{ KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/durableTest")); answer.setPersistenceAdapter(adaptor); answer.addConnector(bindAddress); - answer.setDeleteAllMessagesOnStartup(true); + //answer.setDeleteAllMessagesOnStartup(true); } }