mirror of https://github.com/apache/activemq.git
Make the mKahaDB store agnostic to the nested persistence adapter type.
This commit is contained in:
parent
74dafd7f24
commit
21fe8cac7d
|
@ -14,11 +14,10 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
package org.apache.activemq.store;
|
||||
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
||||
|
||||
public interface TransactionIdTransformer {
|
||||
KahaTransactionInfo transform(TransactionId txid);
|
||||
TransactionId transform(TransactionId txid);
|
||||
}
|
|
@ -0,0 +1,23 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store;
|
||||
|
||||
/**
|
||||
*/
|
||||
public interface TransactionIdTransformerAware {
|
||||
void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer);
|
||||
}
|
|
@ -18,29 +18,30 @@ package org.apache.activemq.store.kahadb;
|
|||
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.filter.DestinationMapEntry;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
|
||||
/**
|
||||
* @org.apache.xbean.XBean element="filteredKahaDB"
|
||||
*
|
||||
*/
|
||||
public class FilteredKahaDBPersistenceAdapter extends DestinationMapEntry {
|
||||
private KahaDBPersistenceAdapter persistenceAdapter;
|
||||
private PersistenceAdapter persistenceAdapter;
|
||||
private boolean perDestination;
|
||||
|
||||
public FilteredKahaDBPersistenceAdapter() {
|
||||
super();
|
||||
}
|
||||
|
||||
public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, KahaDBPersistenceAdapter adapter) {
|
||||
public FilteredKahaDBPersistenceAdapter(ActiveMQDestination destination, PersistenceAdapter adapter) {
|
||||
setDestination(destination);
|
||||
persistenceAdapter = adapter;
|
||||
}
|
||||
|
||||
public KahaDBPersistenceAdapter getPersistenceAdapter() {
|
||||
public PersistenceAdapter getPersistenceAdapter() {
|
||||
return persistenceAdapter;
|
||||
}
|
||||
|
||||
public void setPersistenceAdapter(KahaDBPersistenceAdapter persistenceAdapter) {
|
||||
public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
|
||||
this.persistenceAdapter = persistenceAdapter;
|
||||
}
|
||||
|
||||
|
|
|
@ -39,12 +39,7 @@ import org.apache.activemq.command.ProducerId;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.protobuf.Buffer;
|
||||
import org.apache.activemq.store.JournaledStore;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.SharedFileLocker;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.store.TransactionStore;
|
||||
import org.apache.activemq.store.*;
|
||||
import org.apache.activemq.store.kahadb.data.KahaLocalTransactionId;
|
||||
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
||||
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
|
||||
|
@ -58,7 +53,7 @@ import org.apache.activemq.util.ServiceStopper;
|
|||
* @org.apache.xbean.XBean element="kahaDB"
|
||||
*
|
||||
*/
|
||||
public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore {
|
||||
public class KahaDBPersistenceAdapter extends LockableServiceSupport implements PersistenceAdapter, JournaledStore, TransactionIdTransformerAware {
|
||||
private final KahaDBStore letter = new KahaDBStore();
|
||||
|
||||
/**
|
||||
|
@ -655,4 +650,8 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements
|
|||
return "KahaDBPersistenceAdapter[" + path + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTransactionIdTransformer(TransactionIdTransformer transactionIdTransformer) {
|
||||
getStore().setTransactionIdTransformer(transactionIdTransformer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,12 +56,7 @@ import org.apache.activemq.command.SubscriptionInfo;
|
|||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.protobuf.Buffer;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.activemq.store.MessageRecoveryListener;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.store.TransactionStore;
|
||||
import org.apache.activemq.store.*;
|
||||
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaDestination;
|
||||
import org.apache.activemq.store.kahadb.data.KahaDestination.DestinationType;
|
||||
|
@ -69,7 +64,6 @@ import org.apache.activemq.store.kahadb.data.KahaLocation;
|
|||
import org.apache.activemq.store.kahadb.data.KahaRemoveDestinationCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaRemoveMessageCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaSubscriptionCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
||||
import org.apache.activemq.store.kahadb.disk.journal.Location;
|
||||
import org.apache.activemq.store.kahadb.disk.page.Transaction;
|
||||
import org.apache.activemq.usage.MemoryUsage;
|
||||
|
@ -114,8 +108,8 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
this.transactionStore = new KahaDBTransactionStore(this);
|
||||
this.transactionIdTransformer = new TransactionIdTransformer() {
|
||||
@Override
|
||||
public KahaTransactionInfo transform(TransactionId txid) {
|
||||
return TransactionIdConversion.convert(txid);
|
||||
public TransactionId transform(TransactionId txid) {
|
||||
return txid;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
@ -462,7 +456,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
KahaAddMessageCommand command = new KahaAddMessageCommand();
|
||||
command.setDestination(dest);
|
||||
command.setMessageId(message.getMessageId().toProducerKey());
|
||||
command.setTransactionInfo(transactionIdTransformer.transform(message.getTransactionId()));
|
||||
command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(message.getTransactionId())));
|
||||
command.setPriority(message.getPriority());
|
||||
command.setPrioritySupported(isPrioritizedMessages());
|
||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(message);
|
||||
|
@ -476,7 +470,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
KahaRemoveMessageCommand command = new KahaRemoveMessageCommand();
|
||||
command.setDestination(dest);
|
||||
command.setMessageId(ack.getLastMessageId().toProducerKey());
|
||||
command.setTransactionInfo(transactionIdTransformer.transform(ack.getTransactionId()));
|
||||
command.setTransactionInfo(TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())));
|
||||
|
||||
org.apache.activemq.util.ByteSequence packet = wireFormat.marshal(ack);
|
||||
command.setAck(new Buffer(packet.getData(), packet.getOffset(), packet.getLength()));
|
||||
|
@ -760,7 +754,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||
command.setDestination(dest);
|
||||
command.setSubscriptionKey(subscriptionKey);
|
||||
command.setMessageId(messageId.toProducerKey());
|
||||
command.setTransactionInfo(ack != null ? transactionIdTransformer.transform(ack.getTransactionId()) : null);
|
||||
command.setTransactionInfo(ack != null ? TransactionIdConversion.convert(transactionIdTransformer.transform(ack.getTransactionId())) : null);
|
||||
if (ack != null && ack.isUnmatchedAck()) {
|
||||
command.setAck(UNMATCHED);
|
||||
} else {
|
||||
|
|
|
@ -525,6 +525,6 @@ public class KahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
|
||||
private KahaTransactionInfo getTransactionInfo(TransactionId txid) {
|
||||
return theStore.getTransactionIdTransformer().transform(txid);
|
||||
return TransactionIdConversion.convert(theStore.getTransactionIdTransformer().transform(txid));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,47 +16,27 @@
|
|||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.BrokerServiceAware;
|
||||
import org.apache.activemq.broker.ConnectionContext;
|
||||
import org.apache.activemq.broker.LockableServiceSupport;
|
||||
import org.apache.activemq.broker.Locker;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.command.LocalTransactionId;
|
||||
import org.apache.activemq.command.ProducerId;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.broker.*;
|
||||
import org.apache.activemq.command.*;
|
||||
import org.apache.activemq.filter.AnyDestination;
|
||||
import org.apache.activemq.filter.DestinationMap;
|
||||
import org.apache.activemq.filter.DestinationMapEntry;
|
||||
import org.apache.activemq.protobuf.Buffer;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.SharedFileLocker;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.store.TransactionStore;
|
||||
import org.apache.activemq.store.kahadb.data.KahaTransactionInfo;
|
||||
import org.apache.activemq.store.kahadb.data.KahaXATransactionId;
|
||||
import org.apache.activemq.store.*;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.util.IOExceptionSupport;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.IntrospectionSupport;
|
||||
import org.apache.activemq.util.ServiceStopper;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.transaction.xa.Xid;
|
||||
import java.io.File;
|
||||
import java.io.FileFilter;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.*;
|
||||
|
||||
/**
|
||||
* An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports
|
||||
* distribution of destinations across multiple kahaDB persistence adapters
|
||||
|
@ -77,7 +57,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
final DelegateDestinationMap destinationMap = new DelegateDestinationMap();
|
||||
|
||||
BrokerService brokerService;
|
||||
List<KahaDBPersistenceAdapter> adapters = new LinkedList<KahaDBPersistenceAdapter>();
|
||||
List<PersistenceAdapter> adapters = new LinkedList<PersistenceAdapter>();
|
||||
private File directory = new File(IOHelper.getDefaultDataDirectory() + File.separator + "mKahaDB");
|
||||
|
||||
MultiKahaDBTransactionStore transactionStore = new MultiKahaDBTransactionStore(this);
|
||||
|
@ -85,25 +65,31 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
// all local store transactions are XA, 2pc if more than one adapter involved
|
||||
TransactionIdTransformer transactionIdTransformer = new TransactionIdTransformer() {
|
||||
@Override
|
||||
public KahaTransactionInfo transform(TransactionId txid) {
|
||||
public TransactionId transform(TransactionId txid) {
|
||||
if (txid == null) {
|
||||
return null;
|
||||
}
|
||||
KahaTransactionInfo rc = new KahaTransactionInfo();
|
||||
KahaXATransactionId kahaTxId = new KahaXATransactionId();
|
||||
if (txid.isLocalTransaction()) {
|
||||
LocalTransactionId t = (LocalTransactionId) txid;
|
||||
kahaTxId.setBranchQualifier(new Buffer(Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"))));
|
||||
kahaTxId.setGlobalTransactionId(new Buffer(t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"))));
|
||||
kahaTxId.setFormatId(LOCAL_FORMAT_ID_MAGIC);
|
||||
final LocalTransactionId t = (LocalTransactionId) txid;
|
||||
return new XATransactionId(new Xid() {
|
||||
@Override
|
||||
public int getFormatId() {
|
||||
return LOCAL_FORMAT_ID_MAGIC;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getGlobalTransactionId() {
|
||||
return t.getConnectionId().getValue().getBytes(Charset.forName("utf-8"));
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getBranchQualifier() {
|
||||
return Long.toString(t.getValue()).getBytes(Charset.forName("utf-8"));
|
||||
}
|
||||
});
|
||||
} else {
|
||||
XATransactionId t = (XATransactionId) txid;
|
||||
kahaTxId.setBranchQualifier(new Buffer(t.getBranchQualifier()));
|
||||
kahaTxId.setGlobalTransactionId(new Buffer(t.getGlobalTransactionId()));
|
||||
kahaTxId.setFormatId(t.getFormatId());
|
||||
return txid;
|
||||
}
|
||||
rc.setXaTransactionId(kahaTxId);
|
||||
return rc;
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -116,7 +102,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
public void setFilteredPersistenceAdapters(List entries) {
|
||||
for (Object entry : entries) {
|
||||
FilteredKahaDBPersistenceAdapter filteredAdapter = (FilteredKahaDBPersistenceAdapter) entry;
|
||||
KahaDBPersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
|
||||
PersistenceAdapter adapter = filteredAdapter.getPersistenceAdapter();
|
||||
if (filteredAdapter.getDestination() == null) {
|
||||
filteredAdapter.setDestination(matchAll);
|
||||
}
|
||||
|
@ -172,7 +158,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
return transactionStore.proxy(persistenceAdapter.createTransactionStore(), persistenceAdapter.createQueueMessageStore(destination));
|
||||
}
|
||||
|
||||
private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) {
|
||||
private PersistenceAdapter getMatchingPersistenceAdapter(ActiveMQDestination destination) throws IOException {
|
||||
Object result = destinationMap.chooseValue(destination);
|
||||
if (result == null) {
|
||||
throw new RuntimeException("No matching persistence adapter configured for destination: " + destination + ", options:" + adapters);
|
||||
|
@ -188,7 +174,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
return filteredAdapter.getPersistenceAdapter();
|
||||
}
|
||||
|
||||
private void startAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
|
||||
private void startAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
|
||||
try {
|
||||
kahaDBPersistenceAdapter.start();
|
||||
} catch (Exception e) {
|
||||
|
@ -198,7 +184,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
}
|
||||
}
|
||||
|
||||
private void stopAdapter(KahaDBPersistenceAdapter kahaDBPersistenceAdapter, String destination) {
|
||||
private void stopAdapter(PersistenceAdapter kahaDBPersistenceAdapter, String destination) {
|
||||
try {
|
||||
kahaDBPersistenceAdapter.stop();
|
||||
} catch (Exception e) {
|
||||
|
@ -257,25 +243,35 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
|
||||
@Override
|
||||
public void removeQueueMessageStore(ActiveMQQueue destination) {
|
||||
PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
|
||||
if (adapter instanceof KahaDBPersistenceAdapter) {
|
||||
PersistenceAdapter adapter = null;
|
||||
try {
|
||||
adapter = getMatchingPersistenceAdapter(destination);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (adapter instanceof PersistenceAdapter) {
|
||||
adapter.removeQueueMessageStore(destination);
|
||||
removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
|
||||
removeMessageStore((PersistenceAdapter)adapter, destination);
|
||||
destinationMap.removeAll(destination);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeTopicMessageStore(ActiveMQTopic destination) {
|
||||
PersistenceAdapter adapter = getMatchingPersistenceAdapter(destination);
|
||||
if (adapter instanceof KahaDBPersistenceAdapter) {
|
||||
PersistenceAdapter adapter = null;
|
||||
try {
|
||||
adapter = getMatchingPersistenceAdapter(destination);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
if (adapter instanceof PersistenceAdapter) {
|
||||
adapter.removeTopicMessageStore(destination);
|
||||
removeMessageStore((KahaDBPersistenceAdapter)adapter, destination);
|
||||
removeMessageStore((PersistenceAdapter)adapter, destination);
|
||||
destinationMap.removeAll(destination);
|
||||
}
|
||||
}
|
||||
|
||||
private void removeMessageStore(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
|
||||
private void removeMessageStore(PersistenceAdapter adapter, ActiveMQDestination destination) {
|
||||
if (adapter.getDestinations().isEmpty()) {
|
||||
stopAdapter(adapter, destination.toString());
|
||||
File adapterDir = adapter.getDirectory();
|
||||
|
@ -335,7 +331,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
}
|
||||
}
|
||||
|
||||
private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) {
|
||||
private void findAndRegisterExistingAdapters(FilteredKahaDBPersistenceAdapter template) throws IOException {
|
||||
FileFilter destinationNames = new FileFilter() {
|
||||
@Override
|
||||
public boolean accept(File file) {
|
||||
|
@ -350,8 +346,8 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
}
|
||||
}
|
||||
|
||||
private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) {
|
||||
KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
|
||||
private void registerExistingAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, File candidate) throws IOException {
|
||||
PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), candidate.getName());
|
||||
startAdapter(adapter, candidate.getName());
|
||||
Set<ActiveMQDestination> destinations = adapter.getDestinations();
|
||||
if (destinations.size() != 0) {
|
||||
|
@ -361,19 +357,19 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
}
|
||||
}
|
||||
|
||||
private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) {
|
||||
KahaDBPersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
|
||||
private FilteredKahaDBPersistenceAdapter addAdapter(FilteredKahaDBPersistenceAdapter filteredAdapter, ActiveMQDestination destination) throws IOException {
|
||||
PersistenceAdapter adapter = adapterFromTemplate(filteredAdapter.getPersistenceAdapter(), nameFromDestinationFilter(destination));
|
||||
return registerAdapter(adapter, destination);
|
||||
}
|
||||
|
||||
private KahaDBPersistenceAdapter adapterFromTemplate(KahaDBPersistenceAdapter template, String destinationName) {
|
||||
KahaDBPersistenceAdapter adapter = kahaDBFromTemplate(template);
|
||||
private PersistenceAdapter adapterFromTemplate(PersistenceAdapter template, String destinationName) throws IOException {
|
||||
PersistenceAdapter adapter = kahaDBFromTemplate(template);
|
||||
configureAdapter(adapter);
|
||||
configureDirectory(adapter, destinationName);
|
||||
return adapter;
|
||||
}
|
||||
|
||||
private void configureDirectory(KahaDBPersistenceAdapter adapter, String fileName) {
|
||||
private void configureDirectory(PersistenceAdapter adapter, String fileName) {
|
||||
File directory = null;
|
||||
if (MessageDatabase.DEFAULT_DIRECTORY.equals(adapter.getDirectory())) {
|
||||
// not set so inherit from mkahadb
|
||||
|
@ -387,28 +383,36 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
adapter.setDirectory(directory);
|
||||
}
|
||||
|
||||
private FilteredKahaDBPersistenceAdapter registerAdapter(KahaDBPersistenceAdapter adapter, ActiveMQDestination destination) {
|
||||
private FilteredKahaDBPersistenceAdapter registerAdapter(PersistenceAdapter adapter, ActiveMQDestination destination) {
|
||||
adapters.add(adapter);
|
||||
FilteredKahaDBPersistenceAdapter result = new FilteredKahaDBPersistenceAdapter(destination, adapter);
|
||||
destinationMap.put(destination, result);
|
||||
return result;
|
||||
}
|
||||
|
||||
private void configureAdapter(KahaDBPersistenceAdapter adapter) {
|
||||
private void configureAdapter(PersistenceAdapter adapter) {
|
||||
// need a per store factory that will put the store in the branch qualifier to disiambiguate xid mbeans
|
||||
adapter.getStore().setTransactionIdTransformer(transactionIdTransformer);
|
||||
((TransactionIdTransformerAware)adapter).setTransactionIdTransformer(transactionIdTransformer);
|
||||
if (isUseLock()) {
|
||||
adapter.setUseLock(false);
|
||||
if( adapter instanceof Lockable ) {
|
||||
((Lockable)adapter).setUseLock(false);
|
||||
}
|
||||
}
|
||||
if( adapter instanceof BrokerServiceAware ) {
|
||||
((BrokerServiceAware)adapter).setBrokerService(getBrokerService());
|
||||
}
|
||||
adapter.setBrokerService(getBrokerService());
|
||||
}
|
||||
|
||||
private KahaDBPersistenceAdapter kahaDBFromTemplate(KahaDBPersistenceAdapter template) {
|
||||
Map<String, Object> configuration = new HashMap<String, Object>();
|
||||
IntrospectionSupport.getProperties(template, configuration, null);
|
||||
KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
|
||||
IntrospectionSupport.setProperties(adapter, configuration);
|
||||
return adapter;
|
||||
private PersistenceAdapter kahaDBFromTemplate(PersistenceAdapter template) throws IOException {
|
||||
try {
|
||||
Map<String, Object> configuration = new HashMap<String, Object>();
|
||||
IntrospectionSupport.getProperties(template, configuration, null);
|
||||
PersistenceAdapter adapter = template.getClass().newInstance();
|
||||
IntrospectionSupport.setProperties(adapter, configuration);
|
||||
return adapter;
|
||||
} catch (Exception e) {
|
||||
throw IOExceptionSupport.create(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -434,8 +438,10 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
|
|||
|
||||
@Override
|
||||
public void setBrokerService(BrokerService brokerService) {
|
||||
for (KahaDBPersistenceAdapter persistenceAdapter : adapters) {
|
||||
persistenceAdapter.setBrokerService(brokerService);
|
||||
for (PersistenceAdapter persistenceAdapter : adapters) {
|
||||
if( persistenceAdapter instanceof BrokerServiceAware ) {
|
||||
((BrokerServiceAware)persistenceAdapter).setBrokerService(getBrokerService());
|
||||
}
|
||||
}
|
||||
this.brokerService = brokerService;
|
||||
}
|
||||
|
|
|
@ -32,13 +32,7 @@ import org.apache.activemq.command.MessageAck;
|
|||
import org.apache.activemq.command.MessageId;
|
||||
import org.apache.activemq.command.TransactionId;
|
||||
import org.apache.activemq.command.XATransactionId;
|
||||
import org.apache.activemq.store.AbstractMessageStore;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.ProxyMessageStore;
|
||||
import org.apache.activemq.store.ProxyTopicMessageStore;
|
||||
import org.apache.activemq.store.TopicMessageStore;
|
||||
import org.apache.activemq.store.TransactionRecoveryListener;
|
||||
import org.apache.activemq.store.TransactionStore;
|
||||
import org.apache.activemq.store.*;
|
||||
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
|
||||
import org.apache.activemq.store.kahadb.data.KahaEntryType;
|
||||
import org.apache.activemq.store.kahadb.data.KahaPrepareCommand;
|
||||
|
@ -237,11 +231,11 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
|
|||
}
|
||||
|
||||
public void persistOutcome(Tx tx, TransactionId txid) throws IOException {
|
||||
tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
|
||||
tx.trackPrepareLocation(store(new KahaPrepareCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)))));
|
||||
}
|
||||
|
||||
public void persistCompletion(TransactionId txid) throws IOException {
|
||||
store(new KahaCommitCommand().setTransactionInfo(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid)));
|
||||
store(new KahaCommitCommand().setTransactionInfo(TransactionIdConversion.convert(multiKahaDBPersistenceAdapter.transactionIdTransformer.transform(txid))));
|
||||
}
|
||||
|
||||
private Location store(JournalCommand<?> data) throws IOException {
|
||||
|
@ -343,7 +337,7 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
|
|||
|
||||
public synchronized void recover(final TransactionRecoveryListener listener) throws IOException {
|
||||
|
||||
for (final KahaDBPersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
|
||||
for (final PersistenceAdapter adapter : multiKahaDBPersistenceAdapter.adapters) {
|
||||
adapter.createTransactionStore().recover(new TransactionRecoveryListener() {
|
||||
@Override
|
||||
public void recover(XATransactionId xid, Message[] addedMessages, MessageAck[] acks) {
|
||||
|
|
Loading…
Reference in New Issue