UpdateManager changed to account for Store and Temp data usage as well as memory usage

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@567647 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-08-20 10:37:29 +00:00
parent 625cd7cdb6
commit 6d8e2c5b3a
77 changed files with 1339 additions and 754 deletions

View File

@ -33,7 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.management.JMSProducerStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.IntrospectionSupport;
/**
@ -77,7 +77,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
private AtomicLong messageSequence;
private long startTime;
private MessageTransformer transformer;
private UsageManager producerWindow;
private MemoryUsage producerWindow;
protected ActiveMQMessageProducer(ActiveMQSession session, ProducerId producerId, ActiveMQDestination destination) throws JMSException {
super(session);
@ -92,7 +92,7 @@ public class ActiveMQMessageProducer extends ActiveMQMessageProducerSupport impl
// Enable producer window flow control if protocol > 3 and the window
// size > 0
if (session.connection.getProtocolVersion() >= 3 && this.info.getWindowSize() > 0) {
producerWindow = new UsageManager("Producer Window: " + producerId);
producerWindow = new MemoryUsage("Producer Window: " + producerId);
producerWindow.setLimit(this.info.getWindowSize());
}

View File

@ -79,9 +79,11 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.management.JMSSessionStatsImpl;
import org.apache.activemq.management.StatsCapable;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.commons.logging.Log;
@ -1532,7 +1534,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* @throws JMSException
*/
protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
UsageManager producerWindow) throws JMSException {
MemoryUsage producerWindow) throws JMSException {
checkClosed();
if (destination.isTemporary() && connection.isDeleted(destination)) {

View File

@ -62,7 +62,6 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.ConnectionFilter;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
@ -78,6 +77,7 @@ import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportServer;
import org.apache.activemq.transport.vm.VMTransportFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.JMXSupport;
@ -118,9 +118,9 @@ public class BrokerService implements Service {
private ObjectName brokerObjectName;
private TaskRunnerFactory taskRunnerFactory;
private TaskRunnerFactory persistenceTaskRunnerFactory;
private UsageManager usageManager;
private UsageManager producerUsageManager;
private UsageManager consumerUsageManager;
private SystemUsage usageManager;
private SystemUsage producerSystemUsage;
private SystemUsage consumerSystemUsage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
private DestinationFactory destinationFactory;
@ -646,51 +646,61 @@ public class BrokerService implements Service {
this.populateJMSXUserID = populateJMSXUserID;
}
public UsageManager getMemoryManager() {
public SystemUsage getUsageManager() {
try {
if (usageManager == null) {
usageManager = new UsageManager("Main");
usageManager.setLimit(1024 * 1024 * 64); // Default to 64 Meg
// limit
usageManager = new SystemUsage("Main",getPersistenceAdapter(),getTempDataStore());
usageManager.getMemoryUsage().setLimit(1024 * 1024 * 64); // Default to 64 Meg
usageManager.getTempDiskUsage().setLimit(1024 * 1024 * 1024 * 100);//10 Gb
usageManager.getStoreUsage().setLimit(1024 * 1024 * 1024 * 100); //100 GB
}
return usageManager;
}catch(IOException e) {
LOG.fatal("Cannot create SystemUsage",e);
throw new RuntimeException("Fatally failed to create SystemUsage" + e.getMessage());
}
}
public void setMemoryManager(UsageManager memoryManager) {
public void setUsageManager(SystemUsage memoryManager) {
this.usageManager = memoryManager;
}
/**
* @return the consumerUsageManager
* @throws IOException
*/
public UsageManager getConsumerUsageManager() {
if (consumerUsageManager == null) {
consumerUsageManager = new UsageManager(getMemoryManager(), "Consumer", 0.5f);
public SystemUsage getConsumerSystemUsage() throws IOException {
if (consumerSystemUsage == null) {
consumerSystemUsage = new SystemUsage(getUsageManager(), "Consumer");
consumerSystemUsage.getMemoryUsage().setUsagePortion(0.5f);
}
return consumerUsageManager;
return consumerSystemUsage;
}
/**
* @param consumerUsageManager the consumerUsageManager to set
*/
public void setConsumerUsageManager(UsageManager consumerUsageManager) {
this.consumerUsageManager = consumerUsageManager;
public void setConsumerSystemUsage(SystemUsage consumerUsageManager) {
this.consumerSystemUsage = consumerUsageManager;
}
/**
* @return the producerUsageManager
* @throws IOException
*/
public UsageManager getProducerUsageManager() {
if (producerUsageManager == null) {
producerUsageManager = new UsageManager(getMemoryManager(), "Producer", 0.45f);
public SystemUsage getProducerSystemUsage() throws IOException {
if (producerSystemUsage == null) {
producerSystemUsage = new SystemUsage(getUsageManager(), "Producer");
producerSystemUsage.getMemoryUsage().setUsagePortion(0.45f);
}
return producerUsageManager;
return producerSystemUsage;
}
/**
* @param producerUsageManager the producerUsageManager to set
*/
public void setProducerUsageManager(UsageManager producerUsageManager) {
this.producerUsageManager = producerUsageManager;
public void setProducerSystemUsage(SystemUsage producerUsageManager) {
this.producerSystemUsage = producerUsageManager;
}
public PersistenceAdapter getPersistenceAdapter() throws IOException {
@ -1377,7 +1387,7 @@ public class BrokerService implements Service {
protected Broker createRegionBroker() throws Exception {
// we must start the persistence adaptor before we can create the region
// broker
getPersistenceAdapter().setUsageManager(getProducerUsageManager());
getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
getPersistenceAdapter().setBrokerName(getBrokerName());
if (this.deleteAllMessagesOnStartup) {
getPersistenceAdapter().deleteAllMessages();
@ -1392,14 +1402,14 @@ public class BrokerService implements Service {
}
RegionBroker regionBroker = null;
if (destinationFactory == null) {
destinationFactory = new DestinationFactoryImpl(getProducerUsageManager(), getTaskRunnerFactory(), getPersistenceAdapter());
destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter());
}
if (isUseJmx()) {
MBeanServer mbeanServer = getManagementContext().getMBeanServer();
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory,
regionBroker = new ManagedRegionBroker(this, mbeanServer, getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
destinationInterceptor);
} else {
regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerUsageManager(), destinationFactory, destinationInterceptor);
regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor);
}
destinationFactory.setRegionBroker(regionBroker);

View File

@ -85,15 +85,15 @@ public class BrokerView implements BrokerViewMBean {
}
public int getMemoryPercentageUsed() {
return brokerService.getMemoryManager().getPercentUsage();
return brokerService.getUsageManager().getMemoryUsage().getPercentUsage();
}
public long getMemoryLimit() {
return brokerService.getMemoryManager().getLimit();
return brokerService.getUsageManager().getMemoryUsage().getLimit();
}
public void setMemoryLimit(long limit) {
brokerService.getMemoryManager().setLimit(limit);
brokerService.getUsageManager().getMemoryUsage().setLimit(limit);
}
public void resetStatistics() {

View File

@ -94,15 +94,15 @@ public class DestinationView implements DestinationViewMBean {
}
public int getMemoryPercentageUsed() {
return destination.getUsageManager().getPercentUsage();
return destination.getBrokerMemoryUsage().getPercentUsage();
}
public long getMemoryLimit() {
return destination.getUsageManager().getLimit();
return destination.getBrokerMemoryUsage().getLimit();
}
public void setMemoryLimit(long limit) {
destination.getUsageManager().setLimit(limit);
destination.getBrokerMemoryUsage().setLimit(limit);
}
public double getAverageEnqueueTime() {

View File

@ -27,14 +27,14 @@ import org.apache.activemq.broker.region.QueueRegion;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
public class ManagedQueueRegion extends QueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public ManagedQueueRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
regionBroker = broker;

View File

@ -61,11 +61,11 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey;
@ -92,7 +92,7 @@ public class ManagedRegionBroker extends RegionBroker {
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager,
public ManagedRegionBroker(BrokerService brokerService, MBeanServer mbeanServer, ObjectName brokerObjectName, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager,
DestinationFactory destinationFactory, DestinationInterceptor destinationInterceptor) throws IOException {
super(brokerService, taskRunnerFactory, memoryManager, destinationFactory, destinationInterceptor);
this.mbeanServer = mbeanServer;
@ -121,19 +121,19 @@ public class ManagedRegionBroker extends RegionBroker {
registeredMBeans.clear();
}
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new ManagedTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}

View File

@ -27,14 +27,14 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempQueueRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
public class ManagedTempQueueRegion extends TempQueueRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;

View File

@ -27,14 +27,14 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TempTopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
public class ManagedTempTopicRegion extends TempTopicRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public ManagedTempTopicRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
this.regionBroker = regionBroker;

View File

@ -27,14 +27,14 @@ import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
public class ManagedTopicRegion extends TopicRegion {
private final ManagedRegionBroker regionBroker;
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public ManagedTopicRegion(ManagedRegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
regionBroker = broker;

View File

@ -41,8 +41,8 @@ import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.Response;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.DestinationMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -56,7 +56,7 @@ public abstract class AbstractRegion implements Region {
protected final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
protected final DestinationMap destinationMap = new DestinationMap();
protected final Map<ConsumerId, Subscription> subscriptions = new ConcurrentHashMap<ConsumerId, Subscription>();
protected final UsageManager memoryManager;
protected final SystemUsage memoryManager;
protected final DestinationFactory destinationFactory;
protected final DestinationStatistics destinationStatistics;
protected final RegionBroker broker;
@ -66,7 +66,7 @@ public abstract class AbstractRegion implements Region {
protected final Map<ConsumerId, Object> consumerChangeMutexMap = new HashMap<ConsumerId, Object>();
protected boolean started;
public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public AbstractRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
if (broker == null) {
throw new IllegalArgumentException("null broker");

View File

@ -25,8 +25,9 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.12 $
@ -47,7 +48,7 @@ public interface Destination extends Service {
ActiveMQDestination getActiveMQDestination();
UsageManager getUsageManager();
MemoryUsage getBrokerMemoryUsage();
void dispose(ConnectionContext context) throws IOException;

View File

@ -29,11 +29,11 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
/**
* Creates standard ActiveMQ implementations of
@ -44,12 +44,12 @@ import org.apache.activemq.thread.TaskRunnerFactory;
*/
public class DestinationFactoryImpl extends DestinationFactory {
protected final UsageManager memoryManager;
protected final SystemUsage memoryManager;
protected final TaskRunnerFactory taskRunnerFactory;
protected final PersistenceAdapter persistenceAdapter;
protected RegionBroker broker;
public DestinationFactoryImpl(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) {
this.memoryManager = memoryManager;
this.taskRunnerFactory = taskRunnerFactory;
if (persistenceAdapter == null) {

View File

@ -27,8 +27,9 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
*
@ -79,8 +80,8 @@ public class DestinationFilter implements Destination {
return next.getName();
}
public UsageManager getUsageManager() {
return next.getUsageManager();
public MemoryUsage getBrokerMemoryUsage() {
return next.getBrokerMemoryUsage();
}
public boolean lock(MessageReference node, LockOwner lockOwner) {

View File

@ -29,8 +29,9 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -42,10 +43,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
private final ConcurrentHashMap<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
private final SubscriptionKey subscriptionKey;
private final boolean keepDurableSubsActive;
private final UsageManager usageManager;
private final SystemUsage usageManager;
private boolean active;
public DurableTopicSubscription(Broker broker, UsageManager usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
public DurableTopicSubscription(Broker broker, SystemUsage usageManager, ConnectionContext context, ConsumerInfo info, boolean keepDurableSubsActive)
throws InvalidSelectorException {
super(broker, context, info, new StoreDurableSubscriberCursor(context.getClientId(), info.getSubscriptionName(), broker.getTempDataStore(), info.getPrefetchSize()));
this.usageManager = usageManager;
@ -77,7 +78,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
dispatchMatched();
}
public synchronized void activate(UsageManager memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
public synchronized void activate(SystemUsage memoryManager, ConnectionContext context, ConsumerInfo info) throws Exception {
LOG.debug("Activating " + this);
if (!active) {
this.active = true;
@ -89,7 +90,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
topic.activate(context, this);
}
}
pending.setUsageManager(memoryManager);
pending.setSystemUsage(memoryManager);
pending.start();
// If nothing was in the persistent store, then try to use the
@ -101,13 +102,13 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
}
}
dispatchMatched();
this.usageManager.addUsageListener(this);
this.usageManager.getMemoryUsage().addUsageListener(this);
}
}
public synchronized void deactivate(boolean keepDurableSubsActive) throws Exception {
active = false;
this.usageManager.removeUsageListener(this);
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (pending) {
pending.stop();
}
@ -239,10 +240,10 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
* @param memoryManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager,
* @see org.apache.activemq.usage.UsageListener#onMemoryUseChanged(org.apache.activemq.usage.SystemUsage,
* int, int)
*/
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (oldPercentUsage > newPercentUsage && oldPercentUsage >= 90) {
try {
dispatchMatched();

View File

@ -17,6 +17,7 @@
package org.apache.activemq.broker.region;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
@ -51,7 +52,6 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.filter.BooleanExpression;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.selector.SelectorParser;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
@ -60,6 +60,8 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -78,7 +80,8 @@ public class Queue implements Destination, Task {
private final ActiveMQDestination destination;
private final List<Subscription> consumers = new CopyOnWriteArrayList<Subscription>();
private final Valve dispatchValve = new Valve(true);
private final UsageManager usageManager;
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
private final DestinationStatistics destinationStatistics = new DestinationStatistics();
private PendingMessageCursor messages;
private final LinkedList<MessageReference> pagedInMessages = new LinkedList<MessageReference>();
@ -107,12 +110,13 @@ public class Queue implements Destination, Task {
};
};
public Queue(Broker broker, ActiveMQDestination destination, final UsageManager memoryManager, MessageStore store, DestinationStatistics parentStats,
public Queue(Broker broker, ActiveMQDestination destination, final SystemUsage systemUsage, MessageStore store, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory, Store tmpStore) throws Exception {
this.broker = broker;
this.destination = destination;
this.usageManager = new UsageManager(memoryManager, destination.toString());
this.usageManager.setUsagePortion(1.0f);
this.systemUsage=systemUsage;
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
this.store = store;
if (destination.isTemporary()) {
this.messages = new VMPendingMessageCursor();
@ -126,7 +130,7 @@ public class Queue implements Destination, Task {
// flush messages to disk
// when usage gets high.
if (store != null) {
store.setUsageManager(usageManager);
store.setMemoryUsage(memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
@ -139,7 +143,7 @@ public class Queue implements Destination, Task {
public void initialize() throws Exception {
if (store != null) {
// Restore the persistent messages.
messages.setUsageManager(getUsageManager());
messages.setSystemUsage(systemUsage);
if (messages.isRecoveryRequired()) {
store.recover(new MessageRecoveryListener() {
@ -359,9 +363,9 @@ public class Queue implements Destination, Task {
}
return;
}
if (context.isProducerFlowControl() && usageManager.isFull()) {
if (usageManager.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
if (context.isProducerFlowControl() && memoryUsage.isFull()) {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("SystemUsage memory limit reached");
}
// We can avoid blocking due to low usage if the producer is sending
@ -404,7 +408,7 @@ public class Queue implements Destination, Task {
// If the user manager is not full, then the task will not
// get called..
if (!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
@ -417,7 +421,7 @@ public class Queue implements Destination, Task {
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
while (!usageManager.waitForSpace(1000)) {
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
@ -444,6 +448,7 @@ public class Queue implements Destination, Task {
final ConnectionContext context = producerExchange.getConnectionContext();
message.setRegionDestination(this);
if (store != null && message.isPersistent()) {
systemUsage.getStoreUsage().waitForSpace();
store.addMessage(context, message);
}
if (context.isInTransaction()) {
@ -552,13 +557,13 @@ public class Queue implements Destination, Task {
synchronized (messages) {
size = messages.size();
}
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + usageManager.getPercentUsage() + "%, size=" + size
return "Queue: destination=" + destination.getPhysicalName() + ", subscriptions=" + consumers.size() + ", memory=" + memoryUsage.getPercentUsage() + "%, size=" + size
+ ", in flight groups=" + messageGroupOwners;
}
public void start() throws Exception {
if (usageManager != null) {
usageManager.start();
if (memoryUsage != null) {
memoryUsage.start();
}
messages.start();
doPageIn(false);
@ -571,8 +576,8 @@ public class Queue implements Destination, Task {
if (messages != null) {
messages.stop();
}
if (usageManager != null) {
usageManager.stop();
if (memoryUsage != null) {
memoryUsage.stop();
}
}
@ -586,8 +591,8 @@ public class Queue implements Destination, Task {
return destination.getPhysicalName();
}
public UsageManager getUsageManager() {
return usageManager;
public MemoryUsage getBrokerMemoryUsage() {
return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {
@ -926,7 +931,7 @@ public class Queue implements Destination, Task {
*/
public boolean iterate() {
while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}

View File

@ -24,8 +24,8 @@ import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
/**
*
@ -34,14 +34,14 @@ import org.apache.activemq.thread.TaskRunnerFactory;
public class QueueRegion extends AbstractRegion {
public QueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics,
UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
public String toString() {
return "QueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size()
+ ", memory=" + memoryManager.getPercentUsage() + "%";
+ ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info)

View File

@ -55,11 +55,11 @@ import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.BrokerSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.LongSequenceGenerator;
@ -99,7 +99,7 @@ public class RegionBroker implements Broker {
private final DestinationInterceptor destinationInterceptor;
private ConnectionContext adminConnectionContext;
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, UsageManager memoryManager, DestinationFactory destinationFactory,
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
DestinationInterceptor destinationInterceptor) throws IOException {
this.brokerService = brokerService;
if (destinationFactory == null) {
@ -158,19 +158,19 @@ public class RegionBroker implements Broker {
return topicRegion;
}
protected Region createTempTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempTopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTempQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createTopicRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new TopicRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}
protected Region createQueueRegion(UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
protected Region createQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) {
return new QueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
}

View File

@ -23,15 +23,15 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
/**
* @version $Revision: 1.7 $
*/
public class TempQueueRegion extends AbstractRegion {
public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
@ -65,7 +65,7 @@ public class TempQueueRegion extends AbstractRegion {
}
public String toString() {
return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
return "TempQueueRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

View File

@ -22,8 +22,8 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +34,7 @@ public class TempTopicRegion extends AbstractRegion {
private static final Log LOG = LogFactory.getLog(TempTopicRegion.class);
public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public TempTopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
// We should allow the following to be configurable via a Destination
@ -67,7 +67,7 @@ public class TempTopicRegion extends AbstractRegion {
}
public String toString() {
return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
return "TempTopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
public void removeDestination(ConnectionContext context, ActiveMQDestination destination, long timeout) throws Exception {

View File

@ -43,13 +43,14 @@ import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.ProducerAck;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.thread.Valve;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -67,7 +68,8 @@ public class Topic implements Destination {
protected final Valve dispatchValve = new Valve(true);
// this could be NULL! (If an advisory)
protected final TopicMessageStore store;
protected final UsageManager usageManager;
private final SystemUsage systemUsage;
private final MemoryUsage memoryUsage;
protected final DestinationStatistics destinationStatistics = new DestinationStatistics();
private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
@ -85,7 +87,7 @@ public class Topic implements Destination {
// that the UsageManager is holding.
synchronized (messagesWaitingForSpace) {
while (!usageManager.isFull() && !messagesWaitingForSpace.isEmpty()) {
while (!memoryUsage.isFull() && !messagesWaitingForSpace.isEmpty()) {
Runnable op = messagesWaitingForSpace.removeFirst();
op.run();
}
@ -95,19 +97,20 @@ public class Topic implements Destination {
};
private final Broker broker;
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, UsageManager memoryManager, DestinationStatistics parentStats,
public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats,
TaskRunnerFactory taskFactory) {
this.broker = broker;
this.destination = destination;
this.store = store; // this could be NULL! (If an advisory)
this.usageManager = new UsageManager(memoryManager, destination.toString());
this.usageManager.setUsagePortion(1.0f);
this.systemUsage=systemUsage;
this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString());
this.memoryUsage.setUsagePortion(1.0f);
// Let the store know what usage manager we are using so that he can
// flush messages to disk
// when usage gets high.
if (store != null) {
store.setUsageManager(usageManager);
store.setMemoryUsage(memoryUsage);
}
// let's copy the enabled property from the parent DestinationStatistics
@ -206,22 +209,16 @@ public class Topic implements Destination {
}
}
// Do we need to create the subscription?
if (info == null) {
info = new SubscriptionInfo();
if(info==null){
info=new SubscriptionInfo();
info.setClientId(clientId);
info.setSelector(selector);
info.setSubscriptionName(subscriptionName);
info.setDestination(getActiveMQDestination()); // This
// destination
// is an actual
// destination
// id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination()); // This
// destination
// might
// be a
// pattern
store.addSubsciption(info, subscription.getConsumerInfo().isRetroactive());
info.setDestination(getActiveMQDestination());
// Thi destination is an actual destination id.
info.setSubscribedDestination(subscription.getConsumerInfo().getDestination());
// This destination might be a pattern
store.addSubsciption(info,subscription.getConsumerInfo().isRetroactive());
}
final MessageEvaluationContext msgContext = new MessageEvaluationContext();
@ -287,8 +284,8 @@ public class Topic implements Destination {
return;
}
if (context.isProducerFlowControl() && usageManager.isFull()) {
if (usageManager.isSendFailIfNoSpace()) {
if (context.isProducerFlowControl() && memoryUsage.isFull()) {
if (systemUsage.isSendFailIfNoSpace()) {
throw new javax.jms.ResourceAllocationException("Usage Manager memory limit reached");
}
@ -327,7 +324,7 @@ public class Topic implements Destination {
// If the user manager is not full, then the task will not
// get called..
if (!usageManager.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
if (!memoryUsage.notifyCallbackWhenNotFull(sendMessagesWaitingForSpaceTask)) {
// so call it directly here.
sendMessagesWaitingForSpaceTask.run();
}
@ -340,7 +337,7 @@ public class Topic implements Destination {
// Producer flow control cannot be used, so we have do the flow
// control at the broker
// by blocking this thread until there is space available.
while (!usageManager.waitForSpace(1000)) {
while (!memoryUsage.waitForSpace(1000)) {
if (context.getStopping().get()) {
throw new IOException("Connection closed, send aborted.");
}
@ -365,6 +362,7 @@ public class Topic implements Destination {
message.setRegionDestination(this);
if (store != null && message.isPersistent() && !canOptimizeOutPersistence()) {
systemUsage.getStoreUsage().waitForSpace();
store.addMessage(context, message);
}
@ -427,16 +425,16 @@ public class Topic implements Destination {
public void start() throws Exception {
this.subscriptionRecoveryPolicy.start();
if (usageManager != null) {
usageManager.start();
if (memoryUsage != null) {
memoryUsage.start();
}
}
public void stop() throws Exception {
this.subscriptionRecoveryPolicy.stop();
if (usageManager != null) {
usageManager.stop();
if (memoryUsage != null) {
memoryUsage.stop();
}
}
@ -474,8 +472,8 @@ public class Topic implements Destination {
// Properties
// -------------------------------------------------------------------------
public UsageManager getUsageManager() {
return usageManager;
public MemoryUsage getBrokerMemoryUsage() {
return memoryUsage;
}
public DestinationStatistics getDestinationStatistics() {

View File

@ -35,9 +35,9 @@ import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.command.SessionId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log;
@ -53,7 +53,7 @@ public class TopicRegion extends AbstractRegion {
private final SessionId recoveredDurableSubSessionId = new SessionId(new ConnectionId("OFFLINE"), recoveredDurableSubIdGenerator.getNextSequenceId());
private boolean keepDurableSubsActive;
public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, UsageManager memoryManager, TaskRunnerFactory taskRunnerFactory,
public TopicRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory,
DestinationFactory destinationFactory) {
super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory);
@ -140,7 +140,7 @@ public class TopicRegion extends AbstractRegion {
}
public String toString() {
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getPercentUsage() + "%";
return "TopicRegion: destinations=" + destinations.size() + ", subscriptions=" + subscriptions.size() + ", memory=" + memoryManager.getMemoryUsage().getPercentUsage() + "%";
}
@Override

View File

@ -36,8 +36,8 @@ import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.MessagePull;
import org.apache.activemq.command.Response;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -47,7 +47,7 @@ public class TopicSubscription extends AbstractSubscription {
private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
protected PendingMessageCursor matched;
protected final UsageManager usageManager;
protected final SystemUsage usageManager;
protected AtomicLong dispatchedCounter = new AtomicLong();
protected AtomicLong prefetchExtension = new AtomicLong();
@ -62,7 +62,7 @@ public class TopicSubscription extends AbstractSubscription {
private final AtomicLong dequeueCounter = new AtomicLong(0);
private int memoryUsageHighWaterMark = 95;
public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, UsageManager usageManager) throws Exception {
public TopicSubscription(Broker broker, ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
this.usageManager = usageManager;
String matchedName = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + info.getConsumerId().toString() + "]";
@ -71,7 +71,7 @@ public class TopicSubscription extends AbstractSubscription {
}
public void init() throws Exception {
this.matched.setUsageManager(usageManager);
this.matched.setSystemUsage(usageManager);
this.matched.start();
}
@ -317,7 +317,7 @@ public class TopicSubscription extends AbstractSubscription {
/**
* @return the usageManager
*/
public UsageManager getUsageManager() {
public SystemUsage getUsageManager() {
return this.usageManager;
}

View File

@ -20,7 +20,7 @@ import java.util.LinkedList;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.SystemUsage;
/**
* Abstract method holder for pending message (messages awaiting disptach to a
@ -31,7 +31,7 @@ import org.apache.activemq.memory.UsageManager;
public class AbstractPendingMessageCursor implements PendingMessageCursor {
protected int memoryUsageHighWaterMark = 90;
protected int maxBatchSize = 100;
protected UsageManager usageManager;
protected SystemUsage systemUsage;
public void start() throws Exception {
}
@ -110,16 +110,16 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
public void gc() {
}
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
public void setSystemUsage(SystemUsage usageManager) {
this.systemUsage = usageManager;
}
public boolean hasSpace() {
return usageManager != null ? (usageManager.getPercentUsage() < memoryUsageHighWaterMark) : true;
return systemUsage != null ? (systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
}
public boolean isFull() {
return usageManager != null ? usageManager.isFull() : false;
return systemUsage != null ? systemUsage.getMemoryUsage().isFull() : false;
}
public void release() {
@ -146,8 +146,8 @@ public class AbstractPendingMessageCursor implements PendingMessageCursor {
/**
* @return the usageManager
*/
public UsageManager getUsageManager() {
return this.usageManager;
public SystemUsage getSystemUsage() {
return this.systemUsage;
}
/**

View File

@ -27,9 +27,10 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.CommandMarshaller;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -65,8 +66,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public void start() {
if (started.compareAndSet(false, true)) {
if (usageManager != null) {
usageManager.addUsageListener(this);
if (systemUsage != null) {
systemUsage.getMemoryUsage().addUsageListener(this);
}
}
}
@ -74,8 +75,8 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
public void stop() {
if (started.compareAndSet(true, false)) {
gc();
if (usageManager != null) {
usageManager.removeUsageListener(this);
if (systemUsage != null) {
systemUsage.getMemoryUsage().removeUsageListener(this);
}
}
}
@ -147,9 +148,10 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
} else {
flushToDisk();
node.decrementReferenceCount();
systemUsage.getTempDiskUsage().waitForSpace();
getDiskList().addLast(node);
}
} catch (IOException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@ -166,10 +168,11 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
memoryList.addFirst(node);
} else {
flushToDisk();
systemUsage.getTempDiskUsage().waitForSpace();
node.decrementReferenceCount();
getDiskList().addFirst(node);
}
} catch (IOException e) {
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@ -238,12 +241,12 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
return !isEmpty();
}
public void setUsageManager(UsageManager usageManager) {
super.setUsageManager(usageManager);
usageManager.addUsageListener(this);
public void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
usageManager.getMemoryUsage().addUsageListener(this);
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
if (newPercentUsage >= getMemoryUsageHighWaterMark()) {
synchronized (this) {
flushRequired = true;

View File

@ -23,7 +23,7 @@ import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.SystemUsage;
/**
* Interface to pending message (messages awaiting disptach to a consumer)
@ -166,15 +166,15 @@ public interface PendingMessageCursor extends Service {
/**
* Set the UsageManager
*
* @param usageManager
* @see org.apache.activemq.memory.UsageManager
* @param systemUsage
* @see org.apache.activemq.usage.SystemUsage
*/
void setUsageManager(UsageManager usageManager);
void setSystemUsage(SystemUsage systemUsage);
/**
* @return the usageManager
*/
UsageManager getUsageManager();
SystemUsage getSystemUsage();
/**
* @return the memoryUsageHighWaterMark

View File

@ -29,7 +29,7 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -96,7 +96,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination.getActiveMQDestination())) {
TopicStorePrefetch tsp = new TopicStorePrefetch((Topic)destination, clientId, subscriberName);
tsp.setMaxBatchSize(getMaxBatchSize());
tsp.setUsageManager(usageManager);
tsp.setSystemUsage(systemUsage);
topics.put(destination, tsp);
storePrefetches.add(tsp);
if (started) {
@ -244,11 +244,11 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
}
}
public synchronized void setUsageManager(UsageManager usageManager) {
super.setUsageManager(usageManager);
public synchronized void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
for (Iterator<PendingMessageCursor> i = storePrefetches.iterator(); i.hasNext();) {
PendingMessageCursor tsp = i.next();
tsp.setUsageManager(usageManager);
tsp.setSystemUsage(usageManager);
}
}

View File

@ -20,7 +20,7 @@ import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -58,7 +58,7 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
if (nonPersistent == null) {
nonPersistent = new FilePendingMessageCursor(queue.getDestination(), tmpStore);
nonPersistent.setMaxBatchSize(getMaxBatchSize());
nonPersistent.setUsageManager(usageManager);
nonPersistent.setSystemUsage(systemUsage);
}
nonPersistent.start();
persistent.start();
@ -201,13 +201,13 @@ public class StoreQueueCursor extends AbstractPendingMessageCursor {
}
}
public synchronized void setUsageManager(UsageManager usageManager) {
super.setUsageManager(usageManager);
public synchronized void setSystemUsage(SystemUsage usageManager) {
super.setSystemUsage(usageManager);
if (persistent != null) {
persistent.setUsageManager(usageManager);
persistent.setSystemUsage(usageManager);
}
if (nonPersistent != null) {
nonPersistent.setUsageManager(usageManager);
nonPersistent.setSystemUsage(usageManager);
}
}

View File

@ -26,7 +26,7 @@ import org.apache.activemq.broker.region.group.MessageGroupHashBucketFactory;
import org.apache.activemq.broker.region.group.MessageGroupMapFactory;
import org.apache.activemq.filter.DestinationMapEntry;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,7 +61,7 @@ public class PolicyEntry extends DestinationMapEntry {
}
queue.setMessageGroupMapFactory(getMessageGroupMapFactory());
if (memoryLimit > 0) {
queue.getUsageManager().setLimit(memoryLimit);
queue.getBrokerMemoryUsage().setLimit(memoryLimit);
}
if (pendingQueuePolicy != null) {
PendingMessageCursor messages = pendingQueuePolicy.getQueuePendingMessageCursor(queue, tmpStore);
@ -81,11 +81,11 @@ public class PolicyEntry extends DestinationMapEntry {
}
topic.setSendAdvisoryIfNoConsumers(sendAdvisoryIfNoConsumers);
if (memoryLimit > 0) {
topic.getUsageManager().setLimit(memoryLimit);
topic.getBrokerMemoryUsage().setLimit(memoryLimit);
}
}
public void configure(Broker broker, UsageManager memoryManager, TopicSubscription subscription) {
public void configure(Broker broker, SystemUsage memoryManager, TopicSubscription subscription) {
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
@ -111,13 +111,13 @@ public class PolicyEntry extends DestinationMapEntry {
}
}
public void configure(Broker broker, UsageManager memoryManager, DurableTopicSubscription sub) {
public void configure(Broker broker, SystemUsage memoryManager, DurableTopicSubscription sub) {
String clientId = sub.getClientId();
String subName = sub.getSubscriptionName();
int prefetch = sub.getPrefetchSize();
if (pendingDurableSubscriberPolicy != null) {
PendingMessageCursor cursor = pendingDurableSubscriberPolicy.getSubscriberPendingMessageCursor(clientId, subName, broker.getTempDataStore(), prefetch);
cursor.setUsageManager(memoryManager);
cursor.setSystemUsage(memoryManager);
sub.setPending(cursor);
}
}

View File

@ -582,7 +582,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
}
if (rc == 1 && regionDestination != null) {
regionDestination.getUsageManager().increaseUsage(size);
regionDestination.getBrokerMemoryUsage().increaseUsage(size);
}
// System.out.println(" + "+getDestination()+" :::: "+getMessageId()+"
@ -599,7 +599,7 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
}
if (rc == 0 && regionDestination != null) {
regionDestination.getUsageManager().decreaseUsage(size);
regionDestination.getBrokerMemoryUsage().decreaseUsage(size);
}
// System.out.println(" - "+getDestination()+" :::: "+getMessageId()+"
// "+rc);

View File

@ -256,22 +256,14 @@ public interface Store {
*/
void setMaxDataFileLength(long maxDataFileLength);
/**
* @see org.apache.activemq.kaha.IndexTypes
* @return the default index type
*/
String getIndexTypeAsString();
/**
* Set the default index type
*
* @param type
* @see org.apache.activemq.kaha.IndexTypes
*/
void setIndexTypeAsString(String type);
/**
* @return true if the store has been initialized
*/
boolean isInitialized();
/**
* @return the amount of disk space the store is occupying
*/
long size();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.impl.KahaStore;
@ -39,7 +40,19 @@ public final class StoreFactory {
* @throws IOException
*/
public static Store open(String name, String mode) throws IOException {
return new KahaStore(name, mode);
return new KahaStore(name, mode,new AtomicLong());
}
/**
* open or create a Store
* @param name
* @param mode
* @param size
* @return the opened/created store
* @throws IOException
*/
public static Store open(String name, String mode, AtomicLong size) throws IOException {
return new KahaStore(name, mode,size);
}
/**

View File

@ -26,6 +26,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
@ -79,9 +80,16 @@ public class KahaStore implements Store {
private FileLock lock;
private boolean persistentIndex = true;
private RandomAccessFile lockFile;
private final AtomicLong storeSize;
public KahaStore(String name, String mode) throws IOException {
this(name,mode,new AtomicLong());
}
public KahaStore(String name, String mode,AtomicLong storeSize) throws IOException {
this.mode = mode;
this.storeSize = storeSize;
directory = new File(name);
directory.mkdirs();
}
@ -337,14 +345,14 @@ public class KahaStore implements Store {
DataManager dm = dataManagers.get(name);
if (dm == null) {
if (isUseAsyncDataManager()) {
AsyncDataManager t = new AsyncDataManager();
AsyncDataManager t = new AsyncDataManager(storeSize);
t.setDirectory(directory);
t.setFilePrefix("async-data-" + name + "-");
t.setMaxFileLength((int)maxDataFileLength);
t.start();
dm = new DataManagerFacade(t, name);
} else {
DataManagerImpl t = new DataManagerImpl(directory, name);
DataManagerImpl t = new DataManagerImpl(directory, name,storeSize);
t.setMaxFileLength(maxDataFileLength);
dm = t;
}
@ -359,7 +367,7 @@ public class KahaStore implements Store {
public synchronized IndexManager getIndexManager(DataManager dm, String name) throws IOException {
IndexManager im = indexManagers.get(name);
if (im == null) {
im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null);
im = new IndexManager(directory, name, mode, logIndexChanges ? dm : null,storeSize);
indexManagers.put(name, im);
}
return im;
@ -548,4 +556,12 @@ public class KahaStore implements Store {
this.useAsyncDataManager = useAsyncWriter;
}
/**
* @return
* @see org.apache.activemq.kaha.Store#size()
*/
public long size(){
return storeSize.get();
}
}

View File

@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
@ -89,6 +90,15 @@ public final class AsyncDataManager {
private Location mark;
private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
private Runnable cleanupTask;
private final AtomicLong storeSize;
public AsyncDataManager(AtomicLong storeSize) {
this.storeSize=storeSize;
}
public AsyncDataManager() {
this(new AtomicLong());
}
@SuppressWarnings("unchecked")
public synchronized void start() throws IOException {
@ -128,6 +138,7 @@ public final class AsyncDataManager {
int num = Integer.parseInt(numStr);
DataFile dataFile = new DataFile(file, num, preferedFileLength);
fileMap.put(dataFile.getDataFileId(), dataFile);
storeSize.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
// Ignore file that do not match the pattern.
}
@ -249,8 +260,10 @@ public final class AsyncDataManager {
}
location.setOffset(currentWriteFile.getLength());
location.setDataFileId(currentWriteFile.getDataFileId().intValue());
currentWriteFile.incrementLength(location.getSize());
int size = location.getSize();
currentWriteFile.incrementLength(size);
currentWriteFile.increment();
storeSize.addAndGet(size);
return currentWriteFile;
}
@ -297,6 +310,7 @@ public final class AsyncDataManager {
boolean result = true;
for (Iterator i = fileMap.values().iterator(); i.hasNext();) {
DataFile dataFile = (DataFile)i.next();
storeSize.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
fileMap.clear();
@ -387,6 +401,7 @@ public final class AsyncDataManager {
accessorPool.disposeDataFileAccessors(dataFile);
fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));

View File

@ -24,6 +24,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
@ -57,10 +58,12 @@ public final class DataManagerImpl implements DataManager {
private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
private Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
private String dataFilePrefix;
private final AtomicLong storeSize;
public DataManagerImpl(File dir, final String name) {
public DataManagerImpl(File dir, final String name,AtomicLong storeSize) {
this.directory = dir;
this.name = name;
this.storeSize=storeSize;
dataFilePrefix = NAME_PREFIX + name + "-";
// build up list of current dataFiles
@ -76,6 +79,7 @@ public final class DataManagerImpl implements DataManager {
String numStr = n.substring(dataFilePrefix.length(), n.length());
int num = Integer.parseInt(numStr);
DataFile dataFile = new DataFile(file, num);
storeSize.addAndGet(dataFile.getLength());
fileMap.put(dataFile.getNumber(), dataFile);
if (currentWriteFile == null || currentWriteFile.getNumber().intValue() < num) {
currentWriteFile = dataFile;
@ -111,7 +115,9 @@ public final class DataManagerImpl implements DataManager {
}
item.setOffset(currentWriteFile.getLength());
item.setFile(currentWriteFile.getNumber().intValue());
currentWriteFile.incrementLength(item.getSize() + ITEM_HEAD_SIZE);
int len = item.getSize() + ITEM_HEAD_SIZE;
currentWriteFile.incrementLength(len);
storeSize.addAndGet(len);
return currentWriteFile;
}
@ -250,6 +256,7 @@ public final class DataManagerImpl implements DataManager {
boolean result = true;
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
DataFile dataFile = i.next();
storeSize.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
fileMap.clear();
@ -325,6 +332,7 @@ public final class DataManagerImpl implements DataManager {
if (writer != null) {
writer.force(dataFile);
}
storeSize.addAndGet(-dataFile.getLength());
boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile + (result ? "successful " : "failed"));
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.kaha.impl.index;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.kaha.impl.DataManager;
import org.apache.commons.logging.Log;
@ -45,12 +46,14 @@ public final class IndexManager {
private IndexItem firstFree;
private IndexItem lastFree;
private boolean dirty;
private final AtomicLong storeSize;
public IndexManager(File directory, String name, String mode, DataManager redoLog) throws IOException {
public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
this.directory = directory;
this.name = name;
this.mode = mode;
this.redoLog = redoLog;
this.storeSize=storeSize;
initialize();
}
@ -106,6 +109,7 @@ public final class IndexManager {
result = new IndexItem();
result.setOffset(length);
length += IndexItem.INDEX_SIZE;
storeSize.addAndGet(IndexItem.INDEX_SIZE);
}
return result;
}
@ -156,9 +160,14 @@ public final class IndexManager {
synchronized long getLength() {
return length;
}
public final long size() {
return length;
}
public synchronized void setLength(long value) {
this.length = value;
storeSize.addAndGet(length);
}
public String toString() {
@ -187,5 +196,6 @@ public final class IndexManager {
offset += IndexItem.INDEX_SIZE;
}
length = offset;
storeSize.addAndGet(length);
}
}

View File

@ -24,6 +24,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -36,10 +38,10 @@ public class CacheEvictionUsageListener implements UsageListener {
private final int usageLowMark;
private final TaskRunner evictionTask;
private final UsageManager usageManager;
private final Usage usage;
public CacheEvictionUsageListener(UsageManager usageManager, int usageHighMark, int usageLowMark, TaskRunnerFactory taskRunnerFactory) {
this.usageManager = usageManager;
public CacheEvictionUsageListener(Usage usage, int usageHighMark, int usageLowMark, TaskRunnerFactory taskRunnerFactory) {
this.usage = usage;
this.usageHighMark = usageHighMark;
this.usageLowMark = usageLowMark;
evictionTask = taskRunnerFactory.createTaskRunner(new Task() {
@ -51,10 +53,10 @@ public class CacheEvictionUsageListener implements UsageListener {
boolean evictMessages() {
// Try to take the memory usage down below the low mark.
LOG.debug("Evicting cache memory usage: " + usageManager.getPercentUsage());
LOG.debug("Evicting cache memory usage: " + usage.getPercentUsage());
List<CacheEvictor> list = new LinkedList<CacheEvictor>(evictors);
while (list.size() > 0 && usageManager.getPercentUsage() > usageLowMark) {
while (list.size() > 0 && usage.getPercentUsage() > usageLowMark) {
// Evenly evict messages from all evictors
for (Iterator<CacheEvictor> iter = list.iterator(); iter.hasNext();) {
@ -67,10 +69,10 @@ public class CacheEvictionUsageListener implements UsageListener {
return false;
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
// Do we need to start evicting cache entries? Usage > than the
// high mark
if (oldPercentUsage < newPercentUsage && memoryManager.getPercentUsage() >= usageHighMark) {
if (oldPercentUsage < newPercentUsage && usage.getPercentUsage() >= usageHighMark) {
try {
evictionTask.wakeup();
} catch (InterruptedException e) {

View File

@ -1,425 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.memory;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled.
*
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public class UsageManager implements Service {
private static final Log LOG = LogFactory.getLog(UsageManager.class);
private final UsageManager parent;
private long limit;
private long usage;
private int percentUsage;
private int percentUsageMinDelta = 1;
private final Object usageMutex = new Object();
private final List<UsageListener> listeners = new CopyOnWriteArrayList<UsageListener>();
private boolean sendFailIfNoSpace;
/**
* True if someone called setSendFailIfNoSpace() on this particular usage
* manager
*/
private boolean sendFailIfNoSpaceExplicitySet;
private final boolean debug = LOG.isDebugEnabled();
private String name = "";
private float usagePortion = 1.0f;
private List<UsageManager> children = new CopyOnWriteArrayList<UsageManager>();
private final List<Runnable> callbacks = new LinkedList<Runnable>();
public UsageManager() {
this(null, "default");
}
/**
* Create the memory manager linked to a parent. When the memory manager is
* linked to a parent then when usage increased or decreased, the parent's
* usage is also increased or decreased.
*
* @param parent
*/
public UsageManager(UsageManager parent) {
this(parent, "default");
}
public UsageManager(String name) {
this(null, name);
}
public UsageManager(UsageManager parent, String name) {
this(parent, name, 1.0f);
}
public UsageManager(UsageManager parent, String name, float portion) {
this.parent = parent;
this.usagePortion = portion;
if (parent != null) {
this.limit = (long)(parent.limit * portion);
this.name = parent.name + ":";
}
this.name += name;
}
/**
* Tries to increase the usage by value amount but blocks if this object is
* currently full.
*
* @throws InterruptedException
*/
public void enqueueUsage(long value) throws InterruptedException {
waitForSpace();
increaseUsage(value);
}
/**
* @throws InterruptedException
*/
public void waitForSpace() throws InterruptedException {
if (parent != null) {
parent.waitForSpace();
}
synchronized (usageMutex) {
for (int i = 0; percentUsage >= 100; i++) {
usageMutex.wait();
}
}
}
/**
* @throws InterruptedException
*
* @param timeout
*/
public boolean waitForSpace(long timeout) throws InterruptedException {
if (parent != null) {
if (!parent.waitForSpace(timeout)) {
return false;
}
}
synchronized (usageMutex) {
if (percentUsage >= 100) {
usageMutex.wait(timeout);
}
return percentUsage < 100;
}
}
/**
* Increases the usage by the value amount.
*
* @param value
*/
public void increaseUsage(long value) {
if (value == 0) {
return;
}
if (parent != null) {
parent.increaseUsage(value);
}
int percentUsage;
synchronized (usageMutex) {
usage += value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
}
/**
* Decreases the usage by the value amount.
*
* @param value
*/
public void decreaseUsage(long value) {
if (value == 0) {
return;
}
if (parent != null) {
parent.decreaseUsage(value);
}
int percentUsage;
synchronized (usageMutex) {
usage -= value;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
}
public boolean isFull() {
if (parent != null && parent.isFull()) {
return true;
}
synchronized (usageMutex) {
return percentUsage >= 100;
}
}
public void addUsageListener(UsageListener listener) {
listeners.add(listener);
}
public void removeUsageListener(UsageListener listener) {
listeners.remove(listener);
}
public long getLimit() {
synchronized (usageMutex) {
return limit;
}
}
/**
* Sets the memory limit in bytes. Setting the limit in bytes will set the
* usagePortion to 0 since the UsageManager is not going to be portion based
* off the parent.
*
* When set using XBean, you can use values such as: "20 mb", "1024 kb", or
* "1 gb"
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setLimit(long limit) {
if (percentUsageMinDelta < 0) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
synchronized (usageMutex) {
this.limit = limit;
this.usagePortion = 0;
}
onLimitChange();
}
private void onLimitChange() {
// We may need to calculate the limit
if (usagePortion > 0 && parent != null) {
synchronized (usageMutex) {
limit = (long)(parent.getLimit() * usagePortion);
}
}
// Reset the percent currently being used.
int percentUsage;
synchronized (usageMutex) {
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
// Let the children know that the limit has changed. They may need to
// set
// their limits based on ours.
for (UsageManager child : children) {
child.onLimitChange();
}
}
public float getUsagePortion() {
synchronized (usageMutex) {
return usagePortion;
}
}
public void setUsagePortion(float usagePortion) {
synchronized (usageMutex) {
this.usagePortion = usagePortion;
}
onLimitChange();
}
/*
* Sets the minimum number of percentage points the usage has to change
* before a UsageListener event is fired by the manager.
*/
public int getPercentUsage() {
synchronized (usageMutex) {
return percentUsage;
}
}
public int getPercentUsageMinDelta() {
synchronized (usageMutex) {
return percentUsageMinDelta;
}
}
/**
* Sets the minimum number of percentage points the usage has to change
* before a UsageListener event is fired by the manager.
*
* @param percentUsageMinDelta
*/
public void setPercentUsageMinDelta(int percentUsageMinDelta) {
if (percentUsageMinDelta < 1) {
throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
}
int percentUsage;
synchronized (usageMutex) {
this.percentUsageMinDelta = percentUsageMinDelta;
percentUsage = caclPercentUsage();
}
setPercentUsage(percentUsage);
}
public long getUsage() {
synchronized (usageMutex) {
return usage;
}
}
/**
* Sets whether or not a send() should fail if there is no space free. The
* default value is false which means to block the send() method until space
* becomes available
*/
public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
sendFailIfNoSpaceExplicitySet = true;
this.sendFailIfNoSpace = failProducerIfNoSpace;
}
public boolean isSendFailIfNoSpace() {
if (sendFailIfNoSpaceExplicitySet || parent == null) {
return sendFailIfNoSpace;
} else {
return parent.isSendFailIfNoSpace();
}
}
private void setPercentUsage(int value) {
synchronized (usageMutex) {
int oldValue = percentUsage;
percentUsage = value;
if (oldValue != value) {
fireEvent(oldValue, value);
}
}
}
private int caclPercentUsage() {
if (limit == 0) {
return 0;
}
return (int)((((usage * 100) / limit) / percentUsageMinDelta) * percentUsageMinDelta);
}
private void fireEvent(int oldPercentUsage, int newPercentUsage) {
if (debug) {
LOG.debug("Memory usage change. from: " + oldPercentUsage + ", to: " + newPercentUsage);
}
// Switching from being full to not being full..
if (oldPercentUsage >= 100 && newPercentUsage < 100) {
synchronized (usageMutex) {
usageMutex.notifyAll();
for (Iterator<Runnable> iter = new ArrayList<Runnable>(callbacks).iterator(); iter.hasNext();) {
Runnable callback = iter.next();
callback.run();
}
callbacks.clear();
}
}
// Let the listeners know
for (Iterator<UsageListener> iter = listeners.iterator(); iter.hasNext();) {
UsageListener l = iter.next();
l.onMemoryUseChanged(this, oldPercentUsage, newPercentUsage);
}
}
public String getName() {
return name;
}
public String toString() {
return "UsageManager(" + getName() + ") percentUsage=" + percentUsage + "%, usage=" + usage
+ " limit=" + limit + " percentUsageMinDelta=" + percentUsageMinDelta + "%";
}
public void start() {
if (parent != null) {
parent.addChild(this);
}
}
public void stop() {
if (parent != null) {
parent.removeChild(this);
}
}
private void addChild(UsageManager child) {
children.add(child);
}
private void removeChild(UsageManager child) {
children.remove(child);
}
/**
* @param callback
* @return true if the UsageManager was full. The callback will only be
* called if this method returns true.
*/
public boolean notifyCallbackWhenNotFull(final Runnable callback) {
if (parent != null) {
Runnable r = new Runnable() {
public void run() {
synchronized (usageMutex) {
if (percentUsage >= 100) {
callbacks.add(callback);
} else {
callback.run();
}
}
}
};
if (parent.notifyCallbackWhenNotFull(r)) {
return true;
}
}
synchronized (usageMutex) {
if (percentUsage >= 100) {
callbacks.add(callback);
return true;
} else {
return false;
}
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.memory;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.usage.MemoryUsage;
/**
* Simple CacheFilter that increases/decreases usage on a UsageManager as
@ -27,30 +28,30 @@ import java.util.concurrent.atomic.AtomicLong;
public class UsageManagerCacheFilter extends CacheFilter {
private final AtomicLong totalUsage = new AtomicLong(0);
private final UsageManager um;
private final MemoryUsage usage;
public UsageManagerCacheFilter(Cache next, UsageManager um) {
public UsageManagerCacheFilter(Cache next, MemoryUsage um) {
super(next);
this.um = um;
this.usage = um;
}
public Object put(Object key, Object value) {
long usage = getUsageOfAddedObject(value);
long usageValue = getUsageOfAddedObject(value);
Object rc = super.put(key, value);
if (rc != null) {
usage -= getUsageOfRemovedObject(rc);
usageValue -= getUsageOfRemovedObject(rc);
}
totalUsage.addAndGet(usage);
um.increaseUsage(usage);
totalUsage.addAndGet(usageValue);
usage.increaseUsage(usageValue);
return rc;
}
public Object remove(Object key) {
Object rc = super.remove(key);
if (rc != null) {
long usage = getUsageOfRemovedObject(rc);
totalUsage.addAndGet(-usage);
um.decreaseUsage(usage);
long usageValue = getUsageOfRemovedObject(rc);
totalUsage.addAndGet(-usageValue);
usage.decreaseUsage(usageValue);
}
return rc;
}
@ -64,6 +65,6 @@ public class UsageManagerCacheFilter extends CacheFilter {
}
public void close() {
um.decreaseUsage(totalUsage.get());
usage.decreaseUsage(totalUsage.get());
}
}

View File

@ -24,7 +24,8 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* Represents a message store which is used by the persistent implementations
@ -88,10 +89,10 @@ public interface MessageStore extends Service {
ActiveMQDestination getDestination();
/**
* @param usageManager The UsageManager that is controlling the
* @param memoeyUSage The SystemUsage that is controlling the
* destination's memory usage.
*/
void setUsageManager(UsageManager usageManager);
void setMemoryUsage(MemoryUsage memoeyUSage);
/**
* @return the number of messages ready to deliver

View File

@ -25,7 +25,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.SystemUsage;
/**
* Adapter to the actual persistence mechanism used with ActiveMQ
@ -115,7 +115,7 @@ public interface PersistenceAdapter extends Service {
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
void setUsageManager(UsageManager usageManager);
void setUsageManager(SystemUsage usageManager);
/**
* Set the name of the broker using the adapter
@ -136,4 +136,10 @@ public interface PersistenceAdapter extends Service {
*
*/
void checkpoint(boolean sync) throws IOException;
/**
* A hint to return the size of the store on disk
* @return disk space used in bytes of 0 if not implemented
*/
long size();
}

View File

@ -17,13 +17,12 @@
package org.apache.activemq.store;
import java.io.IOException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.MemoryUsage;
/**
* A simple proxy that delegates to another MessageStore.
@ -72,8 +71,8 @@ public class ProxyMessageStore implements MessageStore {
return delegate.getDestination();
}
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
public void setMemoryUsage(MemoryUsage memoryUsage) {
delegate.setMemoryUsage(memoryUsage);
}
public int getMessageCount() throws IOException {

View File

@ -24,7 +24,8 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* A simple proxy that delegates to another MessageStore.
@ -108,8 +109,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
return delegate.getAllSubscriptions();
}
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
public void setMemoryUsage(MemoryUsage memoryUsage) {
delegate.setMemoryUsage(memoryUsage);
}
public int getMessageCount(String clientId, String subscriberName) throws IOException {

View File

@ -38,7 +38,6 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -47,6 +46,8 @@ import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
@ -94,15 +95,15 @@ public class AMQMessageStore implements MessageStore {
}, "Checkpoint: " + destination);
}
public void setUsageManager(UsageManager usageManager) {
referenceStore.setUsageManager(usageManager);
public void setMemoryUsage(MemoryUsage memoryUsage) {
referenceStore.setMemoryUsage(memoryUsage);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing.
*/
public void addMessage(ConnectionContext context, final Message message) throws IOException {
public final void addMessage(ConnectionContext context, final Message message) throws IOException {
final MessageId id = message.getMessageId();
final Location location = peristenceAdapter.writeCommand(message, message.isResponseRequired());
if (!context.isInTransaction()) {
@ -142,7 +143,7 @@ public class AMQMessageStore implements MessageStore {
}
}
void addMessage(final Message message, final Location location) throws InterruptedIOException {
final void addMessage(final Message message, final Location location) throws InterruptedIOException {
ReferenceData data = new ReferenceData();
data.setExpiration(message.getExpiration());
data.setFileId(location.getDataFileId());

View File

@ -25,6 +25,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware;
@ -40,8 +41,6 @@ import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.Location;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -56,6 +55,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IOHelper;
@ -80,7 +82,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private ReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
private UsageManager usageManager;
private SystemUsage usageManager;
private long cleanupInterval = 1000 * 60;
private long checkpointInterval = 1000 * 10;
private int maxCheckpointWorkers = 1;
@ -96,6 +98,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private String brokerName = "";
private File directory;
private BrokerService brokerService;
private AtomicLong storeSize = new AtomicLong();
public String getBrokerName() {
return this.brokerName;
@ -132,7 +135,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.directory.mkdirs();
if (this.usageManager != null) {
this.usageManager.addUsageListener(this);
this.usageManager.getMemoryUsage().addUsageListener(this);
}
if (asyncDataManager == null) {
asyncDataManager = createAsyncDataManager();
@ -217,7 +220,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
if (!started.compareAndSet(true, false)) {
return;
}
this.usageManager.removeUsageListener(this);
this.usageManager.getMemoryUsage().removeUsageListener(this);
synchronized (this) {
Scheduler.cancel(periodicCheckpointTask);
Scheduler.cancel(periodicCleanupTask);
@ -571,7 +574,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
return writeCommand(trace, sync);
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
@ -595,13 +598,13 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
// Subclass overridables
// /////////////////////////////////////////////////////////////////
protected AsyncDataManager createAsyncDataManager() {
AsyncDataManager manager = new AsyncDataManager();
AsyncDataManager manager = new AsyncDataManager(storeSize);
manager.setDirectory(new File(directory, "journal"));
return manager;
}
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter();
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
return adaptor;
}
@ -643,11 +646,11 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.wireFormat = wireFormat;
}
public UsageManager getUsageManager() {
public SystemUsage getUsageManager() {
return usageManager;
}
public void setUsageManager(UsageManager usageManager) {
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
}
@ -689,4 +692,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
}
public long size(){
return storeSize.get();
}
}

View File

@ -25,9 +25,10 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.IOExceptionSupport;
@ -194,9 +195,10 @@ public class JDBCMessageStore implements MessageStore {
return destination;
}
public void setUsageManager(UsageManager usageManager) {
// we can ignore since we don't buffer up messages.
public void setMemoryUsage(MemoryUsage memoryUsage) {
//can ignore as messages aren't buffered
}
public int getMessageCount() throws IOException {
int result = 0;

View File

@ -34,7 +34,6 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -42,6 +41,7 @@ import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.FactoryFinder;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -447,7 +447,7 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
public void setUsageManager(SystemUsage usageManager) {
}
protected void databaseLockKeepAlive() {
@ -493,4 +493,8 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
public void checkpoint(boolean sync) throws IOException {
}
public long size(){
return 0;
}
}

View File

@ -33,11 +33,12 @@ import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transaction.Synchronization;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.Callback;
import org.apache.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
@ -67,7 +68,7 @@ public class JournalMessageStore implements MessageStore {
private Map<MessageId, Message> cpAddedMessageIds;
private UsageManager usageManager;
private MemoryUsage memoryUsage;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter;
@ -77,9 +78,10 @@ public class JournalMessageStore implements MessageStore {
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
}
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
longTermStore.setUsageManager(usageManager);
public void setMemoryUsage(MemoryUsage memoryUsage) {
this.memoryUsage=memoryUsage;
longTermStore.setMemoryUsage(memoryUsage);
}
/**
@ -351,16 +353,16 @@ public class JournalMessageStore implements MessageStore {
}
public void start() throws Exception {
if (this.usageManager != null) {
this.usageManager.addUsageListener(peristenceAdapter);
if (this.memoryUsage != null) {
this.memoryUsage.addUsageListener(peristenceAdapter);
}
longTermStore.start();
}
public void stop() throws Exception {
longTermStore.stop();
if (this.usageManager != null) {
this.usageManager.removeUsageListener(peristenceAdapter);
if (this.memoryUsage != null) {
this.memoryUsage.removeUsageListener(peristenceAdapter);
}
}

View File

@ -49,8 +49,6 @@ import org.apache.activemq.command.JournalTrace;
import org.apache.activemq.command.JournalTransaction;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
@ -63,6 +61,9 @@ import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.thread.Task;
import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.usage.UsageListener;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.wireformat.WireFormat;
@ -89,7 +90,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private final ConcurrentHashMap<ActiveMQQueue, JournalMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, JournalMessageStore>();
private final ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, JournalTopicMessageStore>();
private UsageManager usageManager;
private SystemUsage usageManager;
private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis();
@ -139,7 +140,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
public void setUsageManager(SystemUsage usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
}
@ -213,7 +214,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
});
// checkpointExecutor.allowCoreThreadTimeOut(true);
this.usageManager.addUsageListener(this);
this.usageManager.getMemoryUsage().addUsageListener(this);
if (longTermPersistence instanceof JDBCPersistenceAdapter) {
// Disabled periodic clean up as it deadlocks with the checkpoint
@ -232,7 +233,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
public void stop() throws Exception {
this.usageManager.removeUsageListener(this);
this.usageManager.getMemoryUsage().removeUsageListener(this);
if (!started.compareAndSet(true, false)) {
return;
}
@ -605,7 +606,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
return writeCommand(trace, sync);
}
public void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage) {
public void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage) {
newPercentUsage = (newPercentUsage / 10) * 10;
oldPercentUsage = (oldPercentUsage / 10) * 10;
if (newPercentUsage >= 70 && oldPercentUsage < newPercentUsage) {
@ -633,7 +634,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
longTermPersistence.deleteAllMessages();
}
public UsageManager getUsageManager() {
public SystemUsage getUsageManager() {
return usageManager;
}
@ -682,5 +683,9 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
public void setDirectory(File dir) {
}
public long size(){
return 0;
}
}

View File

@ -24,9 +24,10 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which
@ -121,11 +122,7 @@ public class KahaMessageStore implements MessageStore {
messageContainer.clear();
}
/**
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
/**

View File

@ -22,6 +22,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -38,12 +39,12 @@ import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -69,7 +70,16 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
private String brokerName;
private Store theStore;
private boolean initialized;
private final AtomicLong storeSize;
public KahaPersistenceAdapter(AtomicLong size) {
this.storeSize=size;
}
public KahaPersistenceAdapter() {
this(new AtomicLong());
}
public Set<ActiveMQDestination> getDestinations() {
Set<ActiveMQDestination> rc = new HashSet<ActiveMQDestination>();
try {
@ -225,7 +235,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
public void setUsageManager(SystemUsage usageManager) {
}
/**
@ -245,7 +255,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
protected synchronized Store getStore() throws IOException {
if (theStore == null) {
theStore = StoreFactory.open(getStoreName(), "rw");
theStore = StoreFactory.open(getStoreName(), "rw",storeSize);
theStore.setMaxDataFileLength(maxDataFileLength);
}
return theStore;
@ -281,6 +291,10 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
getStore().force();
}
}
public long size(){
return storeSize.get();
}
private void initialize() {
if (!initialized) {
@ -295,5 +309,6 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
wireFormat.setTightEncodingEnabled(true);
}
}
}

View File

@ -24,9 +24,10 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
public class KahaReferenceStore implements ReferenceStore {
@ -171,9 +172,9 @@ public class KahaReferenceStore implements ReferenceStore {
return messageContainer.size();
}
public void setUsageManager(UsageManager usageManager) {
public void setMemoryUsage(MemoryUsage memoryUsage) {
}
public boolean isSupportForCursors() {
return true;
}

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
@ -48,6 +49,8 @@ import org.apache.commons.logging.LogFactory;
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String RECORD_REFERENCES = "record-references";
@ -59,6 +62,10 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
private boolean storeValid;
private Store stateStore;
public KahaReferenceStoreAdapter(AtomicLong size){
super(size);
}
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
throw new RuntimeException("Use createQueueReferenceStore instead");
}

View File

@ -28,9 +28,10 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which
@ -125,13 +126,7 @@ public class MemoryMessageStore implements MessageStore {
}
}
/**
* @param usageManager The UsageManager that is controlling the
* destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
public int getMessageCount() {
return messageTable.size();
}
@ -161,4 +156,13 @@ public class MemoryMessageStore implements MessageStore {
public void resetBatching() {
lastBatchId = null;
}
/**
* @param memoeyUSage
* @see org.apache.activemq.store.MessageStore#setMemoryUsage(org.apache.activemq.usage.MemoryUsage)
*/
public void setMemoryUsage(MemoryUsage memoeyUSage){
// TODO Auto-generated method stub
}
}

View File

@ -27,11 +27,11 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -151,7 +151,7 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
* @param usageManager The UsageManager that is controlling the broker's
* memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
public void setUsageManager(SystemUsage usageManager) {
}
public String toString() {
@ -166,4 +166,8 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
public void checkpoint(boolean sync) throws IOException {
}
public long size(){
return 0;
}
}

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
/**
Identify if a limit has been reached
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public class DefaultUsageCapacity implements UsageCapacity{
private long limit;
/**
* @param size
* @return true if the limit is reached
* @see org.apache.activemq.usage.UsageCapacity#isLimit(long)
*/
public boolean isLimit(long size) {
return size >= limit;
}
/**
* @return the limit
*/
public final long getLimit(){
return this.limit;
}
/**
* @param limit the limit to set
*/
public final void setLimit(long limit){
this.limit=limit;
}
}

View File

@ -0,0 +1,160 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled.
*
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public class MemoryUsage extends Usage{
private MemoryUsage parent;
private long usage;
public MemoryUsage(){
this(null,"default");
}
/**
* Create the memory manager linked to a parent. When the memory manager is
* linked to a parent then when usage increased or decreased, the parent's
* usage is also increased or decreased.
*
* @param parent
*/
public MemoryUsage(MemoryUsage parent){
this(parent,"default");
}
public MemoryUsage(String name){
this(null,name);
}
public MemoryUsage(MemoryUsage parent,String name){
this(parent,name,1.0f);
}
public MemoryUsage(MemoryUsage parent,String name,float portion){
super(parent,name,portion);
}
/**
* @throws InterruptedException
*/
public void waitForSpace() throws InterruptedException{
if(parent!=null){
parent.waitForSpace();
}
synchronized(usageMutex){
for(int i=0;percentUsage>=100;i++){
usageMutex.wait();
}
}
}
/**
* @param timeout
* @throws InterruptedException
*
* @return true if space
*/
public boolean waitForSpace(long timeout) throws InterruptedException{
if(parent!=null){
if(!parent.waitForSpace(timeout)){
return false;
}
}
synchronized(usageMutex){
if(percentUsage>=100){
usageMutex.wait(timeout);
}
return percentUsage<100;
}
}
public boolean isFull(){
if(parent!=null&&parent.isFull()){
return true;
}
synchronized(usageMutex){
return percentUsage>=100;
}
}
/**
* Tries to increase the usage by value amount but blocks if this object is
* currently full.
* @param value
*
* @throws InterruptedException
*/
public void enqueueUsage(long value) throws InterruptedException{
waitForSpace();
increaseUsage(value);
}
/**
* Increases the usage by the value amount.
*
* @param value
*/
public void increaseUsage(long value){
if(value==0){
return;
}
if(parent!=null){
parent.increaseUsage(value);
}
int percentUsage;
synchronized(usageMutex){
usage+=value;
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
}
/**
* Decreases the usage by the value amount.
*
* @param value
*/
public void decreaseUsage(long value){
if(value==0){
return;
}
if(parent!=null){
parent.decreaseUsage(value);
}
int percentUsage;
synchronized(usageMutex){
usage-=value;
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
}
protected long retrieveUsage(){
return usage;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import org.apache.activemq.store.PersistenceAdapter;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled.
*
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public class StoreUsage extends Usage{
final private PersistenceAdapter store;
public StoreUsage(String name,PersistenceAdapter store){
super(null,name,1.0f);
this.store=store;
}
public StoreUsage(StoreUsage parent,String name){
super(parent,name,1.0f);
this.store=parent.store;
}
protected long retrieveUsage(){
return store.size();
}
}

View File

@ -0,0 +1,146 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.PersistenceAdapter;
/**
* Holder for Usage instances for memory, store and temp files
*
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public class SystemUsage implements Service{
private final SystemUsage parent;
private final String name;
private final MemoryUsage memoryUsage;
private final StoreUsage storeUsage;
private final TempUsage tempDiskUsage;
/**
* True if someone called setSendFailIfNoSpace() on this particular usage
* manager
*/
private boolean sendFailIfNoSpaceExplicitySet;
private boolean sendFailIfNoSpace;
private List<SystemUsage> children=new CopyOnWriteArrayList<SystemUsage>();
public SystemUsage(){
this.parent=null;
this.name="default";
this.memoryUsage=new MemoryUsage(name+":memory");
this.storeUsage=null;
this.tempDiskUsage=null;
}
public SystemUsage(String name,PersistenceAdapter adapter,Store tempStore){
this.parent=null;
this.name=name;
this.memoryUsage=new MemoryUsage(name+":memory");
this.storeUsage=new StoreUsage(name+":store",adapter);
this.tempDiskUsage=new TempUsage(name+":temp",tempStore);
}
public SystemUsage(SystemUsage parent,String name){
this.parent=parent;
this.name=name;
this.memoryUsage=new MemoryUsage(parent.memoryUsage,name+":memory");
this.storeUsage=new StoreUsage(parent.storeUsage,name+":store");
this.tempDiskUsage=new TempUsage(parent!=null?parent.tempDiskUsage:null,name+":temp");
}
public String getName(){
return name;
}
/**
* @return the memoryUsage
*/
public MemoryUsage getMemoryUsage(){
return this.memoryUsage;
}
/**
* @return the storeUsage
*/
public StoreUsage getStoreUsage(){
return this.storeUsage;
}
/**
* @return the tempDiskUsage
*/
public TempUsage getTempDiskUsage(){
return this.tempDiskUsage;
}
public String toString(){
return "UsageManager("+getName()+")";
}
public void start(){
if(parent!=null){
parent.addChild(this);
}
this.memoryUsage.start();
this.storeUsage.start();
this.tempDiskUsage.start();
}
public void stop(){
if(parent!=null){
parent.removeChild(this);
}
this.memoryUsage.stop();
this.storeUsage.stop();
this.tempDiskUsage.stop();
}
/**
* Sets whether or not a send() should fail if there is no space free. The
* default value is false which means to block the send() method until space
* becomes available
*/
public void setSendFailIfNoSpace(boolean failProducerIfNoSpace) {
sendFailIfNoSpaceExplicitySet = true;
this.sendFailIfNoSpace = failProducerIfNoSpace;
}
public boolean isSendFailIfNoSpace() {
if (sendFailIfNoSpaceExplicitySet || parent == null) {
return sendFailIfNoSpace;
} else {
return parent.isSendFailIfNoSpace();
}
}
private void addChild(SystemUsage child){
children.add(child);
}
private void removeChild(SystemUsage child){
children.remove(child);
}
}

View File

@ -0,0 +1,49 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import org.apache.activemq.kaha.Store;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled.
*
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public class TempUsage extends Usage{
final private Store store;
public TempUsage(String name,Store store){
super(null,name,1.0f);
this.store=store;
}
public TempUsage(TempUsage parent,String name){
super(parent,name,1.0f);
this.store=parent.store;
}
protected long retrieveUsage(){
return store.size();
}
}

View File

@ -0,0 +1,358 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.activemq.Service;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Used to keep track of how much of something is being used so that a
* productive working set usage can be controlled.
*
* Main use case is manage memory usage.
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public abstract class Usage implements Service{
private static final Log LOG=LogFactory.getLog(Usage.class);
protected final Object usageMutex=new Object();
protected int percentUsage;
private final Usage parent;
private UsageCapacity limiter = new DefaultUsageCapacity();
private int percentUsageMinDelta=1;
private final List<UsageListener> listeners=new CopyOnWriteArrayList<UsageListener>();
private final boolean debug=LOG.isDebugEnabled();
private String name="";
private float usagePortion=1.0f;
private List<Usage> children=new CopyOnWriteArrayList<Usage>();
private final List<Runnable> callbacks=new LinkedList<Runnable>();
private int pollingTime = 100;
public Usage(Usage parent,String name,float portion){
this.parent=parent;
this.usagePortion=portion;
if(parent!=null){
this.limiter.setLimit((long)(parent.getLimit()*portion));
this.name=parent.name+":";
}
this.name+=name;
}
protected abstract long retrieveUsage();
/**
* @throws InterruptedException
*/
public void waitForSpace() throws InterruptedException{
waitForSpace(0);
}
/**
* @param timeout
* @throws InterruptedException
*
* @return true if space
*/
public boolean waitForSpace(long timeout) throws InterruptedException{
if(parent!=null){
if(!parent.waitForSpace(timeout)){
return false;
}
}
synchronized(usageMutex){
caclPercentUsage();
if(percentUsage>=100){
long deadline=timeout>0?System.currentTimeMillis()+timeout:Long.MAX_VALUE;
long timeleft=deadline;
while(timeleft>0){
caclPercentUsage();
if(percentUsage>=100){
usageMutex.wait(pollingTime);
timeleft=deadline-System.currentTimeMillis();
}else{
break;
}
}
}
return percentUsage<100;
}
}
public boolean isFull(){
if(parent!=null&&parent.isFull()){
return true;
}
synchronized(usageMutex){
caclPercentUsage();
return percentUsage>=100;
}
}
public void addUsageListener(UsageListener listener){
listeners.add(listener);
}
public void removeUsageListener(UsageListener listener){
listeners.remove(listener);
}
public long getLimit(){
synchronized(usageMutex){
return limiter.getLimit();
}
}
/**
* Sets the memory limit in bytes. Setting the limit in bytes will set the
* usagePortion to 0 since the UsageManager is not going to be portion based
* off the parent.
*
* When set using XBean, you can use values such as: "20 mb", "1024 kb", or
* "1 gb"
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setLimit(long limit){
if(percentUsageMinDelta<0){
throw new IllegalArgumentException("percentUsageMinDelta must be greater or equal to 0");
}
synchronized(usageMutex){
this.limiter.setLimit(limit);
this.usagePortion=0;
}
onLimitChange();
}
private void onLimitChange(){
// We may need to calculate the limit
if(usagePortion>0&&parent!=null){
synchronized(usageMutex){
this.limiter.setLimit((long)(parent.getLimit()*usagePortion));
}
}
// Reset the percent currently being used.
int percentUsage;
synchronized(usageMutex){
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
// Let the children know that the limit has changed. They may need to
// set
// their limits based on ours.
for(Usage child:children){
child.onLimitChange();
}
}
public float getUsagePortion(){
synchronized(usageMutex){
return usagePortion;
}
}
public void setUsagePortion(float usagePortion){
synchronized(usageMutex){
this.usagePortion=usagePortion;
}
onLimitChange();
}
/*
* Sets the minimum number of percentage points the usage has to change
* before a UsageListener event is fired by the manager.
*/
public int getPercentUsage(){
synchronized(usageMutex){
return percentUsage;
}
}
public int getPercentUsageMinDelta(){
synchronized(usageMutex){
return percentUsageMinDelta;
}
}
/**
* Sets the minimum number of percentage points the usage has to change
* before a UsageListener event is fired by the manager.
*
* @param percentUsageMinDelta
*/
public void setPercentUsageMinDelta(int percentUsageMinDelta){
if(percentUsageMinDelta<1){
throw new IllegalArgumentException("percentUsageMinDelta must be greater than 0");
}
int percentUsage;
synchronized(usageMutex){
this.percentUsageMinDelta=percentUsageMinDelta;
percentUsage=caclPercentUsage();
}
setPercentUsage(percentUsage);
}
public long getUsage(){
synchronized(usageMutex){
return retrieveUsage();
}
}
protected void setPercentUsage(int value){
synchronized(usageMutex){
int oldValue=percentUsage;
percentUsage=value;
if(oldValue!=value){
fireEvent(oldValue,value);
}
}
}
protected int caclPercentUsage(){
if(limiter.getLimit()==0){
return 0;
}
return (int)((((retrieveUsage()*100)/limiter.getLimit())/percentUsageMinDelta)*percentUsageMinDelta);
}
private void fireEvent(int oldPercentUsage,int newPercentUsage){
if(debug){
LOG.debug("Memory usage change. from: "+oldPercentUsage+", to: "+newPercentUsage);
}
// Switching from being full to not being full..
if(oldPercentUsage>=100&&newPercentUsage<100){
synchronized(usageMutex){
usageMutex.notifyAll();
for(Iterator<Runnable> iter=new ArrayList<Runnable>(callbacks).iterator();iter.hasNext();){
Runnable callback=iter.next();
callback.run();
}
callbacks.clear();
}
}
// Let the listeners know
for(Iterator<UsageListener> iter=listeners.iterator();iter.hasNext();){
UsageListener l=iter.next();
l.onUsageChanged(this,oldPercentUsage,newPercentUsage);
}
}
public String getName(){
return name;
}
public String toString(){
return "Usage("+getName()+") percentUsage="+percentUsage+"%, usage="+retrieveUsage()+" limit="+limiter.getLimit()
+" percentUsageMinDelta="+percentUsageMinDelta+"%";
}
public void start(){
if(parent!=null){
parent.addChild(this);
}
}
public void stop(){
if(parent!=null){
parent.removeChild(this);
}
}
private void addChild(Usage child){
children.add(child);
}
private void removeChild(Usage child){
children.remove(child);
}
/**
* @param callback
* @return true if the UsageManager was full. The callback will only be
* called if this method returns true.
*/
public boolean notifyCallbackWhenNotFull(final Runnable callback){
if(parent!=null){
Runnable r=new Runnable(){
public void run(){
synchronized(usageMutex){
if(percentUsage>=100){
callbacks.add(callback);
}else{
callback.run();
}
}
}
};
if(parent.notifyCallbackWhenNotFull(r)){
return true;
}
}
synchronized(usageMutex){
if(percentUsage>=100){
callbacks.add(callback);
return true;
}else{
return false;
}
}
}
/**
* @return the limiter
*/
public UsageCapacity getLimiter(){
return this.limiter;
}
/**
* @param limiter the limiter to set
*/
public void setLimiter(UsageCapacity limiter){
this.limiter=limiter;
}
/**
* @return the pollingTime
*/
public int getPollingTime(){
return this.pollingTime;
}
/**
* @param pollingTime the pollingTime to set
*/
public void setPollingTime(int pollingTime){
this.pollingTime=pollingTime;
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usage;
/**
Identify if a limit has been reached
*
* @org.apache.xbean.XBean
*
* @version $Revision: 1.3 $
*/
public interface UsageCapacity{
/**
* Has the limit been reached ?
*
* @param size
* @return true if it has
*/
boolean isLimit(long size);
/**
* @return the limit
*/
long getLimit();
/**
* @param limit the limit to set
*/
void setLimit(long limit);
}

View File

@ -14,8 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.memory;
package org.apache.activemq.usage;
public interface UsageListener {
void onMemoryUseChanged(UsageManager memoryManager, int oldPercentUsage, int newPercentUsage);
void onUsageChanged(Usage usage, int oldPercentUsage, int newPercentUsage);
}

View File

@ -41,10 +41,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;
@ -238,9 +238,9 @@ public class AMQDeadlockTest3 extends TestCase {
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final UsageManager memoryManager = new UsageManager();
memoryManager.setLimit(5000000);
brokerService.setMemoryManager(memoryManager);
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(5000000);
brokerService.setUsageManager(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();

View File

@ -51,8 +51,8 @@ import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -74,7 +74,7 @@ public class BrokerTestSupport extends CombinationTestSupport {
protected int maxWait = 4000;
protected UsageManager memoryManager;
protected SystemUsage memoryManager;
protected void setUp() throws Exception {
super.setUp();

View File

@ -91,7 +91,7 @@ public class MessageExpirationTest extends BrokerTestSupport {
// Reduce the limit so that only 1 message can flow through the broker
// at a time.
broker.getMemoryManager().setLimit(1);
broker.getUsageManager().getMemoryUsage().setLimit(1);
final Message m1 = createMessage(producerInfo, destination, deliveryMode);
final Message m2 = createMessage(producerInfo, destination, deliveryMode, 1000);

View File

@ -32,12 +32,12 @@ import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import org.apache.activemq.broker.region.policy.SubscriptionRecoveryPolicy;
import org.apache.activemq.broker.region.policy.TimedSubscriptionRecoveryPolicy;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.journal.JournalPersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.tcp.TcpTransportServer;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.wireformat.ObjectStreamWireFormat;
import org.apache.activemq.xbean.BrokerFactoryBean;
import org.apache.commons.logging.Log;
@ -233,10 +233,10 @@ public class ConfigTest extends TestCase {
// Check usage manager
// System.out.print("Checking memory manager configurations... ");
UsageManager memMgr = broker.getMemoryManager();
SystemUsage memMgr = broker.getUsageManager();
assertTrue("Should have a memory manager", memMgr != null);
assertEquals("UsageManager Config Error (limit)", 200000, memMgr.getLimit());
assertEquals("UsageManager Config Error (percentUsageMinDelta)", 20, memMgr.getPercentUsageMinDelta());
assertEquals("UsageManager Config Error (limit)", 200000, memMgr.getMemoryUsage().getLimit());
assertEquals("UsageManager Config Error (percentUsageMinDelta)", 20, memMgr.getMemoryUsage().getPercentUsageMinDelta());
LOG.info("Success");
LOG.info("Success");

View File

@ -18,12 +18,14 @@ package org.apache.activemq.kaha.impl.index.hash;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.util.IOHelper;
/**
* Test a HashIndex
*/
@ -42,7 +44,7 @@ public class HashTest extends TestCase {
super.setUp();
directory = new File(IOHelper.getDefaultDataDirectory());
directory.mkdirs();
indexManager = new IndexManager(directory, "im-hash-test", "rw", null);
indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
this.hashIndex = new HashIndex(directory, "testHash", indexManager);
this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.kaha.impl.index.tree;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import junit.framework.TestCase;
import org.apache.activemq.kaha.Store;
@ -43,7 +44,7 @@ public class TreeTest extends TestCase {
super.setUp();
directory = new File("activemq-data");
directory.mkdirs();
indexManager = new IndexManager(directory, "im-test", "rw", null);
indexManager = new IndexManager(directory, "im-test", "rw", null,new AtomicLong());
this.tree = new TreeIndex(directory, "testTree", indexManager);
this.tree.setKeyMarshaller(Store.STRING_MARSHALLER);
}

View File

@ -19,6 +19,8 @@ package org.apache.activemq.memory.buffer;
import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.memory.buffer.MessageBuffer;
import org.apache.activemq.memory.buffer.MessageQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.memory.buffer;
import org.apache.activemq.memory.buffer.MessageBuffer;
import org.apache.activemq.memory.buffer.OrderBasedMessageBuffer;
/**
*

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.memory.buffer;
import org.apache.activemq.memory.buffer.MessageBuffer;
import org.apache.activemq.memory.buffer.SizeBasedMessageBuffer;
/**
*

View File

@ -28,11 +28,11 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerTestSupport;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.usage.SystemUsage;
public class NetworkTestSupport extends BrokerTestSupport {
@ -42,7 +42,7 @@ public class NetworkTestSupport extends BrokerTestSupport {
protected PersistenceAdapter remotePersistenceAdapter;
protected BrokerService remoteBroker;
protected UsageManager remoteMemoryManager;
protected SystemUsage remoteMemoryManager;
protected TransportConnector remoteConnector;
protected void setUp() throws Exception {

View File

@ -24,10 +24,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerTestSupport;
import org.apache.activemq.broker.StubConnection;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.usage.SystemUsage;
public class ProxyTestSupport extends BrokerTestSupport {
@ -37,7 +37,7 @@ public class ProxyTestSupport extends BrokerTestSupport {
protected PersistenceAdapter remotePersistenceAdapter;
protected BrokerService remoteBroker;
protected UsageManager remoteMemoryManager;
protected SystemUsage remoteMemoryManager;
protected TransportConnector remoteConnector;
private ProxyConnector proxyConnector;
private ProxyConnector remoteProxyConnector;

View File

@ -40,10 +40,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.jms.core.JmsTemplate;
@ -173,9 +173,9 @@ public class AMQDeadlockTestW4Brokers extends TestCase {
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final UsageManager memoryManager = new UsageManager();
memoryManager.setLimit(100000000);
brokerService.setMemoryManager(memoryManager);
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(100000000);
brokerService.setUsageManager(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();

View File

@ -39,10 +39,10 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.usage.SystemUsage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
@ -106,9 +106,9 @@ public class AMQFailoverIssue extends TestCase {
brokerService.setBrokerName(brokerName);
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final UsageManager memoryManager = new UsageManager();
memoryManager.setLimit(5000000);
brokerService.setMemoryManager(memoryManager);
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(5000000);
brokerService.setUsageManager(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();
final PolicyEntry entry = new PolicyEntry();
entry.setQueue(">");

View File

@ -33,9 +33,9 @@ import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.usage.SystemUsage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
@ -117,9 +117,9 @@ public class AMQStackOverFlowTest extends TestCase {
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
final UsageManager memoryManager = new UsageManager();
memoryManager.setLimit(10);
brokerService.setMemoryManager(memoryManager);
final SystemUsage memoryManager = new SystemUsage();
memoryManager.getMemoryUsage().setLimit(10);
brokerService.setUsageManager(memoryManager);
final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();