diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index c8c6bec247..d3f63e78d5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -26,6 +26,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.ConnectionContext; @@ -176,6 +177,16 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per } } + private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) { + try { + kahaDBPersistenceAdapter.stop(); + } catch (Exception e) { + RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e); + LOG.error(detail.toString(), e); + throw detail; + } + } + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination); return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination)); @@ -218,11 +229,38 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per } public void removeQueueMessageStore(ActiveMQQueue destination) { - getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination); + PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination); + adapter.removeQueueMessageStore(destination); + if (adapter instanceof KahaDBPersistenceAdapter) { + adapter.removeQueueMessageStore(destination); + removeMessageStore((KahaDBPersistenceAdapter)adapter, destination); + } } public void removeTopicMessageStore(ActiveMQTopic destination) { - getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination); + PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination); + if (adapter instanceof KahaDBPersistenceAdapter) { + adapter.removeTopicMessageStore(destination); + removeMessageStore((KahaDBPersistenceAdapter)adapter, destination); + } + } + + private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) { + if (adapter.getDestinations().isEmpty()) { + stopAdapter(adapter, destination.toString()); + File adapterDir = adapter.getDirectory(); + if (adapterDir != null) { + if (IOHelper.deleteFile(adapterDir)) { + if (LOG.isTraceEnabled()) { + LOG.info("deleted per destination adapter directory for: " + destination); + } + } else { + if (LOG.isTraceEnabled()) { + LOG.info("failed to deleted per destination adapter directory for: " + destination); + } + } + } + } } public void rollbackTransaction(ConnectionContext context) throws IOException { @@ -280,7 +318,11 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) { KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName()); startAdapter(adapter, candidate.getName()); - registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]); + if (adapter.getDestinations().size() != 0) { + registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]); + } else { + stopAdapter(adapter, candidate.getName()); + } } private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) { diff --git a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java index 3b06007ee8..449d5e5539 100644 --- a/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java +++ b/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3841Test.java @@ -85,6 +85,10 @@ public class AMQ3841Test { prepareBrokerWithMultiStore(false); broker.start(); + + broker.getAdminView().addQueue(destinationName); + assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); + } protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {