- Since we now have per destination usage managers, I updated the MessageStore API so that each destination's store can be maded aware of the manager for that store.

Some stores like the journal hold messages around and use the usage manager to know when it should flush to disk.
- Moved alot of the message reference counting logic from the PrefetchSubscription up to it's subclasses since they all seem to do it just a slightly different way.
  I think it makes easier to see now how the usage manager is affected by operations.
- I fixed and verifed that the keepDurableSubsActive=true option actually works.  I think we should make this the default setting since it
  make recovery safer.  Once we have a better spooling implementation we can turn if off again.


git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@389941 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-03-30 00:36:21 +00:00
parent f1e3420e1d
commit 18d616c8db
27 changed files with 250 additions and 86 deletions

View File

@ -900,6 +900,7 @@ public class BrokerService implements Service {
protected Broker createRegionBroker() throws Exception { protected Broker createRegionBroker() throws Exception {
// we must start the persistence adaptor before we can create the region // we must start the persistence adaptor before we can create the region
// broker // broker
getPersistenceAdapter().setUsageManager(getMemoryManager());
getPersistenceAdapter().start(); getPersistenceAdapter().start();
RegionBroker regionBroker = null; RegionBroker regionBroker = null;
if (isUseJmx()) { if (isUseJmx()) {
@ -947,7 +948,6 @@ public class BrokerService implements Service {
protected DefaultPersistenceAdapterFactory createPersistenceFactory() { protected DefaultPersistenceAdapterFactory createPersistenceFactory() {
DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory(); DefaultPersistenceAdapterFactory factory = new DefaultPersistenceAdapterFactory();
factory.setMemManager(getMemoryManager());
factory.setDataDirectory(getDataDirectory()); factory.setDataDirectory(getDataDirectory());
factory.setTaskRunnerFactory(getTaskRunnerFactory()); factory.setTaskRunnerFactory(getTaskRunnerFactory());
return factory; return factory;

View File

@ -22,6 +22,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import javax.management.InstanceNotFoundException; import javax.management.InstanceNotFoundException;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException; import javax.management.MalformedObjectNameException;
@ -33,6 +34,7 @@ import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType; import javax.management.openmbean.TabularType;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -60,6 +62,7 @@ import org.apache.activemq.util.ServiceStopper;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet; import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
@ -161,10 +164,10 @@ public class ManagedRegionBroker extends RegionBroker{
SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName()); SubscriptionKey key=new SubscriptionKey(context.getClientId(),sub.getConsumerInfo().getSubcriptionName());
if(sub.getConsumerInfo().isDurable()){ if(sub.getConsumerInfo().isDurable()){
name = key.toString(); name = key.toString();
} else {
name = sub.getConsumerInfo().getConsumerId().toString();
} }
if(sub.getConsumerInfo()!=null&&sub.getConsumerInfo().getConsumerId()!=null){
name+="."+sub.getConsumerInfo().getConsumerId();
}
try{ try{
ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName") ObjectName objectName=new ObjectName(brokerObjectName.getDomain()+":"+"BrokerName="+map.get("BrokerName")
+","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+""); +","+"Type=Subscription,"+"active=true,"+"name="+JMXSupport.encodeObjectNamePart(name)+"");

View File

@ -24,7 +24,6 @@ import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics; import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.Subscription; import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion; import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;

View File

@ -16,7 +16,10 @@
*/ */
package org.apache.activemq.broker.region; package org.apache.activemq.broker.region;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import java.io.IOException;
import java.util.Iterator;
import javax.jms.InvalidSelectorException;
import org.apache.activemq.broker.Broker; import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -26,10 +29,7 @@ import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.SubscriptionKey; import org.apache.activemq.util.SubscriptionKey;
import javax.jms.InvalidSelectorException; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import java.io.IOException;
import java.util.Iterator;
public class DurableTopicSubscription extends PrefetchSubscription { public class DurableTopicSubscription extends PrefetchSubscription {
@ -103,14 +103,21 @@ public class DurableTopicSubscription extends PrefetchSubscription {
} else { } else {
redeliveredMessages.put(node.getMessageId(), new Integer(1)); redeliveredMessages.put(node.getMessageId(), new Integer(1));
} }
if( keepDurableSubsActive ) {
pending.addFirst(node);
} else {
node.decrementReferenceCount();
iter.remove(); iter.remove();
} }
}
if( !keepDurableSubsActive ) {
for (Iterator iter = pending.iterator(); iter.hasNext();) { for (Iterator iter = pending.iterator(); iter.hasNext();) {
MessageReference node = (MessageReference) iter.next(); MessageReference node = (MessageReference) iter.next();
// node.decrementTargetCount(); node.decrementReferenceCount();
iter.remove(); iter.remove();
} }
}
prefetchExtension=0; prefetchExtension=0;
} }
@ -127,9 +134,8 @@ public class DurableTopicSubscription extends PrefetchSubscription {
if( !active && !keepDurableSubsActive ) { if( !active && !keepDurableSubsActive ) {
return; return;
} }
node = new IndirectMessageReference(node.getRegionDestination(), (Message) node); node.incrementReferenceCount();
super.add(node); super.add(node);
node.decrementReferenceCount();
} }
public int getPendingQueueSize() { public int getPendingQueueSize() {
@ -148,14 +154,10 @@ public class DurableTopicSubscription extends PrefetchSubscription {
return active; return active;
} }
public synchronized void acknowledge(ConnectionContext context, MessageAck ack) throws Exception {
super.acknowledge(context, ack);
}
protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException { protected void acknowledge(ConnectionContext context, MessageAck ack, MessageReference node) throws IOException {
node.getRegionDestination().acknowledge(context, this, ack, node); node.getRegionDestination().acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId()); redeliveredMessages.remove(node.getMessageId());
((IndirectMessageReference)node).drop(); node.decrementReferenceCount();
} }
public String getSubscriptionName() { public String getSubscriptionName() {

View File

@ -79,7 +79,6 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
try{ try{
MessageDispatch md=createMessageDispatch(node,node.getMessage()); MessageDispatch md=createMessageDispatch(node,node.getMessage());
dispatched.addLast(node); dispatched.addLast(node);
node.decrementReferenceCount();
}catch(Exception e){ }catch(Exception e){
log.error("Problem processing MessageDispatchNotification: "+mdn,e); log.error("Problem processing MessageDispatchNotification: "+mdn,e);
} }
@ -166,22 +165,7 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
inAckRange=true; inAckRange=true;
} }
if(inAckRange){ if(inAckRange){
// Send the message to the DLQ sendToDLQ(context, node);
node.incrementReferenceCount();
try{
Message message=node.getMessage();
if(message!=null){
// The original destination and transaction id do not get filled when the message is first
// sent,
// it is only populated if the message is routed to another destination like the DLQ
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
BrokerSupport.resend(context, message, deadLetterDestination);
}
}finally{
node.decrementReferenceCount();
}
iter.remove(); iter.remove();
dequeueCounter++; dequeueCounter++;
index++; index++;
@ -200,6 +184,26 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
throw new JMSException("Invalid acknowledgment: "+ack); throw new JMSException("Invalid acknowledgment: "+ack);
} }
/**
* @param context
* @param node
* @throws IOException
* @throws Exception
*/
protected void sendToDLQ(final ConnectionContext context, final MessageReference node) throws IOException, Exception {
// Send the message to the DLQ
Message message=node.getMessage();
if(message!=null){
// The original destination and transaction id do not get filled when the message is first
// sent,
// it is only populated if the message is routed to another destination like the DLQ
DeadLetterStrategy deadLetterStrategy=node.getRegionDestination().getDeadLetterStrategy();
ActiveMQDestination deadLetterDestination=deadLetterStrategy.getDeadLetterQueueFor(message.getDestination());
BrokerSupport.resend(context, message, deadLetterDestination);
}
}
protected boolean isFull(){ protected boolean isFull(){
return dispatched.size()-prefetchExtension>=info.getPrefetchSize(); return dispatched.size()-prefetchExtension>=info.getPrefetchSize();
} }
@ -240,11 +244,10 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
} }
} }
private void dispatch(final MessageReference node) throws IOException{ protected boolean dispatch(final MessageReference node) throws IOException{
node.incrementReferenceCount();
final Message message=node.getMessage(); final Message message=node.getMessage();
if(message==null){ if(message==null){
return; return false;
} }
// Make sure we can dispatch a message. // Make sure we can dispatch a message.
if(canDispatch(node)&&!isSlaveBroker()){ if(canDispatch(node)&&!isSlaveBroker()){
@ -264,16 +267,14 @@ abstract public class PrefetchSubscription extends AbstractSubscription{
context.getConnection().dispatchSync(md); context.getConnection().dispatchSync(md);
onDispatch(node,message); onDispatch(node,message);
} }
// The onDispatch() does the node.decrementReferenceCount(); return true;
} else { } else {
// We were not allowed to dispatch that message (an other consumer grabbed it before we did) return false;
node.decrementReferenceCount();
} }
} }
synchronized private void onDispatch(final MessageReference node,final Message message){ synchronized protected void onDispatch(final MessageReference node,final Message message){
boolean wasFull=isFull(); boolean wasFull=isFull();
node.decrementReferenceCount();
if(node.getRegionDestination()!=null){ if(node.getRegionDestination()!=null){
node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message); node.getRegionDestination().getDestinationStatistics().onMessageDequeue(message);
context.getConnection().getStatistics().onMessageDequeue(message); context.getConnection().getStatistics().onMessageDequeue(message);

View File

@ -81,9 +81,14 @@ public class Queue implements Destination {
this.destination = destination; this.destination = destination;
this.usageManager = new UsageManager(memoryManager); this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE); this.usageManager.setLimit(Long.MAX_VALUE);
this.store = store; this.store = store;
// Let the store know what usage manager we are using so that he can flush messages to disk
// when usage gets high.
if( store!=null ) {
store.setUsageManager(usageManager);
}
destinationStatistics.setParent(parentStats); destinationStatistics.setParent(parentStats);
this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName()); this.log = LogFactory.getLog(getClass().getName() + "." + destination.getPhysicalName());

View File

@ -24,10 +24,11 @@ import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.MessageEvaluationContext;
public class QueueBrowserSubscription extends PrefetchSubscription { public class QueueBrowserSubscription extends QueueSubscription {
boolean browseDone; boolean browseDone;
@ -65,7 +66,15 @@ public class QueueBrowserSubscription extends PrefetchSubscription {
return super.createMessageDispatch(node, message); return super.createMessageDispatch(node, message);
} }
} }
public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException { public boolean matches(MessageReference node, MessageEvaluationContext context) throws IOException {
return !browseDone && super.matches(node, context); return !browseDone && super.matches(node, context);
} }
/**
* Since we are a browser we don't really remove the message from the queue.
*/
protected void acknowledge(ConnectionContext context, final MessageAck ack, final MessageReference n) throws IOException {
}
} }

View File

@ -21,6 +21,7 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.group.MessageGroupMap; import org.apache.activemq.broker.region.group.MessageGroupMap;
import org.apache.activemq.command.ConsumerId; import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo; import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.transaction.Synchronization;
@ -34,10 +35,6 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
super(broker,context, info); super(broker,context, info);
} }
public void add(MessageReference node) throws Exception {
super.add(node);
}
/** /**
* In the queue case, mark the node as dropped and then a gc cycle will remove it from * In the queue case, mark the node as dropped and then a gc cycle will remove it from
* the queue. * the queue.
@ -138,4 +135,53 @@ public class QueueSubscription extends PrefetchSubscription implements LockOwner
return info.isExclusive(); return info.isExclusive();
} }
/**
* Override so that the message ref count is > 0 only when the message is being dispatched
* to a client. Keeping it at 0 when it is in the pending list allows the message to be swapped out
* to disk.
*
* @return true if the message was dispatched.
*/
protected boolean dispatch(MessageReference node) throws IOException {
boolean rc = false;
// This brings the message into memory if it was swapped out.
node.incrementReferenceCount();
try {
rc = super.dispatch(node);
} finally {
// If the message was dispatched, it could be getting dispatched async, so we
// can only drop the reference count when that completes @see onDispatch
if( !rc ) {
node.incrementReferenceCount();
}
}
return rc;
}
/**
* OK Message was transmitted, we can now drop the reference count.
*
* @see org.apache.activemq.broker.region.PrefetchSubscription#onDispatch(org.apache.activemq.broker.region.MessageReference, org.apache.activemq.command.Message)
*/
protected void onDispatch(MessageReference node, Message message) {
// Now that the message has been sent over the wire to the client,
// we can let it get swapped out.
node.decrementReferenceCount();
super.onDispatch(node, message);
}
/**
* Sending a message to the DQL will require us to increment the ref count so we can get at the content.
*/
protected void sendToDLQ(ConnectionContext context, MessageReference node) throws IOException, Exception {
// This brings the message into memory if it was swapped out.
node.incrementReferenceCount();
try{
super.sendToDLQ(context, node);
} finally {
// This let's the message be swapped out of needed.
node.decrementReferenceCount();
}
}
} }

View File

@ -75,6 +75,13 @@ public class Topic implements Destination {
this.store = store; this.store = store;
this.usageManager = new UsageManager(memoryManager); this.usageManager = new UsageManager(memoryManager);
this.usageManager.setLimit(Long.MAX_VALUE); this.usageManager.setLimit(Long.MAX_VALUE);
// Let the store know what usage manager we are using so that he can flush messages to disk
// when usage gets high.
if( store!=null ) {
store.setUsageManager(usageManager);
}
this.destinationStatistics.setParent(parentStats); this.destinationStatistics.setParent(parentStats);
} }

View File

@ -171,10 +171,13 @@ public class TopicRegion extends AbstractRegion {
DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key); DurableTopicSubscription sub = (DurableTopicSubscription) durableSubscriptions.get(key);
ConsumerInfo consumerInfo = createInactiveConsumerInfo(info); ConsumerInfo consumerInfo = createInactiveConsumerInfo(info);
if( sub == null ) { if( sub == null ) {
sub = (DurableTopicSubscription) createSubscription(context, consumerInfo ); ConnectionContext c = new ConnectionContext();
c.setBroker(context.getBroker());
c.setClientId(key.getClientId());
c.setConnectionId(consumerInfo.getConsumerId().getParentId().getParentId());
sub = (DurableTopicSubscription) createSubscription(c, consumerInfo );
} }
subscriptions.put(consumerInfo.getConsumerId(), sub);
topic.addSubscription(context, sub); topic.addSubscription(context, sub);
} }
} }

View File

@ -43,7 +43,7 @@ public class UsageManager {
private long usage; private long usage;
private int percentUsage; private int percentUsage;
private int percentUsageMinDelta=10; private int percentUsageMinDelta=1;
private final Object usageMutex = new Object(); private final Object usageMutex = new Object();

View File

@ -23,7 +23,6 @@ import javax.sql.DataSource;
import org.apache.activeio.journal.Journal; import org.apache.activeio.journal.Journal;
import org.apache.activeio.journal.active.JournalImpl; import org.apache.activeio.journal.active.JournalImpl;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.jdbc.JDBCAdapter; import org.apache.activemq.store.jdbc.JDBCAdapter;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.apache.activemq.store.jdbc.Statements; import org.apache.activemq.store.jdbc.Statements;
@ -42,7 +41,6 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto
private int journalLogFileSize = 1024*1024*20; private int journalLogFileSize = 1024*1024*20;
private int journalLogFiles = 2; private int journalLogFiles = 2;
private File dataDirectory; private File dataDirectory;
private UsageManager memManager;
private DataSource dataSource; private DataSource dataSource;
private TaskRunnerFactory taskRunnerFactory; private TaskRunnerFactory taskRunnerFactory;
private Journal journal; private Journal journal;
@ -60,9 +58,9 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto
// Setup the Journal // Setup the Journal
if( useQuickJournal ) { if( useQuickJournal ) {
return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory()); return new QuickJournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
} else { } else {
return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getMemManager(), getTaskRunnerFactory()); return new JournalPersistenceAdapter(getJournal(), jdbcPersistenceAdapter, getTaskRunnerFactory());
} }
} }
@ -93,17 +91,6 @@ public class DefaultPersistenceAdapterFactory implements PersistenceAdapterFacto
this.journalLogFileSize = journalLogFileSize; this.journalLogFileSize = journalLogFileSize;
} }
public UsageManager getMemManager() {
if( memManager==null ) {
memManager = new UsageManager();
}
return memManager;
}
public void setMemManager(UsageManager memManager) {
this.memManager = memManager;
}
public DataSource getDataSource() throws IOException { public DataSource getDataSource() throws IOException {
if (dataSource == null) { if (dataSource == null) {
dataSource = createDataSource(); dataSource = createDataSource();

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
/** /**
* Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer} * Represents a message store which is used by the persistent {@link org.apache.activemq.service.MessageContainer}
@ -93,4 +94,9 @@ public interface MessageStore extends Service {
*/ */
public ActiveMQDestination getDestination(); public ActiveMQDestination getDestination();
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager);
} }

View File

@ -20,6 +20,7 @@ import org.apache.activemq.Service;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import java.io.IOException; import java.io.IOException;
import java.util.Set; import java.util.Set;
@ -98,4 +99,10 @@ public interface PersistenceAdapter extends Service {
public boolean isUseExternalMessageReferences(); public boolean isUseExternalMessageReferences();
public void setUseExternalMessageReferences(boolean enable); public void setUseExternalMessageReferences(boolean enable);
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager);
} }

View File

@ -23,6 +23,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
/** /**
* A simple proxy that delegates to another MessageStore. * A simple proxy that delegates to another MessageStore.
@ -71,4 +72,8 @@ public class ProxyMessageStore implements MessageStore {
public String getMessageReference(MessageId identity) throws IOException { public String getMessageReference(MessageId identity) throws IOException {
return delegate.getMessageReference(identity); return delegate.getMessageReference(identity);
} }
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
}
} }

View File

@ -24,6 +24,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo; import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.memory.UsageManager;
/** /**
* A simple proxy that delegates to another MessageStore. * A simple proxy that delegates to another MessageStore.
@ -94,4 +95,8 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
public SubscriptionInfo[] getAllSubscriptions() throws IOException { public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return delegate.getAllSubscriptions(); return delegate.getAllSubscriptions();
} }
public void setUsageManager(UsageManager usageManager) {
delegate.setUsageManager(usageManager);
}
} }

View File

@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.IOExceptionSupport; import org.apache.activemq.util.IOExceptionSupport;
@ -196,4 +197,7 @@ public class JDBCMessageStore implements MessageStore {
return destination; return destination;
} }
public void setUsageManager(UsageManager usageManager) {
// we can ignore since we don't buffer up messages.
}
} }

View File

@ -28,6 +28,7 @@ import org.apache.activeio.util.FactoryFinder;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -357,4 +358,10 @@ public class JDBCPersistenceAdapter implements PersistenceAdapter {
this.statements = statements; this.statements = statements;
} }
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
} }

View File

@ -30,6 +30,7 @@ import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -63,6 +64,8 @@ public class JournalMessageStore implements MessageStore {
protected RecordLocation lastLocation; protected RecordLocation lastLocation;
protected HashSet inFlightTxLocations = new HashSet(); protected HashSet inFlightTxLocations = new HashSet();
private UsageManager usageManager;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter; this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
@ -71,6 +74,12 @@ public class JournalMessageStore implements MessageStore {
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
} }
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
longTermStore.setUsageManager(usageManager);
}
/** /**
* Not synchronized since the Journal has better throughput if you increase * Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing. * the number of concurrent writes that it is doing.
@ -334,11 +343,15 @@ public class JournalMessageStore implements MessageStore {
} }
public void start() throws Exception { public void start() throws Exception {
if( this.usageManager != null )
this.usageManager.addUsageListener(peristenceAdapter);
longTermStore.start(); longTermStore.start();
} }
public void stop() throws Exception { public void stop() throws Exception {
longTermStore.stop(); longTermStore.stop();
if( this.usageManager != null )
this.usageManager.removeUsageListener(peristenceAdapter);
} }
/** /**

View File

@ -81,13 +81,13 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
private final Journal journal; private final Journal journal;
private final PersistenceAdapter longTermPersistence; private final PersistenceAdapter longTermPersistence;
final UsageManager usageManager;
private final WireFormat wireFormat = new OpenWireFormat(); private final WireFormat wireFormat = new OpenWireFormat();
private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap queues = new ConcurrentHashMap();
private final ConcurrentHashMap topics = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap();
private UsageManager usageManager;
private long checkpointInterval = 1000 * 60 * 5; private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis();
@ -111,7 +111,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
} }
}; };
public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, UsageManager memManager, TaskRunnerFactory taskRunnerFactory) throws IOException { public JournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
this.journal = journal; this.journal = journal;
journal.setJournalEventListener(this); journal.setJournalEventListener(this);
@ -123,7 +123,14 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
}); });
this.longTermPersistence = longTermPersistence; this.longTermPersistence = longTermPersistence;
this.usageManager = memManager; }
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
} }
public Set getDestinations() { public Set getDestinations() {
@ -216,6 +223,7 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
public void stop() throws Exception { public void stop() throws Exception {
this.usageManager.removeUsageListener(this);
if( !started.compareAndSet(true, false) ) if( !started.compareAndSet(true, false) )
return; return;

View File

@ -31,6 +31,7 @@ import org.apache.activemq.command.JournalQueueAck;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -64,6 +65,8 @@ public class QuickJournalMessageStore implements MessageStore {
protected RecordLocation lastLocation; protected RecordLocation lastLocation;
protected HashSet inFlightTxLocations = new HashSet(); protected HashSet inFlightTxLocations = new HashSet();
private UsageManager usageManager;
public QuickJournalMessageStore(QuickJournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) { public QuickJournalMessageStore(QuickJournalPersistenceAdapter adapter, MessageStore checkpointStore, ActiveMQDestination destination) {
this.peristenceAdapter = adapter; this.peristenceAdapter = adapter;
this.transactionStore = adapter.getTransactionStore(); this.transactionStore = adapter.getTransactionStore();
@ -72,6 +75,11 @@ public class QuickJournalMessageStore implements MessageStore {
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext()); this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext());
} }
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
longTermStore.setUsageManager(usageManager);
}
/** /**
* Not synchronized since the Journal has better throughput if you increase * Not synchronized since the Journal has better throughput if you increase
* the number of concurrent writes that it is doing. * the number of concurrent writes that it is doing.
@ -368,11 +376,15 @@ public class QuickJournalMessageStore implements MessageStore {
} }
public void start() throws Exception { public void start() throws Exception {
if( this.usageManager != null )
this.usageManager.addUsageListener(peristenceAdapter);
longTermStore.start(); longTermStore.start();
} }
public void stop() throws Exception { public void stop() throws Exception {
longTermStore.stop(); longTermStore.stop();
if( this.usageManager != null )
this.usageManager.removeUsageListener(peristenceAdapter);
} }
/** /**

View File

@ -81,13 +81,13 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
private final Journal journal; private final Journal journal;
private final PersistenceAdapter longTermPersistence; private final PersistenceAdapter longTermPersistence;
final UsageManager usageManager;
private final WireFormat wireFormat = new OpenWireFormat(); private final WireFormat wireFormat = new OpenWireFormat();
private final ConcurrentHashMap queues = new ConcurrentHashMap(); private final ConcurrentHashMap queues = new ConcurrentHashMap();
private final ConcurrentHashMap topics = new ConcurrentHashMap(); private final ConcurrentHashMap topics = new ConcurrentHashMap();
private UsageManager usageManager;
private long checkpointInterval = 1000 * 60 * 5; private long checkpointInterval = 1000 * 60 * 5;
private long lastCheckpointRequest = System.currentTimeMillis(); private long lastCheckpointRequest = System.currentTimeMillis();
private long lastCleanup = System.currentTimeMillis(); private long lastCleanup = System.currentTimeMillis();
@ -111,7 +111,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
} }
}; };
public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, UsageManager memManager, TaskRunnerFactory taskRunnerFactory) throws IOException { public QuickJournalPersistenceAdapter(Journal journal, PersistenceAdapter longTermPersistence, TaskRunnerFactory taskRunnerFactory) throws IOException {
this.journal = journal; this.journal = journal;
journal.setJournalEventListener(this); journal.setJournalEventListener(this);
@ -123,7 +123,14 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
}); });
this.longTermPersistence = longTermPersistence; this.longTermPersistence = longTermPersistence;
this.usageManager = memManager; }
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
longTermPersistence.setUsageManager(usageManager);
} }
public Set getDestinations() { public Set getDestinations() {
@ -216,6 +223,7 @@ public class QuickJournalPersistenceAdapter implements PersistenceAdapter, Journ
public void stop() throws Exception { public void stop() throws Exception {
this.usageManager.removeUsageListener(this);
if( !started.compareAndSet(true, false) ) if( !started.compareAndSet(true, false) )
return; return;

View File

@ -22,6 +22,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
/** /**
@ -90,4 +91,11 @@ public class KahaMessageStore implements MessageStore{
public void delete(){ public void delete(){
messageContainer.clear(); messageContainer.clear();
} }
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
} }

View File

@ -28,6 +28,7 @@ import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory; import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller; import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
@ -150,4 +151,10 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
container.load(); container.load();
return container; return container;
} }
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
} }

View File

@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message; import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck; import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener; import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
@ -104,4 +105,10 @@ public class MemoryMessageStore implements MessageStore {
messageTable.clear(); messageTable.clear();
} }
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
} }

View File

@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
@ -147,4 +148,9 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
return null; return null;
} }
/**
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
}
} }

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.broker.policy; package org.apache.activemq.broker.policy;
import java.util.Iterator;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TopicSubscriptionTest; import org.apache.activemq.broker.TopicSubscriptionTest;
import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.broker.region.policy.PolicyEntry;
@ -23,9 +25,6 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy; import org.apache.activemq.broker.region.policy.StrictOrderDispatchPolicy;
import org.apache.activemq.util.MessageIdList; import org.apache.activemq.util.MessageIdList;
import java.util.List;
import java.util.Iterator;
public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest { public class StrictOrderDispatchPolicyTest extends TopicSubscriptionTest {
protected BrokerService createBroker() throws Exception { protected BrokerService createBroker() throws Exception {