Have separate child Usage Managers - one for producers, one for consumers.

This has been done so that paged-in messages do not have to contend for memory
with fast producers

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@512637 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-28 07:17:55 +00:00
parent 6b38f803bc
commit 14c605feab
4 changed files with 128 additions and 22 deletions

View File

@ -120,7 +120,9 @@ public class BrokerService implements Service, Serializable {
private ObjectName brokerObjectName;
private TaskRunnerFactory taskRunnerFactory;
private TaskRunnerFactory persistenceTaskRunnerFactory;
private UsageManager memoryManager;
private UsageManager usageManager;
private UsageManager producerUsageManager;
private UsageManager consumerUsageManager;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
private DestinationFactory destinationFactory;
@ -621,18 +623,56 @@ public class BrokerService implements Service, Serializable {
}
public UsageManager getMemoryManager() {
if (memoryManager == null) {
memoryManager = new UsageManager();
memoryManager.setLimit(1024 * 1024 * 20); // Default to 20 Meg
if (usageManager == null) {
usageManager = new UsageManager("Main");
usageManager.setLimit(1024 * 1024 * 20); // Default to 20 Meg
// limit
}
return memoryManager;
return usageManager;
}
public void setMemoryManager(UsageManager memoryManager) {
this.memoryManager = memoryManager;
this.usageManager = memoryManager;
}
/**
* @return the consumerUsageManager
*/
public UsageManager getConsumerUsageManager(){
if (consumerUsageManager==null) {
consumerUsageManager = new UsageManager(getMemoryManager(),"Consumer",0.5f);
}
return consumerUsageManager;
}
/**
* @param consumerUsageManager the consumerUsageManager to set
*/
public void setConsumerUsageManager(UsageManager consumerUsageManager){
this.consumerUsageManager=consumerUsageManager;
}
/**
* @return the producerUsageManager
*/
public UsageManager getProducerUsageManager(){
if (producerUsageManager==null) {
producerUsageManager = new UsageManager(getMemoryManager(),"Producer",0.45f);
}
return producerUsageManager;
}
/**
* @param producerUsageManager the producerUsageManager to set
*/
public void setProducerUsageManager(UsageManager producerUsageManager){
this.producerUsageManager=producerUsageManager;
}
public PersistenceAdapter getPersistenceAdapter() throws IOException {
if (persistenceAdapter == null) {
persistenceAdapter = createPersistenceAdapter();
@ -1272,7 +1312,7 @@ public class BrokerService implements Service, Serializable {
protected Broker createRegionBroker() throws Exception {
// we must start the persistence adaptor before we can create the region
// broker
getPersistenceAdapter().setUsageManager(getMemoryManager());
getPersistenceAdapter().setUsageManager(getProducerUsageManager());
getPersistenceAdapter().start();
DestinationInterceptor destinationInterceptor = null;
@ -1284,15 +1324,15 @@ public class BrokerService implements Service, Serializable {
}
RegionBroker regionBroker = null;
if (destinationFactory == null) {
destinationFactory = new DestinationFactoryImpl(getMemoryManager(), getTaskRunnerFactory(), getPersistenceAdapter());
destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter());
}
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getMemoryManager(),
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(),
destinationFactory, destinationInterceptor);
}
else {
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getMemoryManager(), destinationFactory, destinationInterceptor);
regionBroker = new RegionBroker(this,getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor);
}
destinationFactory.setRegionBroker(regionBroker);
@ -1597,4 +1637,7 @@ public class BrokerService implements Service, Serializable {
}
LOCAL_HOST_NAME = localHostName;
}
}

View File

@ -96,7 +96,7 @@ public class Queue implements Destination, Task {
public Queue(ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.destination = destination;
this.usageManager = new UsageManager(memoryManager);
this.usageManager = new UsageManager(memoryManager,destination.toString());
this.usageManager.setLimit(Long.MAX_VALUE);
this.store = store;
if(destination.isTemporary()){
@ -455,6 +455,9 @@ public class Queue implements Destination, Task {
public void start() throws Exception {
started = true;
if (usageManager != null) {
usageManager.start();
}
messages.start();
doPageIn(false);
}
@ -467,6 +470,9 @@ public class Queue implements Destination, Task {
if(messages!=null){
messages.stop();
}
if (usageManager != null) {
usageManager.stop();
}
}
// Properties

View File

@ -74,7 +74,7 @@ public class Topic implements Destination {
this.destination = destination;
this.store = store; //this could be NULL! (If an advsiory)
this.usageManager = new UsageManager(memoryManager);
this.usageManager = new UsageManager(memoryManager,destination.toString());
this.usageManager.setLimit(Long.MAX_VALUE);
// Let the store know what usage manager we are using so that he can flush messages to disk
@ -321,10 +321,17 @@ public class Topic implements Destination {
public void start() throws Exception {
this.subscriptionRecoveryPolicy.start();
if (usageManager != null) {
usageManager.start();
}
}
public void stop() throws Exception {
this.subscriptionRecoveryPolicy.stop();
if (usageManager != null) {
usageManager.stop();
}
}
public Message[] browse(){

View File

@ -18,7 +18,9 @@
package org.apache.activemq.memory;
import java.util.Iterator;
import java.util.List;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,7 +37,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
*
* @version $Revision: 1.3 $
*/
public class UsageManager {
public class UsageManager implements Service{
private static final Log log = LogFactory.getLog(UsageManager.class);
@ -55,9 +57,12 @@ public class UsageManager {
/** True if someone called setSendFailIfNoSpace() on this particular usage manager */
private boolean sendFailIfNoSpaceExplicitySet;
private final boolean debug = log.isDebugEnabled();
private String name = "";
private float usagePortion = 1.0f;
private List<UsageManager> children = new CopyOnWriteArrayList<UsageManager>();
public UsageManager() {
this(null);
this(null,"default");
}
/**
@ -68,7 +73,25 @@ public class UsageManager {
* @param parent
*/
public UsageManager(UsageManager parent) {
this(parent,"default");
}
public UsageManager(String name) {
this(null,name);
}
public UsageManager(UsageManager parent,String name) {
this(parent,name,1.0f);
}
public UsageManager(UsageManager parent, String name, float portion) {
this.parent = parent;
this.usagePortion=portion;
if (parent != null) {
this.limit=(long)(parent.limit * portion);
this.name= parent.name + ":";
}
this.name += name;
}
/**
@ -91,9 +114,6 @@ public class UsageManager {
for( int i=0; percentUsage >= 100 ; i++) {
usageMutex.wait();
}
for( int i=0; percentUsage > 90 ; i++) {
usageMutex.wait(100);
}
}
}
@ -166,11 +186,14 @@ public class UsageManager {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
int percentUsage;
synchronized (usageMutex) {
this.limit = limit;
percentUsage = caclPercentUsage();
synchronized(usageMutex){
this.limit=parent!=null?(long)(parent.limit*usagePortion):limit;
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
for (UsageManager child:children) {
child.setLimit(limit);
}
}
/*
@ -259,8 +282,35 @@ public class UsageManager {
l.onMemoryUseChanged(this,oldPercentUsage,newPercentUsage);
}
}
public String getName() {
return name;
}
public String toString() {
return "UsageManager: percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta="+percentUsageMinDelta+"%";
public String toString(){
return "UsageManager("+ getName() +") percentUsage="+percentUsage+"%, usage="+usage+" limit="+limit+" percentUsageMinDelta="
+percentUsageMinDelta+"%";
}
public void start(){
if(parent!=null){
parent.addChild(this);
}
}
public void stop(){
if(parent!=null){
parent.removeChild(this);
}
}
private void addChild(UsageManager child){
children.add(child);
}
private void removeChild(UsageManager child){
children.remove(child);
}
}