From 18d616c8db14f15d4b30fba51050c7c86b9890b3 Mon Sep 17 00:00:00 2001 From: "Hiram R. Chirino" Date: Thu, 30 Mar 2006 00:36:21 +0000 Subject: [PATCH] - Since we now have per destination usage managers, I updated the MessageStore API so that each destination's store can be maded aware of the manager for that store. Some stores like the journal hold messages around and use the usage manager to know when it should flush to disk. - Moved alot of the message reference counting logic from the PrefetchSubscription up to it's subclasses since they all seem to do it just a slightly different way. I think it makes easier to see now how the usage manager is affected by operations. - I fixed and verifed that the keepDurableSubsActive=true option actually works. I think we should make this the default setting since it make recovery safer. Once we have a better spooling implementation we can turn if off again. git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389941 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/activemq/broker/BrokerService.java | 2 +- .../broker/jmx/ManagedRegionBroker.java | 11 ++-- .../broker/jmx/ManagedTopicRegion.java | 1 - .../region/DurableTopicSubscription.java | 40 ++++++------- .../broker/region/PrefetchSubscription.java | 53 +++++++++--------- .../apache/activemq/broker/region/Queue.java | 7 ++- .../region/QueueBrowserSubscription.java | 11 +++- .../broker/region/QueueSubscription.java | 56 +++++++++++++++++-- .../apache/activemq/broker/region/Topic.java | 7 +++ .../activemq/broker/region/TopicRegion.java | 7 ++- .../apache/activemq/memory/UsageManager.java | 2 +- .../DefaultPersistenceAdapterFactory.java | 17 +----- .../apache/activemq/store/MessageStore.java | 8 ++- .../activemq/store/PersistenceAdapter.java | 7 +++ .../activemq/store/ProxyMessageStore.java | 5 ++ .../store/ProxyTopicMessageStore.java | 5 ++ .../activemq/store/jdbc/JDBCMessageStore.java | 4 ++ .../store/jdbc/JDBCPersistenceAdapter.java | 7 +++ .../store/journal/JournalMessageStore.java | 13 +++++ .../journal/JournalPersistenceAdapter.java | 14 ++++- .../journal/QuickJournalMessageStore.java | 12 ++++ .../QuickJournalPersistenceAdapter.java | 14 ++++- .../store/kahadaptor/KahaMessageStore.java | 8 +++ .../kahadaptor/KahaPersistentAdaptor.java | 7 +++ .../store/memory/MemoryMessageStore.java | 7 +++ .../memory/MemoryPersistenceAdapter.java | 6 ++ .../policy/StrictOrderDispatchPolicyTest.java | 5 +- 27 files changed, 250 insertions(+), 86 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 47f9a48e0a..de989baf38 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -900,6 +900,7 @@ public class BrokerService implements Service { protected Broker createRegionBroker() throws Exception { // we must start the persistence adaptor before we can create the region // broker + getPersistenceAdapter().setUsageManager(getMemoryManager()); getPersistenceAdapter().start(); RegionBroker regionBroker = null; if (isUseJmx()) { @@ -947,7 +948,6 @@ public class BrokerService implements Service { protected DefaultPersistenceAdapterFactory createPersistenceFactory() { DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory(); - factory.setMemManager(getMemoryManager()); factory.setDataDirectory(getDataDirectory()); factory.setTaskRunnerFactory(getTaskRunnerFactory()); return factory; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java index ebde08e366..dcefb6cdf1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.Map.Entry; + import javax.management.InstanceNotFoundException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; @@ -33,6 +34,7 @@ import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; + import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; @@ -60,6 +62,7 @@ import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.SubscriptionKey; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; + import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; @@ -160,11 +163,11 @@ public class ManagedRegionBroker extends RegionBroker{ String name=""; SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); if(sub.getConsumerInfo().isDurable()){ - name=key.toString(); - } - if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){ - name+="."+sub.getConsumerInfo().getConsumerId(); + name = key.toString(); + } else { + name = sub.getConsumerInfo().getConsumerId().toString(); } + try{ ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName") +","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+""); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java index 51d47cc87e..68aec1f7ea 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTopicRegion.java @@ -24,7 +24,6 @@ import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.TopicRegion; -import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.memory.UsageManager; diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java index 0b8c097798..4d1a9b17fb 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java @@ -16,7 +16,10 @@ */ package org.apache.activemq.broker.region; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +import java.io.IOException; +import java.util.Iterator; + +import javax.jms.InvalidSelectorException; import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; @@ -26,10 +29,7 @@ import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.util.SubscriptionKey; -import javax.jms.InvalidSelectorException; - -import java.io.IOException; -import java.util.Iterator; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; public class DurableTopicSubscription extends PrefetchSubscription { @@ -103,14 +103,21 @@ public class DurableTopicSubscription extends PrefetchSubscription { } else { redeliveredMessages.put(node.getMessageId(), new Integer(1)); } - - iter.remove(); + if( keepDurableSubsActive ) { + pending.addFirst(node); + } else { + node.decrementReferenceCount(); + iter.remove(); + } + } + + if( !keepDurableSubsActive ) { + for (Iterator iter = pending.iterator(); iter.hasNext();) { + MessageReference node = (MessageReference) iter.next(); + node.decrementReferenceCount(); + iter.remove(); + } } - for (Iterator iter = pending.iterator(); iter.hasNext();) { - MessageReference node = (MessageReference) iter.next(); - // node.decrementTargetCount(); - iter.remove(); - } prefetchExtension=0; } @@ -127,9 +134,8 @@ public class DurableTopicSubscription extends PrefetchSubscription { if( !active && !keepDurableSubsActive ) { return; } - node = new IndirectMessageReference(node.getRegionDestination(), (Message) node); + node.incrementReferenceCount(); super.add(node); - node.decrementReferenceCount(); } public int getPendingQueueSize() { @@ -148,14 +154,10 @@ public class DurableTopicSubscription extends PrefetchSubscription { return active; } - public synchronized void acknowledge(ConnectionContext context, MessageAck ack) throws Exception { - super.acknowledge(context, ack); - } - protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { node.getRegionDestination().acknowledge(context, this, ack, node); redeliveredMessages.remove(node.getMessageId()); - ((IndirectMessageReference)node).drop(); + node.decrementReferenceCount(); } public String getSubscriptionName() { diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java index fe90a97d8c..e153e36655 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java @@ -79,7 +79,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ try{ MessageDispatch md=createMessageDispatch(node,node.getMessage()); dispatched.addLast(node); - node.decrementReferenceCount(); }catch(Exception e){ log.error("Problem processing MessageDispatchNotification: "+mdn,e); } @@ -166,22 +165,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ inAckRange=true; } if(inAckRange){ - // Send the message to the DLQ - node.incrementReferenceCount(); - try{ - Message message=node.getMessage(); - if(message!=null){ - // The original destination and transaction id do not get filled when the message is first - // sent, - // it is only populated if the message is routed to another destination like the DLQ - DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); - ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination()); - BrokerSupport.resend(context, message, deadLetterDestination); - - } - }finally{ - node.decrementReferenceCount(); - } + sendToDLQ(context, node); iter.remove(); dequeueCounter++; index++; @@ -200,6 +184,26 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ throw new JMSException("Invalid acknowledgment: "+ack); } + /** + * @param context + * @param node + * @throws IOException + * @throws Exception + */ + protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception { + // Send the message to the DLQ + Message message=node.getMessage(); + if(message!=null){ + // The original destination and transaction id do not get filled when the message is first + // sent, + // it is only populated if the message is routed to another destination like the DLQ + DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy(); + ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination()); + BrokerSupport.resend(context, message, deadLetterDestination); + + } + } + protected boolean isFull(){ return dispatched.size()-prefetchExtension>=info.getPrefetchSize(); } @@ -240,11 +244,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ } } - private void dispatch(final MessageReference node) throws IOException{ - node.incrementReferenceCount(); + protected boolean dispatch(final MessageReference node) throws IOException{ final Message message=node.getMessage(); if(message==null){ - return; + return false; } // Make sure we can dispatch a message. if(canDispatch(node)&&!isSlaveBroker()){ @@ -264,16 +267,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription{ context.getConnection().dispatchSync(md); onDispatch(node,message); } - // The onDispatch() does the node.decrementReferenceCount(); - }else{ - // We were not allowed to dispatch that message (an other consumer grabbed it before we did) - node.decrementReferenceCount(); + return true; + } else { + return false; } } - synchronized private void onDispatch(final MessageReference node,final Message message){ + synchronized protected void onDispatch(final MessageReference node,final Message message){ boolean wasFull=isFull(); - node.decrementReferenceCount(); if(node.getRegionDestination()!=null){ node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index b8548bb55d..8f83617b66 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -81,9 +81,14 @@ public class Queue implements Destination { this.destination = destination; this.usageManager = new UsageManager(memoryManager); this.usageManager.setLimit(Long.MAX_VALUE); - this.store = store; + // Let the store know what usage manager we are using so that he can flush messages to disk + // when usage gets high. + if( store!=null ) { + store.setUsageManager(usageManager); + } + destinationStatistics.setParent(parentStats); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java index 69409e9b37..a643b5fe36 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueBrowserSubscription.java @@ -24,10 +24,11 @@ import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.filter.MessageEvaluationContext; -public class QueueBrowserSubscription extends PrefetchSubscription { +public class QueueBrowserSubscription extends QueueSubscription { boolean browseDone; @@ -65,7 +66,15 @@ public class QueueBrowserSubscription extends PrefetchSubscription { return super.createMessageDispatch(node, message); } } + public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { return !browseDone && super.matches(node, context); } + + /** + * Since we are a browser we don't really remove the message from the queue. + */ + protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException { + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java index 00e2992417..63ae6f160e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/QueueSubscription.java @@ -21,6 +21,7 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.transaction.Synchronization; @@ -33,11 +34,7 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner public QueueSubscription(Broker broker,ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { super(broker,context, info); } - - public void add(MessageReference node) throws Exception { - super.add(node); - } - + /** * In the queue case, mark the node as dropped and then a gc cycle will remove it from * the queue. @@ -138,4 +135,53 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner return info.isExclusive(); } + /** + * Override so that the message ref count is > 0 only when the message is being dispatched + * to a client. Keeping it at 0 when it is in the pending list allows the message to be swapped out + * to disk. + * + * @return true if the message was dispatched. + */ + protected boolean dispatch(MessageReference node) throws IOException { + boolean rc = false; + // This brings the message into memory if it was swapped out. + node.incrementReferenceCount(); + try { + rc = super.dispatch(node); + } finally { + // If the message was dispatched, it could be getting dispatched async, so we + // can only drop the reference count when that completes @see onDispatch + if( !rc ) { + node.incrementReferenceCount(); + } + } + return rc; + } + + /** + * OK Message was transmitted, we can now drop the reference count. + * + * @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.command.Message) + */ + protected void onDispatch(MessageReference node, Message message) { + // Now that the message has been sent over the wire to the client, + // we can let it get swapped out. + node.decrementReferenceCount(); + super.onDispatch(node, message); + } + + /** + * Sending a message to the DQL will require us to increment the ref count so we can get at the content. + */ + protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception { + // This brings the message into memory if it was swapped out. + node.incrementReferenceCount(); + try{ + super.sendToDLQ(context, node); + } finally { + // This let's the message be swapped out of needed. + node.decrementReferenceCount(); + } + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index d89101fe9e..d5236abde4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -75,6 +75,13 @@ public class Topic implements Destination { this.store = store; this.usageManager = new UsageManager(memoryManager); this.usageManager.setLimit(Long.MAX_VALUE); + + // Let the store know what usage manager we are using so that he can flush messages to disk + // when usage gets high. + if( store!=null ) { + store.setUsageManager(usageManager); + } + this.destinationStatistics.setParent(parentStats); } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java index e58524536d..71223e3b6d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicRegion.java @@ -171,10 +171,13 @@ public class TopicRegion extends AbstractRegion { DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); if( sub == null ) { - sub = (DurableTopicSubscription) createSubscription(context, consumerInfo ); + ConnectionContext c = new ConnectionContext(); + c.setBroker(context.getBroker()); + c.setClientId(key.getClientId()); + c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId()); + sub = (DurableTopicSubscription) createSubscription(c, consumerInfo ); } - subscriptions.put(consumerInfo.getConsumerId(), sub); topic.addSubscription(context, sub); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java b/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java index c5ac58f12f..0fe3618bb0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java @@ -43,7 +43,7 @@ public class UsageManager { private long usage; private int percentUsage; - private int percentUsageMinDelta=10; + private int percentUsageMinDelta=1; private final Object usageMutex = new Object(); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java b/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java index ef7da9d06b..3267b76b7c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/DefaultPersistenceAdapterFactory.java @@ -23,7 +23,6 @@ import javax.sql.DataSource; import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.active.JournalImpl; -import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.Statements; @@ -42,7 +41,6 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto private int journalLogFileSize = 1024*1024*20; private int journalLogFiles = 2; private File dataDirectory; - private UsageManager memManager; private DataSource dataSource; private TaskRunnerFactory taskRunnerFactory; private Journal journal; @@ -60,9 +58,9 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto // Setup the Journal if( useQuickJournal ) { - return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory()); + return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); } else { - return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory()); + return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory()); } } @@ -92,17 +90,6 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto public void setJournalLogFileSize(int journalLogFileSize) { this.journalLogFileSize = journalLogFileSize; } - - public UsageManager getMemManager() { - if( memManager==null ) { - memManager = new UsageManager(); - } - return memManager; - } - - public void setMemManager(UsageManager memManager) { - this.memManager = memManager; - } public DataSource getDataSource() throws IOException { if (dataSource == null) { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java index ad531044e7..36098ad7cd 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/MessageStore.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; /** * Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer} @@ -92,5 +93,10 @@ public interface MessageStore extends Service { * @return */ public ActiveMQDestination getDestination(); - + + /** + * @param usageManager The UsageManager that is controlling the destination's memory usage. + */ + public void setUsageManager(UsageManager usageManager); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index dd9f171378..726d5f1655 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -20,6 +20,7 @@ import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.memory.UsageManager; import java.io.IOException; import java.util.Set; @@ -97,5 +98,11 @@ public interface PersistenceAdapter extends Service { public boolean isUseExternalMessageReferences(); public void setUseExternalMessageReferences(boolean enable); + + /** + * @param usageManager The UsageManager that is controlling the broker's memory usage. + */ + public void setUsageManager(UsageManager usageManager); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java index 51cc4e97a5..61e7a53029 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyMessageStore.java @@ -23,6 +23,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; /** * A simple proxy that delegates to another MessageStore. @@ -71,4 +72,8 @@ public class ProxyMessageStore implements MessageStore { public String getMessageReference(MessageId identity) throws IOException { return delegate.getMessageReference(identity); } + + public void setUsageManager(UsageManager usageManager) { + delegate.setUsageManager(usageManager); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java index 3e503a7186..a1f949ee26 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/ProxyTopicMessageStore.java @@ -24,6 +24,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.command.SubscriptionInfo; +import org.apache.activemq.memory.UsageManager; /** * A simple proxy that delegates to another MessageStore. @@ -94,4 +95,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore { public SubscriptionInfo[] getAllSubscriptions() throws IOException { return delegate.getAllSubscriptions(); } + + public void setUsageManager(UsageManager usageManager) { + delegate.setUsageManager(usageManager); + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java index a51af87d1c..2a84094c14 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCMessageStore.java @@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.util.IOExceptionSupport; @@ -196,4 +197,7 @@ public class JDBCMessageStore implements MessageStore { return destination; } + public void setUsageManager(UsageManager usageManager) { + // we can ignore since we don't buffer up messages. + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 4f20b58f2e..893495d13b 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -28,6 +28,7 @@ import org.apache.activeio.util.FactoryFinder; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -357,4 +358,10 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter { this.statements = statements; } + /** + * @param usageManager The UsageManager that is controlling the destination's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java index 098db8939f..80c26e0e9a 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalMessageStore.java @@ -30,6 +30,7 @@ import org.apache.activemq.command.JournalQueueAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -62,6 +63,8 @@ public class JournalMessageStore implements MessageStore { protected RecordLocation lastLocation; protected HashSet inFlightTxLocations = new HashSet(); + + private UsageManager usageManager; public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { this.peristenceAdapter = adapter; @@ -70,6 +73,12 @@ public class JournalMessageStore implements MessageStore { this.destination = destination; this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); } + + public void setUsageManager(UsageManager usageManager) { + this.usageManager = usageManager; + longTermStore.setUsageManager(usageManager); + } + /** * Not synchronized since the Journal has better throughput if you increase @@ -334,11 +343,15 @@ public class JournalMessageStore implements MessageStore { } public void start() throws Exception { + if( this.usageManager != null ) + this.usageManager.addUsageListener(peristenceAdapter); longTermStore.start(); } public void stop() throws Exception { longTermStore.stop(); + if( this.usageManager != null ) + this.usageManager.removeUsageListener(peristenceAdapter); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 9b9dbaed89..53def4570e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -81,13 +81,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve private final Journal journal; private final PersistenceAdapter longTermPersistence; - final UsageManager usageManager; private final WireFormat wireFormat = new OpenWireFormat(); private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap(); + private UsageManager usageManager; private long checkpointInterval = 1000 * 60 * 5; private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); @@ -111,7 +111,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve } }; - public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, UsageManager memManager, TaskRunnerFactory taskRunnerFactory) throws IOException { + public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { this.journal = journal; journal.setJournalEventListener(this); @@ -123,7 +123,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve }); this.longTermPersistence = longTermPersistence; - this.usageManager = memManager; + } + + /** + * @param usageManager The UsageManager that is controlling the destination's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + this.usageManager = usageManager; + longTermPersistence.setUsageManager(usageManager); } public Set getDestinations() { @@ -216,6 +223,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve public void stop() throws Exception { + this.usageManager.removeUsageListener(this); if( !started.compareAndSet(true, false) ) return; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java index a27fed0703..e40e5ea0d4 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalMessageStore.java @@ -31,6 +31,7 @@ import org.apache.activemq.command.JournalQueueAck; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -63,6 +64,8 @@ public class QuickJournalMessageStore implements MessageStore { protected RecordLocation lastLocation; protected HashSet inFlightTxLocations = new HashSet(); + + private UsageManager usageManager; public QuickJournalMessageStore(QuickJournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { this.peristenceAdapter = adapter; @@ -71,6 +74,11 @@ public class QuickJournalMessageStore implements MessageStore { this.destination = destination; this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); } + + public void setUsageManager(UsageManager usageManager) { + this.usageManager = usageManager; + longTermStore.setUsageManager(usageManager); + } /** * Not synchronized since the Journal has better throughput if you increase @@ -368,11 +376,15 @@ public class QuickJournalMessageStore implements MessageStore { } public void start() throws Exception { + if( this.usageManager != null ) + this.usageManager.addUsageListener(peristenceAdapter); longTermStore.start(); } public void stop() throws Exception { longTermStore.stop(); + if( this.usageManager != null ) + this.usageManager.removeUsageListener(peristenceAdapter); } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java index 35250d35b0..cdb9238b17 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/QuickJournalPersistenceAdapter.java @@ -81,13 +81,13 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ private final Journal journal; private final PersistenceAdapter longTermPersistence; - final UsageManager usageManager; private final WireFormat wireFormat = new OpenWireFormat(); private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap(); + private UsageManager usageManager; private long checkpointInterval = 1000 * 60 * 5; private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis(); @@ -111,7 +111,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ } }; - public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, UsageManager memManager, TaskRunnerFactory taskRunnerFactory) throws IOException { + public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException { this.journal = journal; journal.setJournalEventListener(this); @@ -123,7 +123,14 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ }); this.longTermPersistence = longTermPersistence; - this.usageManager = memManager; + } + + /** + * @param usageManager The UsageManager that is controlling the destination's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + this.usageManager = usageManager; + longTermPersistence.setUsageManager(usageManager); } public Set getDestinations() { @@ -216,6 +223,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ public void stop() throws Exception { + this.usageManager.removeUsageListener(this); if( !started.compareAndSet(true, false) ) return; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java index 54090deff0..ad33bb4709 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaMessageStore.java @@ -22,6 +22,7 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; /** @@ -90,4 +91,11 @@ public class KahaMessageStore implements MessageStore{ public void delete(){ messageContainer.clear(); } + + /** + * @param usageManager The UsageManager that is controlling the destination's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + } + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java index 659a84d83a..318c409ef3 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java @@ -28,6 +28,7 @@ import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreFactory; import org.apache.activemq.kaha.StringMarshaller; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; @@ -150,4 +151,10 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{ container.load(); return container; } + + /** + * @param usageManager The UsageManager that is controlling the broker's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java index eeafaaa274..900f4c9eef 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryMessageStore.java @@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageId; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageStore; @@ -103,5 +104,11 @@ public class MemoryMessageStore implements MessageStore { public void delete() { messageTable.clear(); } + + /** + * @param usageManager The UsageManager that is controlling the destination's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index e63b234aaf..dd9e3062d0 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -25,6 +25,7 @@ import java.util.Set; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.memory.UsageManager; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.TopicMessageStore; @@ -147,4 +148,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return null; } + /** + * @param usageManager The UsageManager that is controlling the broker's memory usage. + */ + public void setUsageManager(UsageManager usageManager) { + } } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java index 3a08418d15..9d8d714d92 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/policy/StrictOrderDispatchPolicyTest.java @@ -16,6 +16,8 @@ */ package org.apache.activemq.broker.policy; +import java.util.Iterator; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TopicSubscriptionTest; import org.apache.activemq.broker.region.policy.PolicyEntry; @@ -23,9 +25,6 @@ import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy; import org.apache.activemq.util.MessageIdList; -import java.util.List; -import java.util.Iterator; - public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest { protected BrokerService createBroker() throws Exception {