diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java index 0798e742f0..7bdd90eeff 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java @@ -120,7 +120,9 @@ public class BrokerService implements Service, Serializable { private ObjectName brokerObjectName; private TaskRunnerFactory taskRunnerFactory; private TaskRunnerFactory persistenceTaskRunnerFactory; - private UsageManager memoryManager; + private UsageManager usageManager; + private UsageManager producerUsageManager; + private UsageManager consumerUsageManager; private PersistenceAdapter persistenceAdapter; private PersistenceAdapterFactory persistenceFactory; private DestinationFactory destinationFactory; @@ -621,18 +623,56 @@ public class BrokerService implements Service, Serializable { } public UsageManager getMemoryManager() { - if (memoryManager == null) { - memoryManager = new UsageManager(); - memoryManager.setLimit(1024 * 1024 * 20); // Default to 20 Meg + if (usageManager == null) { + usageManager = new UsageManager("Main"); + usageManager.setLimit(1024 * 1024 * 20); // Default to 20 Meg // limit } - return memoryManager; + return usageManager; } + public void setMemoryManager(UsageManager memoryManager) { - this.memoryManager = memoryManager; + this.usageManager = memoryManager; + } + + /** + * @return the consumerUsageManager + */ + public UsageManager getConsumerUsageManager(){ + if (consumerUsageManager==null) { + consumerUsageManager = new UsageManager(getMemoryManager(),"Consumer",0.5f); + } + return consumerUsageManager; } + + /** + * @param consumerUsageManager the consumerUsageManager to set + */ + public void setConsumerUsageManager(UsageManager consumerUsageManager){ + this.consumerUsageManager=consumerUsageManager; + } + + + /** + * @return the producerUsageManager + */ + public UsageManager getProducerUsageManager(){ + if (producerUsageManager==null) { + producerUsageManager = new UsageManager(getMemoryManager(),"Producer",0.45f); + } + return producerUsageManager; + } + + /** + * @param producerUsageManager the producerUsageManager to set + */ + public void setProducerUsageManager(UsageManager producerUsageManager){ + this.producerUsageManager=producerUsageManager; + } + + public PersistenceAdapter getPersistenceAdapter() throws IOException { if (persistenceAdapter == null) { persistenceAdapter = createPersistenceAdapter(); @@ -1272,7 +1312,7 @@ public class BrokerService implements Service, Serializable { protected Broker createRegionBroker() throws Exception { // we must start the persistence adaptor before we can create the region // broker - getPersistenceAdapter().setUsageManager(getMemoryManager()); + getPersistenceAdapter().setUsageManager(getProducerUsageManager()); getPersistenceAdapter().start(); DestinationInterceptor destinationInterceptor = null; @@ -1284,15 +1324,15 @@ public class BrokerService implements Service, Serializable { } RegionBroker regionBroker = null; if (destinationFactory == null) { - destinationFactory = new DestinationFactoryImpl(getMemoryManager(), getTaskRunnerFactory(), getPersistenceAdapter()); + destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter()); } if (isUseJmx()) { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); - regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getMemoryManager(), + regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor); } else { - regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), destinationFactory, destinationInterceptor); + regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor); } destinationFactory.setRegionBroker(regionBroker); @@ -1597,4 +1637,7 @@ public class BrokerService implements Service, Serializable { } LOCAL_HOST_NAME = localHostName; } + + + } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 9512c1f0a4..c25573a5de 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -96,7 +96,7 @@ public class Queue implements Destination, Task { public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory, Store tmpStore) throws Exception { this.destination = destination; - this.usageManager = new UsageManager(memoryManager); + this.usageManager = new UsageManager(memoryManager,destination.toString()); this.usageManager.setLimit(Long.MAX_VALUE); this.store = store; if(destination.isTemporary()){ @@ -455,6 +455,9 @@ public class Queue implements Destination, Task { public void start() throws Exception { started = true; + if (usageManager != null) { + usageManager.start(); + } messages.start(); doPageIn(false); } @@ -467,6 +470,9 @@ public class Queue implements Destination, Task { if(messages!=null){ messages.stop(); } + if (usageManager != null) { + usageManager.stop(); + } } // Properties diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java index 8238f80e49..71e8b6e4ed 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java @@ -74,7 +74,7 @@ public class Topic implements Destination { this.destination = destination; this.store = store; //this could be NULL! (If an advsiory) - this.usageManager = new UsageManager(memoryManager); + this.usageManager = new UsageManager(memoryManager,destination.toString()); this.usageManager.setLimit(Long.MAX_VALUE); // Let the store know what usage manager we are using so that he can flush messages to disk @@ -321,10 +321,17 @@ public class Topic implements Destination { public void start() throws Exception { this.subscriptionRecoveryPolicy.start(); + if (usageManager != null) { + usageManager.start(); + } + } public void stop() throws Exception { this.subscriptionRecoveryPolicy.stop(); + if (usageManager != null) { + usageManager.stop(); + } } public Message[] browse(){ diff --git a/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java b/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java index 33e2d220b1..411b7e2448 100755 --- a/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/memory/UsageManager.java @@ -18,7 +18,9 @@ package org.apache.activemq.memory; import java.util.Iterator; +import java.util.List; +import org.apache.activemq.Service; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -35,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArrayList; * * @version $Revision: 1.3 $ */ -public class UsageManager { +public class UsageManager implements Service{ private static final Log log = LogFactory.getLog(UsageManager.class); @@ -55,9 +57,12 @@ public class UsageManager { /** True if someone called setSendFailIfNoSpace() on this particular usage manager */ private boolean sendFailIfNoSpaceExplicitySet; private final boolean debug = log.isDebugEnabled(); + private String name = ""; + private float usagePortion = 1.0f; + private List children = new CopyOnWriteArrayList(); public UsageManager() { - this(null); + this(null,"default"); } /** @@ -68,7 +73,25 @@ public class UsageManager { * @param parent */ public UsageManager(UsageManager parent) { + this(parent,"default"); + } + + public UsageManager(String name) { + this(null,name); + } + + public UsageManager(UsageManager parent,String name) { + this(parent,name,1.0f); + } + + public UsageManager(UsageManager parent, String name, float portion) { this.parent = parent; + this.usagePortion=portion; + if (parent != null) { + this.limit=(long)(parent.limit * portion); + this.name= parent.name + ":"; + } + this.name += name; } /** @@ -91,9 +114,6 @@ public class UsageManager { for( int i=0; percentUsage >= 100 ; i++) { usageMutex.wait(); } - for( int i=0; percentUsage > 90 ; i++) { - usageMutex.wait(100); - } } } @@ -166,11 +186,14 @@ public class UsageManager { throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0"); } int percentUsage; - synchronized (usageMutex) { - this.limit = limit; - percentUsage = caclPercentUsage(); + synchronized(usageMutex){ + this.limit=parent!=null?(long)(parent.limit*usagePortion):limit; + percentUsage=caclPercentUsage(); } setPercentUsage(percentUsage); + for (UsageManager child:children) { + child.setLimit(limit); + } } /* @@ -259,8 +282,35 @@ public class UsageManager { l.onMemoryUseChanged(this,oldPercentUsage,newPercentUsage); } } + + public String getName() { + return name; + } - public String toString() { - return "UsageManager: percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta="+percentUsageMinDelta+"%"; + public String toString(){ + + + return "UsageManager("+ getName() +") percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta=" + +percentUsageMinDelta+"%"; + } + + public void start(){ + if(parent!=null){ + parent.addChild(this); + } + } + + public void stop(){ + if(parent!=null){ + parent.removeChild(this); + } + } + + private void addChild(UsageManager child){ + children.add(child); + } + + private void removeChild(UsageManager child){ + children.remove(child); } }