Ensure that the mKahaDB cleans up the per-destination kahaDB data when the destination is deleted, and don't throw exceptions in cases where we find an older one that has no destinations in it any longer.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1341601 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Timothy A. Bish 2012-05-22 19:47:23 +00:00
parent db5798d9cc
commit bed24fabd9
2 changed files with 49 additions and 3 deletions

View File

@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.BrokerServiceAware; import org.apache.activemq.broker.BrokerServiceAware;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
@ -176,6 +177,16 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
} }
} }
private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
try {
kahaDBPersistenceAdapter.stop();
} catch (Exception e) {
RuntimeException detail = new RuntimeException("Failed to stop per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
LOG.error(detail.toString(), e);
throw detail;
}
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination); PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination)); return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
@ -218,11 +229,38 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
} }
public void removeQueueMessageStore(ActiveMQQueue destination) { public void removeQueueMessageStore(ActiveMQQueue destination) {
getMatchingPersistenceAdapter(destination).removeQueueMessageStore(destination); PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
adapter.removeQueueMessageStore(destination);
if (adapter instanceof KahaDBPersistenceAdapter) {
adapter.removeQueueMessageStore(destination);
removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
}
} }
public void removeTopicMessageStore(ActiveMQTopic destination) { public void removeTopicMessageStore(ActiveMQTopic destination) {
getMatchingPersistenceAdapter(destination).removeTopicMessageStore(destination); PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
if (adapter instanceof KahaDBPersistenceAdapter) {
adapter.removeTopicMessageStore(destination);
removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
}
}
private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
if (adapter.getDestinations().isEmpty()) {
stopAdapter(adapter, destination.toString());
File adapterDir = adapter.getDirectory();
if (adapterDir != null) {
if (IOHelper.deleteFile(adapterDir)) {
if (LOG.isTraceEnabled()) {
LOG.info("deleted per destination adapter directory for: " + destination);
}
} else {
if (LOG.isTraceEnabled()) {
LOG.info("failed to deleted per destination adapter directory for: " + destination);
}
}
}
}
} }
public void rollbackTransaction(ConnectionContext context) throws IOException { public void rollbackTransaction(ConnectionContext context) throws IOException {
@ -280,7 +318,11 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) { private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName()); KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
startAdapter(adapter, candidate.getName()); startAdapter(adapter, candidate.getName());
if (adapter.getDestinations().size() != 0) {
registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]); registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]);
} else {
stopAdapter(adapter, candidate.getName());
}
} }
private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) { private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {

View File

@ -85,6 +85,10 @@ public class AMQ3841Test {
prepareBrokerWithMultiStore(false); prepareBrokerWithMultiStore(false);
broker.start(); broker.start();
broker.getAdminView().addQueue(destinationName);
assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName)));
} }
protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {