mirror of https://github.com/apache/activemq.git
AMQ-2053 introduce methods to remove (closed) message stores from possible caches in PersistenceAdapters
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@732259 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
34acbdc154
commit
07b6a38336
|
@ -203,7 +203,7 @@ public abstract class AbstractRegion implements Region {
|
||||||
*
|
*
|
||||||
* @return a set of matching destination objects.
|
* @return a set of matching destination objects.
|
||||||
*/
|
*/
|
||||||
public Set getDestinations(ActiveMQDestination destination) {
|
public Set<Destination> getDestinations(ActiveMQDestination destination) {
|
||||||
synchronized (destinationsMutex) {
|
synchronized (destinationsMutex) {
|
||||||
return destinationMap.get(destination);
|
return destinationMap.get(destination);
|
||||||
}
|
}
|
||||||
|
@ -474,5 +474,6 @@ public abstract class AbstractRegion implements Region {
|
||||||
protected void dispose(ConnectionContext context,Destination dest) throws Exception {
|
protected void dispose(ConnectionContext context,Destination dest) throws Exception {
|
||||||
dest.dispose(context);
|
dest.dispose(context);
|
||||||
dest.stop();
|
dest.stop();
|
||||||
|
destinationFactory.removeDestination(dest);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,8 @@ public abstract class DestinationFactory {
|
||||||
*/
|
*/
|
||||||
public abstract Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception;
|
public abstract Destination createDestination(ConnectionContext context, ActiveMQDestination destination, DestinationStatistics destinationStatistics) throws Exception;
|
||||||
|
|
||||||
|
public abstract void removeDestination(Destination dest);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
|
* Returns a set of all the {@link org.apache.activemq.command.ActiveMQDestination}
|
||||||
* objects that the persistence store is aware exist.
|
* objects that the persistence store is aware exist.
|
||||||
|
|
|
@ -100,6 +100,18 @@ public class DestinationFactoryImpl extends DestinationFactory {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void removeDestination(Destination dest) {
|
||||||
|
ActiveMQDestination destination = dest.getActiveMQDestination();
|
||||||
|
if (!destination.isTemporary()) {
|
||||||
|
if (destination.isQueue()) {
|
||||||
|
persistenceAdapter.removeQueueMessageStore((ActiveMQQueue) destination);
|
||||||
|
}
|
||||||
|
else if (!AdvisorySupport.isAdvisoryTopic(destination)) {
|
||||||
|
persistenceAdapter.removeTopicMessageStore((ActiveMQTopic) destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
|
protected void configureQueue(Queue queue, ActiveMQDestination destination) {
|
||||||
if (broker == null) {
|
if (broker == null) {
|
||||||
throw new IllegalStateException("broker property is not set");
|
throw new IllegalStateException("broker property is not set");
|
||||||
|
|
|
@ -850,9 +850,9 @@ public class Queue extends BaseDestination implements Task {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Move a message
|
* Move a message
|
||||||
* @param context
|
* @param context connection context
|
||||||
* @param r
|
* @param m message
|
||||||
* @param dest
|
* @param dest ActiveMQDestination
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
public boolean moveMessageTo(ConnectionContext context,Message m,ActiveMQDestination dest) throws Exception {
|
public boolean moveMessageTo(ConnectionContext context,Message m,ActiveMQDestination dest) throws Exception {
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Set;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTopic;
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
@ -58,6 +59,20 @@ public interface PersistenceAdapter extends Service {
|
||||||
*/
|
*/
|
||||||
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
|
TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination.
|
||||||
|
* This method does not stop the message store (it might not be cached).
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
void removeQueueMessageStore(ActiveMQQueue destination);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
* This method does not stop the message store (it might not be cached).
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
void removeTopicMessageStore(ActiveMQTopic destination);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Factory method to create a new persistent prepared transaction store for XA recovery
|
* Factory method to create a new persistent prepared transaction store for XA recovery
|
||||||
* @return transaction store
|
* @return transaction store
|
||||||
|
|
|
@ -493,6 +493,24 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
return store;
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
queues.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
topics.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
public TransactionStore createTransactionStore() throws IOException {
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
return transactionStore;
|
return transactionStore;
|
||||||
}
|
}
|
||||||
|
|
|
@ -133,6 +133,24 @@ public class JDBCPersistenceAdapter extends DataSourceSupport implements Persist
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
* No state retained.... nothing to do
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
* No state retained.... nothing to do
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
}
|
||||||
|
|
||||||
public TransactionStore createTransactionStore() throws IOException {
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
if (transactionStore == null) {
|
if (transactionStore == null) {
|
||||||
transactionStore = new MemoryTransactionStore(this);
|
transactionStore = new MemoryTransactionStore(this);
|
||||||
|
|
|
@ -186,6 +186,24 @@ public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEve
|
||||||
return store;
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
queues.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
topics.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
public TransactionStore createTransactionStore() throws IOException {
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
return transactionStore;
|
return transactionStore;
|
||||||
}
|
}
|
||||||
|
|
|
@ -134,6 +134,24 @@ public class KahaPersistenceAdapter implements PersistenceAdapter {
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
queues.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
topics.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
protected MessageStore retrieveMessageStore(Object id) {
|
protected MessageStore retrieveMessageStore(Object id) {
|
||||||
MessageStore result = messageStores.get(id);
|
MessageStore result = messageStores.get(id);
|
||||||
return result;
|
return result;
|
||||||
|
|
|
@ -426,6 +426,24 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||||
return new KahaDBTopicMessageStore(destination);
|
return new KahaDBTopicMessageStore(destination);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination.
|
||||||
|
* This method does not stop the message store (it might not be cached).
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
* This method does not stop the message store (it might not be cached).
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
}
|
||||||
|
|
||||||
public void deleteAllMessages() throws IOException {
|
public void deleteAllMessages() throws IOException {
|
||||||
deleteAllMessages=true;
|
deleteAllMessages=true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -87,6 +87,24 @@ public class MemoryPersistenceAdapter implements PersistenceAdapter {
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
queues.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
topics.remove(destination);
|
||||||
|
}
|
||||||
|
|
||||||
public TransactionStore createTransactionStore() throws IOException {
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
if (transactionStore == null) {
|
if (transactionStore == null) {
|
||||||
transactionStore = new MemoryTransactionStore(this);
|
transactionStore = new MemoryTransactionStore(this);
|
||||||
|
|
|
@ -128,6 +128,22 @@ public class JPAPersistenceAdapter implements PersistenceAdapter {
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cleanup method to remove any state associated with the given destination
|
||||||
|
*
|
||||||
|
* @param destination Destination to forget
|
||||||
|
*/
|
||||||
|
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||||
|
}
|
||||||
|
|
||||||
public TransactionStore createTransactionStore() throws IOException {
|
public TransactionStore createTransactionStore() throws IOException {
|
||||||
if (transactionStore == null) {
|
if (transactionStore == null) {
|
||||||
transactionStore = new MemoryTransactionStore(this);
|
transactionStore = new MemoryTransactionStore(this);
|
||||||
|
|
Loading…
Reference in New Issue