diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java index b588652b94..3ba005b840 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/FilteredKahaDBPersistenceAdapter.java @@ -16,6 +16,7 @@ */ package org.apache.activemq.store.kahadb; +import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.filter.DestinationMapEntry; /** @@ -24,6 +25,16 @@ import org.apache.activemq.filter.DestinationMapEntry; */ public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry { private KahaDBPersistenceAdapter persistenceAdapter; + private boolean perDestination; + + public FilteredKahaDBPersistenceAdapter() { + super(); + } + + public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, KahaDBPersistenceAdapter adapter) { + setDestination(destination); + persistenceAdapter = adapter; + } public KahaDBPersistenceAdapter getPersistenceAdapter() { return persistenceAdapter; @@ -37,4 +48,12 @@ public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry { public void afterPropertiesSet() throws Exception { // ok to have no destination, we default it } + + public boolean isPerDestination() { + return perDestination; + } + + public void setPerDestination(boolean perDestination) { + this.perDestination = perDestination; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 8ab950bc7c..7046f38c5c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -241,7 +241,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } public int getFailoverProducersAuditDepth() { - return this.getFailoverProducersAuditDepth(); + return this.letter.getFailoverProducersAuditDepth(); } /** @@ -558,7 +558,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi } public boolean isEnableIndexPageCaching() { - return isEnableIndexPageCaching(); + return letter.isEnableIndexPageCaching(); } public KahaDBStore getStore() { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index a787d8db31..c8c6bec247 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -17,11 +17,14 @@ package org.apache.activemq.store.kahadb; import java.io.File; +import java.io.FileFilter; import java.io.IOException; import java.nio.charset.Charset; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerServiceAware; @@ -44,6 +47,7 @@ import org.apache.activemq.store.kahadb.data.KahaTransactionInfo; import org.apache.activemq.store.kahadb.data.KahaXATransactionId; import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.IOHelper; +import org.apache.activemq.util.IntrospectionSupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -103,12 +107,16 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per if (filteredAdapter.getDestination() == null) { filteredAdapter.setDestination(matchAll); } - if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) { - adapter.setDirectory(new File(getDirectory(), nameFromDestinationFilter(filteredAdapter.getDestination()))); + + if (filteredAdapter.isPerDestination()) { + configureDirectory(adapter, null); + // per destination adapters will be created on demand or during recovery + continue; + } else { + configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination())); } - // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans - adapter.getStore().setTransactionIdTransformer(transactionIdTransformer); + configureAdapter(adapter); adapters.add(adapter); } super.setEntries(entries); @@ -147,9 +155,27 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per if (result == null) { throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters); } + FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result; + if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) { + result = addAdapter(filteredAdapter, destination); + startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName()); + if (LOG.isTraceEnabled()) { + LOG.info("created per destination adapter for: " + destination + ", " + result); + } + } return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(); } + private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) { + try { + kahaDBPersistenceAdapter.start(); + } catch (Exception e) { + RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e); + LOG.error(detail.toString(), e); + throw detail; + } + } + public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException { PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination); return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination)); @@ -164,6 +190,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per persistenceAdapter.deleteAllMessages(); } transactionStore.deleteAllMessages(); + IOHelper.deleteChildren(getDirectory()); } public Set getDestinations() { @@ -223,11 +250,86 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per } public void start() throws Exception { + Object result = this.chooseValue(matchAll); + if (result != null) { + FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result; + if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) { + findAndRegisterExistingAdapters(filteredAdapter); + } + } for (PersistenceAdapter persistenceAdapter : adapters) { persistenceAdapter.start(); } } + private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) { + FileFilter destinationNames = new FileFilter() { + @Override + public boolean accept(File file) { + return file.getName().startsWith("queue#") || file.getName().startsWith("topic#"); + } + }; + File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames); + if (candidates != null) { + for (File candidate : candidates) { + registerExistingAdapter(template, candidate); + } + } + } + + private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) { + KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName()); + startAdapter(adapter, candidate.getName()); + registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]); + } + + private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) { + KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination)); + return registerAdapter(adapter, destination); + } + + private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) { + KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template); + configureAdapter(adapter); + configureDirectory(adapter, destinationName); + return adapter; + } + + private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) { + File directory = null; + if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) { + // not set so inherit from mkahadb + directory = getDirectory(); + } else { + directory = adapter.getDirectory(); + } + if (fileName != null) { + directory = new File(directory, fileName); + } + adapter.setDirectory(directory); + } + + private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) { + adapters.add(adapter); + FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter); + put(destination, result); + return result; + } + + private void configureAdapter(KahaDBPersistenceAdapter adapter) { + // need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans + adapter.getStore().setTransactionIdTransformer(transactionIdTransformer); + adapter.setBrokerService(getBrokerService()); + } + + private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) { + Map configuration = new HashMap(); + IntrospectionSupport.getProperties(template, configuration, null); + KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter(); + IntrospectionSupport.setProperties(adapter, configuration); + return adapter; + } + public void stop() throws Exception { for (PersistenceAdapter persistenceAdapter : adapters) { persistenceAdapter.stop(); @@ -284,7 +386,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize); } - public int getJournalMaxWriteBatchSize() { + public int getJournalWriteBatchSize() { return transactionStore.getJournalMaxWriteBatchSize(); } diff --git a/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java b/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java new file mode 100644 index 0000000000..2b76a224df --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/AutoStorePerDestinationTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store; + +import java.util.ArrayList; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; + +public class AutoStorePerDestinationTest extends StorePerDestinationTest { + + // use perDestinationFlag to get multiple stores from one match all adapter + public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { + + MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); + if (deleteAllMessages) { + multiKahaDBPersistenceAdapter.deleteAllMessages(); + } + ArrayList adapters = new ArrayList(); + + FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); + template.setPersistenceAdapter(createStore(deleteAllMessages)); + template.setPerDestination(true); + adapters.add(template); + + multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); + brokerService = createBroker(multiKahaDBPersistenceAdapter); + } +} \ No newline at end of file diff --git a/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java b/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java index 6ddaf7a6b0..f5165b120a 100644 --- a/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/store/StorePerDestinationTest.java @@ -62,7 +62,7 @@ public class StorePerDestinationTest { return broker; } - private KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); kaha.setJournalMaxFileLength(maxFileLength); kaha.setCleanupInterval(5000); @@ -199,7 +199,7 @@ public class StorePerDestinationTest { multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), storeDefault.getDirectory().getParentFile()); - assertEquals(someOtherDisk, otherStore.getDirectory()); + assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile()); } @Test