mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3639 - Modify MKahaDB To Support Using One Adapter Per Destination Without Explicity Listing Every Desintation In The Configuration. Add perDestination boolean attribute to mKahaDb filtered adapter. When true, every destination will get its own persistence adapter using the configured adapter as as template. So any config applied to the destination less (default) adapter will be reused
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1231979 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
580635473d
commit
221da8bcd2
|
@ -16,6 +16,7 @@
|
|||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.filter.DestinationMapEntry;
|
||||
|
||||
/**
|
||||
|
@ -24,6 +25,16 @@ import org.apache.activemq.filter.DestinationMapEntry;
|
|||
*/
|
||||
public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
|
||||
private KahaDBPersistenceAdapter persistenceAdapter;
|
||||
private boolean perDestination;
|
||||
|
||||
public FilteredKahaDBPersistenceAdapter() {
|
||||
super();
|
||||
}
|
||||
|
||||
public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, KahaDBPersistenceAdapter adapter) {
|
||||
setDestination(destination);
|
||||
persistenceAdapter = adapter;
|
||||
}
|
||||
|
||||
public KahaDBPersistenceAdapter getPersistenceAdapter() {
|
||||
return persistenceAdapter;
|
||||
|
@ -37,4 +48,12 @@ public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
|
|||
public void afterPropertiesSet() throws Exception {
|
||||
// ok to have no destination, we default it
|
||||
}
|
||||
|
||||
public boolean isPerDestination() {
|
||||
return perDestination;
|
||||
}
|
||||
|
||||
public void setPerDestination(boolean perDestination) {
|
||||
this.perDestination = perDestination;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -241,7 +241,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
public int getFailoverProducersAuditDepth() {
|
||||
return this.getFailoverProducersAuditDepth();
|
||||
return this.letter.getFailoverProducersAuditDepth();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -558,7 +558,7 @@ public class KahaDBPersistenceAdapter implements PersistenceAdapter, BrokerServi
|
|||
}
|
||||
|
||||
public boolean isEnableIndexPageCaching() {
|
||||
return isEnableIndexPageCaching();
|
||||
return letter.isEnableIndexPageCaching();
|
||||
}
|
||||
|
||||
public KahaDBStore getStore() {
|
||||
|
|
|
@ -17,11 +17,14 @@
|
|||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
|
@ -44,6 +47,7 @@ import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
|||
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -103,12 +107,16 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
|
|||
if (filteredAdapter.getDestination() == null) {
|
||||
filteredAdapter.setDestination(matchAll);
|
||||
}
|
||||
if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
|
||||
adapter.setDirectory(new File(getDirectory(), nameFromDestinationFilter(filteredAdapter.getDestination())));
|
||||
|
||||
if (filteredAdapter.isPerDestination()) {
|
||||
configureDirectory(adapter, null);
|
||||
// per destination adapters will be created on demand or during recovery
|
||||
continue;
|
||||
} else {
|
||||
configureDirectory(adapter, nameFromDestinationFilter(filteredAdapter.getDestination()));
|
||||
}
|
||||
|
||||
// need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
|
||||
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
|
||||
configureAdapter(adapter);
|
||||
adapters.add(adapter);
|
||||
}
|
||||
super.setEntries(entries);
|
||||
|
@ -147,9 +155,27 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
|
|||
if (result == null) {
|
||||
throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
|
||||
}
|
||||
FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
|
||||
if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
|
||||
result = addAdapter(filteredAdapter, destination);
|
||||
startAdapter(((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter(), destination.getQualifiedName());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.info("created per destination adapter for: " + destination + ", " + result);
|
||||
}
|
||||
}
|
||||
return ((FilteredKahaDBPersistenceAdapter) result).getPersistenceAdapter();
|
||||
}
|
||||
|
||||
private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
|
||||
try {
|
||||
kahaDBPersistenceAdapter.start();
|
||||
} catch (Exception e) {
|
||||
RuntimeException detail = new RuntimeException("Failed to start per destination persistence adapter for destination: " + destination + ", options:" + adapters, e);
|
||||
LOG.error(detail.toString(), e);
|
||||
throw detail;
|
||||
}
|
||||
}
|
||||
|
||||
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
||||
PersistenceAdapter persistenceAdapter = getMatchingPersistenceAdapter(destination);
|
||||
return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createTopicMessageStore(destination));
|
||||
|
@ -164,6 +190,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
|
|||
persistenceAdapter.deleteAllMessages();
|
||||
}
|
||||
transactionStore.deleteAllMessages();
|
||||
IOHelper.deleteChildren(getDirectory());
|
||||
}
|
||||
|
||||
public Set<ActiveMQDestination> getDestinations() {
|
||||
|
@ -223,11 +250,86 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
|
|||
}
|
||||
|
||||
public void start() throws Exception {
|
||||
Object result = this.chooseValue(matchAll);
|
||||
if (result != null) {
|
||||
FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) result;
|
||||
if (filteredAdapter.getDestination() == matchAll && filteredAdapter.isPerDestination()) {
|
||||
findAndRegisterExistingAdapters(filteredAdapter);
|
||||
}
|
||||
}
|
||||
for (PersistenceAdapter persistenceAdapter : adapters) {
|
||||
persistenceAdapter.start();
|
||||
}
|
||||
}
|
||||
|
||||
private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
|
||||
FileFilter destinationNames = new FileFilter() {
|
||||
@Override
|
||||
public boolean accept(File file) {
|
||||
return file.getName().startsWith("queue#") || file.getName().startsWith("topic#");
|
||||
}
|
||||
};
|
||||
File[] candidates = template.getPersistenceAdapter().getDirectory().listFiles(destinationNames);
|
||||
if (candidates != null) {
|
||||
for (File candidate : candidates) {
|
||||
registerExistingAdapter(template, candidate);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
|
||||
KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
|
||||
startAdapter(adapter, candidate.getName());
|
||||
registerAdapter(adapter, adapter.getDestinations().toArray(new ActiveMQDestination[]{})[0]);
|
||||
}
|
||||
|
||||
private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
|
||||
KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
|
||||
return registerAdapter(adapter, destination);
|
||||
}
|
||||
|
||||
private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
|
||||
KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
|
||||
configureAdapter(adapter);
|
||||
configureDirectory(adapter, destinationName);
|
||||
return adapter;
|
||||
}
|
||||
|
||||
private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
|
||||
File directory = null;
|
||||
if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
|
||||
// not set so inherit from mkahadb
|
||||
directory = getDirectory();
|
||||
} else {
|
||||
directory = adapter.getDirectory();
|
||||
}
|
||||
if (fileName != null) {
|
||||
directory = new File(directory, fileName);
|
||||
}
|
||||
adapter.setDirectory(directory);
|
||||
}
|
||||
|
||||
private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
|
||||
adapters.add(adapter);
|
||||
FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
|
||||
put(destination, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void configureAdapter(KahaDBPersistenceAdapter adapter) {
|
||||
// need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
|
||||
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
|
||||
adapter.setBrokerService(getBrokerService());
|
||||
}
|
||||
|
||||
private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
|
||||
Map<String, Object> configuration = new HashMap<String, Object>();
|
||||
IntrospectionSupport.getProperties(template, configuration, null);
|
||||
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
|
||||
IntrospectionSupport.setProperties(adapter, configuration);
|
||||
return adapter;
|
||||
}
|
||||
|
||||
public void stop() throws Exception {
|
||||
for (PersistenceAdapter persistenceAdapter : adapters) {
|
||||
persistenceAdapter.stop();
|
||||
|
@ -284,7 +386,7 @@ public class MultiKahaDBPersistenceAdapter extends DestinationMap implements Per
|
|||
transactionStore.setJournalMaxWriteBatchSize(journalWriteBatchSize);
|
||||
}
|
||||
|
||||
public int getJournalMaxWriteBatchSize() {
|
||||
public int getJournalWriteBatchSize() {
|
||||
return transactionStore.getJournalMaxWriteBatchSize();
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter;
|
||||
|
||||
public class AutoStorePerDestinationTest extends StorePerDestinationTest {
|
||||
|
||||
// use perDestinationFlag to get multiple stores from one match all adapter
|
||||
public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception {
|
||||
|
||||
MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter();
|
||||
if (deleteAllMessages) {
|
||||
multiKahaDBPersistenceAdapter.deleteAllMessages();
|
||||
}
|
||||
ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>();
|
||||
|
||||
FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter();
|
||||
template.setPersistenceAdapter(createStore(deleteAllMessages));
|
||||
template.setPerDestination(true);
|
||||
adapters.add(template);
|
||||
|
||||
multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
|
||||
brokerService = createBroker(multiKahaDBPersistenceAdapter);
|
||||
}
|
||||
}
|
|
@ -62,7 +62,7 @@ public class StorePerDestinationTest {
|
|||
return broker;
|
||||
}
|
||||
|
||||
private KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
|
||||
protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException {
|
||||
KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter();
|
||||
kaha.setJournalMaxFileLength(maxFileLength);
|
||||
kaha.setCleanupInterval(5000);
|
||||
|
@ -199,7 +199,7 @@ public class StorePerDestinationTest {
|
|||
multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters);
|
||||
|
||||
assertEquals(multiKahaDBPersistenceAdapter.getDirectory(), storeDefault.getDirectory().getParentFile());
|
||||
assertEquals(someOtherDisk, otherStore.getDirectory());
|
||||
assertEquals(someOtherDisk, otherStore.getDirectory().getParentFile());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
Loading…
Reference in New Issue