Renamed the kahaPersistentAdaptor to kahaPersistenceAdapter to be consistent with other persistence adapters. Makes configuration slightly more intuitive. :)

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@426461 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Adrian T. Co 2006-07-28 09:54:32 +00:00
parent c36ea1aa9b
commit 6e755a5cc5
12 changed files with 199 additions and 199 deletions

View File

@ -1,179 +1,179 @@
/** /**
* *
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0 * The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with * (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at * the License. You may obtain a copy of the License at
* *
* http://www.apache.org/licenses/LICENSE-2.0 * http://www.apache.org/licenses/LICENSE-2.0
* *
* Unless required by applicable law or agreed to in writing, software * Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, * distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.apache.activemq.store.kahadaptor; package org.apache.activemq.store.kahadaptor;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.Set; import java.util.Set;
import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory; import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller; import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageManager; import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter; import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore; import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/** /**
* @org.apache.xbean.XBean * @org.apache.xbean.XBean
* *
* @version $Revision: 1.4 $ * @version $Revision: 1.4 $
*/ */
public class KahaPersistentAdaptor implements PersistenceAdapter{ public class KahaPersistenceAdapter implements PersistenceAdapter{
private static final Log log=LogFactory.getLog(KahaPersistentAdaptor.class); private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions"; static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
KahaTransactionStore transactionStore; KahaTransactionStore transactionStore;
ConcurrentHashMap topics=new ConcurrentHashMap(); ConcurrentHashMap topics=new ConcurrentHashMap();
ConcurrentHashMap queues=new ConcurrentHashMap(); ConcurrentHashMap queues=new ConcurrentHashMap();
ConcurrentHashMap messageStores=new ConcurrentHashMap(); ConcurrentHashMap messageStores=new ConcurrentHashMap();
private boolean useExternalMessageReferences; private boolean useExternalMessageReferences;
private OpenWireFormat wireFormat=new OpenWireFormat(); private OpenWireFormat wireFormat=new OpenWireFormat();
Store store; Store store;
public KahaPersistentAdaptor(File dir) throws IOException{ public KahaPersistenceAdapter(File dir) throws IOException{
if(!dir.exists()){ if(!dir.exists()){
dir.mkdirs(); dir.mkdirs();
} }
String name=dir.getAbsolutePath()+File.separator+"kaha.db"; String name=dir.getAbsolutePath()+File.separator+"kaha.db";
store=StoreFactory.open(name,"rw"); store=StoreFactory.open(name,"rw");
} }
public Set getDestinations(){ public Set getDestinations(){
Set rc=new HashSet(); Set rc=new HashSet();
try { try {
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next(); Object obj=i.next();
if(obj instanceof ActiveMQDestination){ if(obj instanceof ActiveMQDestination){
rc.add(obj); rc.add(obj);
} }
} }
}catch(IOException e){ }catch(IOException e){
log.error("Failed to get destinations " ,e); log.error("Failed to get destinations " ,e);
} }
return rc; return rc;
} }
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
MessageStore rc=(MessageStore) queues.get(destination); MessageStore rc=(MessageStore) queues.get(destination);
if(rc==null){ if(rc==null){
rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination); rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
messageStores.put(destination, rc); messageStores.put(destination, rc);
if(transactionStore!=null){ if(transactionStore!=null){
rc=transactionStore.proxy(rc); rc=transactionStore.proxy(rc);
} }
queues.put(destination,rc); queues.put(destination,rc);
} }
return rc; return rc;
} }
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
TopicMessageStore rc=(TopicMessageStore) topics.get(destination); TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
if(rc==null){ if(rc==null){
MapContainer messageContainer=getMapContainer(destination,"topic-data"); MapContainer messageContainer=getMapContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs"); MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks"); MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks");
ackContainer.setKeyMarshaller(new StringMarshaller()); ackContainer.setKeyMarshaller(new StringMarshaller());
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller()); ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination); rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination, rc); messageStores.put(destination, rc);
if(transactionStore!=null){ if(transactionStore!=null){
rc=transactionStore.proxy(rc); rc=transactionStore.proxy(rc);
} }
topics.put(destination,rc); topics.put(destination,rc);
} }
return rc; return rc;
} }
protected MessageStore retrieveMessageStore(Object id){ protected MessageStore retrieveMessageStore(Object id){
MessageStore result = (MessageStore) messageStores.get(id); MessageStore result = (MessageStore) messageStores.get(id);
return result; return result;
} }
public TransactionStore createTransactionStore() throws IOException{ public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){ if(transactionStore==null){
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
container.setKeyMarshaller(new CommandMarshaller(wireFormat)); container.setKeyMarshaller(new CommandMarshaller(wireFormat));
container.setValueMarshaller(new TransactionMarshaller(wireFormat)); container.setValueMarshaller(new TransactionMarshaller(wireFormat));
container.load(); container.load();
transactionStore=new KahaTransactionStore(this,container); transactionStore=new KahaTransactionStore(this,container);
} }
return transactionStore; return transactionStore;
} }
public void beginTransaction(ConnectionContext context){} public void beginTransaction(ConnectionContext context){}
public void commitTransaction(ConnectionContext context) throws IOException{ public void commitTransaction(ConnectionContext context) throws IOException{
store.force(); store.force();
} }
public void rollbackTransaction(ConnectionContext context){} public void rollbackTransaction(ConnectionContext context){}
public void start() throws Exception{} public void start() throws Exception{}
public void stop() throws Exception{ public void stop() throws Exception{
store.close(); store.close();
} }
public long getLastMessageBrokerSequenceId() throws IOException{ public long getLastMessageBrokerSequenceId() throws IOException{
return 0; return 0;
} }
public void deleteAllMessages() throws IOException{ public void deleteAllMessages() throws IOException{
if(store!=null){ if(store!=null){
store.delete(); store.delete();
} }
} }
public boolean isUseExternalMessageReferences(){ public boolean isUseExternalMessageReferences(){
return useExternalMessageReferences; return useExternalMessageReferences;
} }
public void setUseExternalMessageReferences(boolean useExternalMessageReferences){ public void setUseExternalMessageReferences(boolean useExternalMessageReferences){
this.useExternalMessageReferences=useExternalMessageReferences; this.useExternalMessageReferences=useExternalMessageReferences;
} }
protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
MapContainer container=store.getMapContainer(id,containerName); MapContainer container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller()); container.setKeyMarshaller(new StringMarshaller());
if(useExternalMessageReferences){ if(useExternalMessageReferences){
container.setValueMarshaller(new StringMarshaller()); container.setValueMarshaller(new StringMarshaller());
}else{ }else{
container.setValueMarshaller(new CommandMarshaller(wireFormat)); container.setValueMarshaller(new CommandMarshaller(wireFormat));
} }
container.load(); container.load();
return container; return container;
} }
/** /**
* @param usageManager * @param usageManager
* The UsageManager that is controlling the broker's memory usage. * The UsageManager that is controlling the broker's memory usage.
*/ */
public void setUsageManager(UsageManager usageManager){} public void setUsageManager(UsageManager usageManager){}
} }

View File

@ -43,9 +43,9 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
public class KahaTransactionStore implements TransactionStore{ public class KahaTransactionStore implements TransactionStore{
private Map transactions=new ConcurrentHashMap(); private Map transactions=new ConcurrentHashMap();
private Map prepared; private Map prepared;
private KahaPersistentAdaptor adaptor; private KahaPersistenceAdapter adaptor;
KahaTransactionStore(KahaPersistentAdaptor adaptor,Map preparedMap){ KahaTransactionStore(KahaPersistenceAdapter adaptor,Map preparedMap){
this.adaptor=adaptor; this.adaptor=adaptor;
this.prepared=preparedMap; this.prepared=preparedMap;
} }

View File

@ -22,7 +22,7 @@ import java.net.URISyntaxException;
import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.JmsTopicTransactionTest; import org.apache.activemq.JmsTopicTransactionTest;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.test.JmsResourceProvider; import org.apache.activemq.test.JmsResourceProvider;
/** /**
* Test failover for Queues * Test failover for Queues
@ -39,7 +39,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{
// this will create the main (or master broker) // this will create the main (or master broker)
broker=createBroker(); broker=createBroker();
broker.start(); broker.start();
KahaPersistentAdaptor adaptor=new KahaPersistentAdaptor(new File("activemq-data/slave")); KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(new File("activemq-data/slave"));
slave = new BrokerService(); slave = new BrokerService();
slave.setBrokerName("slave"); slave.setBrokerName("slave");
slave.setPersistenceAdapter(adaptor); slave.setPersistenceAdapter(adaptor);
@ -66,7 +66,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{
protected BrokerService createBroker() throws Exception,URISyntaxException{ protected BrokerService createBroker() throws Exception,URISyntaxException{
BrokerService broker=new BrokerService(); BrokerService broker=new BrokerService();
broker.setBrokerName("master"); broker.setBrokerName("master");
KahaPersistentAdaptor adaptor=new KahaPersistentAdaptor(new File("activemq-data/master")); KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(new File("activemq-data/master"));
broker.setPersistenceAdapter(adaptor); broker.setPersistenceAdapter(adaptor);
broker.addConnector("tcp://localhost:62001"); broker.addConnector("tcp://localhost:62001");
broker.setDeleteAllMessagesOnStartup(true); broker.setDeleteAllMessagesOnStartup(true);

View File

@ -22,7 +22,7 @@ import junit.framework.Test;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.RecoveryBrokerTest; import org.apache.activemq.broker.RecoveryBrokerTest;
import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.xbean.BrokerFactoryBean; import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
@ -41,7 +41,7 @@ public class KahaRecoveryBrokerTest extends RecoveryBrokerTest {
protected BrokerService createRestartedBroker() throws Exception { protected BrokerService createRestartedBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db")); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db"));
broker.setPersistenceAdapter(adaptor); broker.setPersistenceAdapter(adaptor);
broker.addConnector("tcp://localhost:0"); broker.addConnector("tcp://localhost:0");
return broker; return broker;

View File

@ -22,7 +22,7 @@ import junit.framework.Test;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.XARecoveryBrokerTest; import org.apache.activemq.broker.XARecoveryBrokerTest;
import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
import org.apache.activemq.xbean.BrokerFactoryBean; import org.apache.activemq.xbean.BrokerFactoryBean;
import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.ClassPathResource;
@ -50,7 +50,7 @@ public class KahaXARecoveryBrokerTest extends XARecoveryBrokerTest {
protected BrokerService createRestartedBroker() throws Exception { protected BrokerService createRestartedBroker() throws Exception {
BrokerService broker = new BrokerService(); BrokerService broker = new BrokerService();
KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db")); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db"));
broker.setPersistenceAdapter(adaptor); broker.setPersistenceAdapter(adaptor);
broker.addConnector("tcp://localhost:0"); broker.addConnector("tcp://localhost:0");
return broker; return broker;

View File

@ -19,7 +19,7 @@ package org.apache.activemq.perf;
import java.io.File; import java.io.File;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
/** /**
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
@ -37,7 +37,7 @@ public class KahaDurableTopicTest extends SimpleDurableTopicTest {
*/ */
protected void configureBroker(BrokerService answer) throws Exception{ protected void configureBroker(BrokerService answer) throws Exception{
KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File("activemq-data/perfTest")); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/perfTest"));
answer.setPersistenceAdapter(adaptor); answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
@ -47,4 +47,4 @@ public class KahaDurableTopicTest extends SimpleDurableTopicTest {
} }

View File

@ -22,7 +22,7 @@ import javax.jms.Destination;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.Session; import javax.jms.Session;
import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter;
/** /**
* @version $Revision: 1.3 $ * @version $Revision: 1.3 $
*/ */
@ -30,10 +30,10 @@ public class KahaQueueTest extends SimpleQueueTest{
protected void configureBroker(BrokerService answer) throws Exception{ protected void configureBroker(BrokerService answer) throws Exception{
KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File("activemq-data/perfTest")); KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/perfTest"));
answer.setPersistenceAdapter(adaptor); answer.setPersistenceAdapter(adaptor);
answer.addConnector(bindAddress); answer.addConnector(bindAddress);
answer.setDeleteAllMessagesOnStartup(true); answer.setDeleteAllMessagesOnStartup(true);
} }
} }

View File

@ -24,7 +24,7 @@
</transportConnectors> </transportConnectors>
<persistenceAdapter> <persistenceAdapter>
<kahaPersistentAdaptor dir = "${basedir}/target/activemq-data/master"/> <kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/master"/>
</persistenceAdapter> </persistenceAdapter>
</broker> </broker>

View File

@ -26,7 +26,7 @@
<persistenceAdapter> <persistenceAdapter>
<kahaPersistentAdaptor dir = "${basedir}/target/activemq-data/slave"/> <kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/slave"/>
</persistenceAdapter> </persistenceAdapter>
</broker> </broker>

View File

@ -24,7 +24,7 @@
</transportConnectors> </transportConnectors>
<persistenceAdapter> <persistenceAdapter>
<kahaPersistentAdaptor dir = "${basedir}/target/activemq-data/kaha-broker.db"/> <kahaPersistenceAdapter dir = "${basedir}/target/activemq-data/kaha-broker.db"/>
</persistenceAdapter> </persistenceAdapter>
</broker> </broker>

View File

@ -23,7 +23,7 @@
<transportConnector uri="tcp://localhost:61616"/> <transportConnector uri="tcp://localhost:61616"/>
</transportConnectors> </transportConnectors>
<persistenceAdapter> <persistenceAdapter>
<kahaPersistentAdaptor dir = "activemq-data"/> <kahaPersistenceAdapter dir = "activemq-data"/>
</persistenceAdapter> </persistenceAdapter>
</broker> </broker>

View File

@ -3,7 +3,7 @@
<broker useJmx="false" brokerName="kahaBroker" start="false" persistent="true" useShutdownHook="false" deleteAllMessagesOnStartup="true" advisorySupport="false"> <broker useJmx="false" brokerName="kahaBroker" start="false" persistent="true" useShutdownHook="false" deleteAllMessagesOnStartup="true" advisorySupport="false">
<persistenceAdapter> <persistenceAdapter>
<kahaPersistentAdaptor dir="target/kaha-data"/> <kahaPersistenceAdapter dir="target/kaha-data"/>
</persistenceAdapter> </persistenceAdapter>
<transportConnectors> <transportConnectors>