From 07b6a38336a62913d42817a89cdc821a89419b7f Mon Sep 17 00:00:00 2001 From: David Jencks Date: Wed, 7 Jan 2009 08:03:40 +0000 Subject: [PATCH] AMQ-2053 introduce methods to remove (closed) message stores from possible caches in PersistenceAdapters git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@732259 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/region/AbstractRegion.java | 3 ++- .../broker/region/DestinationFactory.java | 2 ++ .../broker/region/DestinationFactoryImpl.java | 12 +++++++++++ .../apache/activemq/broker/region/Queue.java | 6 +++--- .../activemq/store/PersistenceAdapter.java | 15 ++++++++++++++ .../store/amq/AMQPersistenceAdapter.java | 18 +++++++++++++++++ .../store/jdbc/JDBCPersistenceAdapter.java | 18 +++++++++++++++++ .../journal/JournalPersistenceAdapter.java | 18 +++++++++++++++++ .../kahadaptor/KahaPersistenceAdapter.java | 18 +++++++++++++++++ .../activemq/store/kahadb/KahaDBStore.java | 20 ++++++++++++++++++- .../memory/MemoryPersistenceAdapter.java | 18 +++++++++++++++++ .../store/jpa/JPAPersistenceAdapter.java | 16 +++++++++++++++ 12 files changed, 159 insertions(+), 5 deletions(-) diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java index 67c1cb2d0f..3f818ef70d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java @@ -203,7 +203,7 @@ public abstract class AbstractRegion implements Region { * * @return a set of matching destination objects. */ - public Set getDestinations(ActiveMQDestination destination) { + public Set getDestinations(ActiveMQDestination destination) { synchronized (destinationsMutex) { return destinationMap.get(destination); } @@ -474,5 +474,6 @@ public abstract class AbstractRegion implements Region { protected void dispose(ConnectionContext context,Destination dest) throws Exception { dest.dispose(context); dest.stop(); + destinationFactory.removeDestination(dest); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java index 964a1a95af..c53e60b5e7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactory.java @@ -37,6 +37,8 @@ public abstract class DestinationFactory { */ public abstract Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception; + public abstract void removeDestination(Destination dest); + /** * Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination} * objects that the persistence store is aware exist. diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java index 53ea129ad0..523c71c421 100644 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java @@ -100,6 +100,18 @@ public class DestinationFactoryImpl extends DestinationFactory { } } + public void removeDestination(Destination dest) { + ActiveMQDestination destination = dest.getActiveMQDestination(); + if (!destination.isTemporary()) { + if (destination.isQueue()) { + persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination); + } + else if (!AdvisorySupport.isAdvisoryTopic(destination)) { + persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination); + } + } + } + protected void configureQueue(Queue queue, ActiveMQDestination destination) { if (broker == null) { throw new IllegalStateException("broker property is not set"); diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java index 50c8bc2de7..91473d293c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -850,9 +850,9 @@ public class Queue extends BaseDestination implements Task { /** * Move a message - * @param context - * @param r - * @param dest + * @param context connection context + * @param m message + * @param dest ActiveMQDestination * @throws Exception */ public boolean moveMessageTo(ConnectionContext context,Message m,ActiveMQDestination dest) throws Exception { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java index 5422c4b9a1..ef4ce54f51 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/PersistenceAdapter.java @@ -22,6 +22,7 @@ import java.util.Set; import org.apache.activemq.Service; import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; @@ -58,6 +59,20 @@ public interface PersistenceAdapter extends Service { */ TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException; + /** + * Cleanup method to remove any state associated with the given destination. + * This method does not stop the message store (it might not be cached). + * @param destination Destination to forget + */ + void removeQueueMessageStore(ActiveMQQueue destination); + + /** + * Cleanup method to remove any state associated with the given destination + * This method does not stop the message store (it might not be cached). + * @param destination Destination to forget + */ + void removeTopicMessageStore(ActiveMQTopic destination); + /** * Factory method to create a new persistent prepared transaction store for XA recovery * @return transaction store diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index a7522e472a..6388340e61 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -493,6 +493,24 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener, return store; } + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + queues.remove(destination); + } + + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + topics.remove(destination); + } + public TransactionStore createTransactionStore() throws IOException { return transactionStore; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java index 10c23f83ba..3ea32bae9e 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/jdbc/JDBCPersistenceAdapter.java @@ -133,6 +133,24 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist return rc; } + /** + * Cleanup method to remove any state associated with the given destination + * No state retained.... nothing to do + * + * @param destination Destination to forget + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + } + + /** + * Cleanup method to remove any state associated with the given destination + * No state retained.... nothing to do + * + * @param destination Destination to forget + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + } + public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { transactionStore = new MemoryTransactionStore(this); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java index 112d36bc96..8ae5fb039d 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/journal/JournalPersistenceAdapter.java @@ -186,6 +186,24 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve return store; } + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + queues.remove(destination); + } + + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + topics.remove(destination); + } + public TransactionStore createTransactionStore() throws IOException { return transactionStore; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index fdc01c5071..2f249473e3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -134,6 +134,24 @@ public class KahaPersistenceAdapter implements PersistenceAdapter { return rc; } + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + queues.remove(destination); + } + + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + topics.remove(destination); + } + protected MessageStore retrieveMessageStore(Object id) { MessageStore result = messageStores.get(id); return result; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 22a3353187..7ca346b06c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -425,7 +425,25 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { return new KahaDBTopicMessageStore(destination); } - + + /** + * Cleanup method to remove any state associated with the given destination. + * This method does not stop the message store (it might not be cached). + * + * @param destination Destination to forget + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + } + + /** + * Cleanup method to remove any state associated with the given destination + * This method does not stop the message store (it might not be cached). + * + * @param destination Destination to forget + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + } + public void deleteAllMessages() throws IOException { deleteAllMessages=true; } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java index 4afaf4bc24..e2e6ad78cf 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/memory/MemoryPersistenceAdapter.java @@ -87,6 +87,24 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter { return rc; } + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + queues.remove(destination); + } + + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + topics.remove(destination); + } + public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { transactionStore = new MemoryTransactionStore(this); diff --git a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java index f69036ca19..e225351b73 100755 --- a/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java +++ b/activemq-jpa-store/src/main/java/org/apache/activemq/store/jpa/JPAPersistenceAdapter.java @@ -128,6 +128,22 @@ public class JPAPersistenceAdapter implements PersistenceAdapter { return rc; } + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeQueueMessageStore(ActiveMQQueue destination) { + } + + /** + * Cleanup method to remove any state associated with the given destination + * + * @param destination Destination to forget + */ + public void removeTopicMessageStore(ActiveMQTopic destination) { + } + public TransactionStore createTransactionStore() throws IOException { if (transactionStore == null) { transactionStore = new MemoryTransactionStore(this);