diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java old mode 100755 new mode 100644 similarity index 95% rename from activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java rename to activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index 42677a7c53..5c3b913270 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -1,179 +1,179 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.store.kahadaptor; - -import java.io.File; -import java.io.IOException; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Set; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.kaha.MapContainer; -import org.apache.activemq.kaha.Store; -import org.apache.activemq.kaha.StoreFactory; -import org.apache.activemq.kaha.StringMarshaller; -import org.apache.activemq.memory.UsageManager; -import org.apache.activemq.openwire.OpenWireFormat; -import org.apache.activemq.store.MessageStore; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.TopicMessageStore; -import org.apache.activemq.store.TransactionStore; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; -/** - * @org.apache.xbean.XBean - * - * @version $Revision: 1.4 $ - */ -public class KahaPersistentAdaptor implements PersistenceAdapter{ - private static final Log log=LogFactory.getLog(KahaPersistentAdaptor.class); - static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions"; - KahaTransactionStore transactionStore; - ConcurrentHashMap topics=new ConcurrentHashMap(); - ConcurrentHashMap queues=new ConcurrentHashMap(); - ConcurrentHashMap messageStores=new ConcurrentHashMap(); - private boolean useExternalMessageReferences; - private OpenWireFormat wireFormat=new OpenWireFormat(); - Store store; - - public KahaPersistentAdaptor(File dir) throws IOException{ - if(!dir.exists()){ - dir.mkdirs(); - } - String name=dir.getAbsolutePath()+File.separator+"kaha.db"; - store=StoreFactory.open(name,"rw"); - } - - public Set getDestinations(){ - - Set rc=new HashSet(); - try { - for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ - Object obj=i.next(); - if(obj instanceof ActiveMQDestination){ - rc.add(obj); - } - } - }catch(IOException e){ - log.error("Failed to get destinations " ,e); - } - return rc; - } - - public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ - MessageStore rc=(MessageStore) queues.get(destination); - if(rc==null){ - rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination); - messageStores.put(destination, rc); - if(transactionStore!=null){ - rc=transactionStore.proxy(rc); - } - queues.put(destination,rc); - } - return rc; - } - - public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ - TopicMessageStore rc=(TopicMessageStore) topics.get(destination); - if(rc==null){ - MapContainer messageContainer=getMapContainer(destination,"topic-data"); - MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs"); - MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks"); - ackContainer.setKeyMarshaller(new StringMarshaller()); - ackContainer.setValueMarshaller(new AtomicIntegerMarshaller()); - rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination); - messageStores.put(destination, rc); - if(transactionStore!=null){ - rc=transactionStore.proxy(rc); - } - topics.put(destination,rc); - - } - return rc; - } - - protected MessageStore retrieveMessageStore(Object id){ - MessageStore result = (MessageStore) messageStores.get(id); - return result; - } - - public TransactionStore createTransactionStore() throws IOException{ - if(transactionStore==null){ - MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); - container.setKeyMarshaller(new CommandMarshaller(wireFormat)); - container.setValueMarshaller(new TransactionMarshaller(wireFormat)); - container.load(); - transactionStore=new KahaTransactionStore(this,container); - } - return transactionStore; - } - - public void beginTransaction(ConnectionContext context){} - - public void commitTransaction(ConnectionContext context) throws IOException{ - store.force(); - } - - public void rollbackTransaction(ConnectionContext context){} - - public void start() throws Exception{} - - public void stop() throws Exception{ - store.close(); - } - - public long getLastMessageBrokerSequenceId() throws IOException{ - return 0; - } - - public void deleteAllMessages() throws IOException{ - if(store!=null){ - store.delete(); - } - } - - public boolean isUseExternalMessageReferences(){ - return useExternalMessageReferences; - } - - public void setUseExternalMessageReferences(boolean useExternalMessageReferences){ - this.useExternalMessageReferences=useExternalMessageReferences; - } - - protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ - MapContainer container=store.getMapContainer(id,containerName); - container.setKeyMarshaller(new StringMarshaller()); - if(useExternalMessageReferences){ - container.setValueMarshaller(new StringMarshaller()); - }else{ - container.setValueMarshaller(new CommandMarshaller(wireFormat)); - } - container.load(); - return container; - } - - /** - * @param usageManager - * The UsageManager that is controlling the broker's memory usage. - */ - public void setUsageManager(UsageManager usageManager){} -} +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadaptor; + +import java.io.File; +import java.io.IOException; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Set; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.kaha.MapContainer; +import org.apache.activemq.kaha.Store; +import org.apache.activemq.kaha.StoreFactory; +import org.apache.activemq.kaha.StringMarshaller; +import org.apache.activemq.memory.UsageManager; +import org.apache.activemq.openwire.OpenWireFormat; +import org.apache.activemq.store.MessageStore; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.TopicMessageStore; +import org.apache.activemq.store.TransactionStore; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; +/** + * @org.apache.xbean.XBean + * + * @version $Revision: 1.4 $ + */ +public class KahaPersistenceAdapter implements PersistenceAdapter{ + private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class); + static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions"; + KahaTransactionStore transactionStore; + ConcurrentHashMap topics=new ConcurrentHashMap(); + ConcurrentHashMap queues=new ConcurrentHashMap(); + ConcurrentHashMap messageStores=new ConcurrentHashMap(); + private boolean useExternalMessageReferences; + private OpenWireFormat wireFormat=new OpenWireFormat(); + Store store; + + public KahaPersistenceAdapter(File dir) throws IOException{ + if(!dir.exists()){ + dir.mkdirs(); + } + String name=dir.getAbsolutePath()+File.separator+"kaha.db"; + store=StoreFactory.open(name,"rw"); + } + + public Set getDestinations(){ + + Set rc=new HashSet(); + try { + for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){ + Object obj=i.next(); + if(obj instanceof ActiveMQDestination){ + rc.add(obj); + } + } + }catch(IOException e){ + log.error("Failed to get destinations " ,e); + } + return rc; + } + + public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ + MessageStore rc=(MessageStore) queues.get(destination); + if(rc==null){ + rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination); + messageStores.put(destination, rc); + if(transactionStore!=null){ + rc=transactionStore.proxy(rc); + } + queues.put(destination,rc); + } + return rc; + } + + public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ + TopicMessageStore rc=(TopicMessageStore) topics.get(destination); + if(rc==null){ + MapContainer messageContainer=getMapContainer(destination,"topic-data"); + MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs"); + MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks"); + ackContainer.setKeyMarshaller(new StringMarshaller()); + ackContainer.setValueMarshaller(new AtomicIntegerMarshaller()); + rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination); + messageStores.put(destination, rc); + if(transactionStore!=null){ + rc=transactionStore.proxy(rc); + } + topics.put(destination,rc); + + } + return rc; + } + + protected MessageStore retrieveMessageStore(Object id){ + MessageStore result = (MessageStore) messageStores.get(id); + return result; + } + + public TransactionStore createTransactionStore() throws IOException{ + if(transactionStore==null){ + MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); + container.setKeyMarshaller(new CommandMarshaller(wireFormat)); + container.setValueMarshaller(new TransactionMarshaller(wireFormat)); + container.load(); + transactionStore=new KahaTransactionStore(this,container); + } + return transactionStore; + } + + public void beginTransaction(ConnectionContext context){} + + public void commitTransaction(ConnectionContext context) throws IOException{ + store.force(); + } + + public void rollbackTransaction(ConnectionContext context){} + + public void start() throws Exception{} + + public void stop() throws Exception{ + store.close(); + } + + public long getLastMessageBrokerSequenceId() throws IOException{ + return 0; + } + + public void deleteAllMessages() throws IOException{ + if(store!=null){ + store.delete(); + } + } + + public boolean isUseExternalMessageReferences(){ + return useExternalMessageReferences; + } + + public void setUseExternalMessageReferences(boolean useExternalMessageReferences){ + this.useExternalMessageReferences=useExternalMessageReferences; + } + + protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ + MapContainer container=store.getMapContainer(id,containerName); + container.setKeyMarshaller(new StringMarshaller()); + if(useExternalMessageReferences){ + container.setValueMarshaller(new StringMarshaller()); + }else{ + container.setValueMarshaller(new CommandMarshaller(wireFormat)); + } + container.load(); + return container; + } + + /** + * @param usageManager + * The UsageManager that is controlling the broker's memory usage. + */ + public void setUsageManager(UsageManager usageManager){} +} diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java index c72a471d2d..145ca01f73 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTransactionStore.java @@ -43,9 +43,9 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; public class KahaTransactionStore implements TransactionStore{ private Map transactions=new ConcurrentHashMap(); private Map prepared; - private KahaPersistentAdaptor adaptor; + private KahaPersistenceAdapter adaptor; - KahaTransactionStore(KahaPersistentAdaptor adaptor,Map preparedMap){ + KahaTransactionStore(KahaPersistenceAdapter adaptor,Map preparedMap){ this.adaptor=adaptor; this.prepared=preparedMap; } diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java index 7792383d47..f0d6ce3e4f 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/ft/TransactedTopicMasterSlaveTest.java @@ -22,7 +22,7 @@ import java.net.URISyntaxException; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.JmsTopicTransactionTest; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; import org.apache.activemq.test.JmsResourceProvider; /** * Test failover for Queues @@ -39,7 +39,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{ // this will create the main (or master broker) broker=createBroker(); broker.start(); - KahaPersistentAdaptor adaptor=new KahaPersistentAdaptor(new File("activemq-data/slave")); + KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(new File("activemq-data/slave")); slave = new BrokerService(); slave.setBrokerName("slave"); slave.setPersistenceAdapter(adaptor); @@ -66,7 +66,7 @@ public class TransactedTopicMasterSlaveTest extends JmsTopicTransactionTest{ protected BrokerService createBroker() throws Exception,URISyntaxException{ BrokerService broker=new BrokerService(); broker.setBrokerName("master"); - KahaPersistentAdaptor adaptor=new KahaPersistentAdaptor(new File("activemq-data/master")); + KahaPersistenceAdapter adaptor=new KahaPersistenceAdapter(new File("activemq-data/master")); broker.setPersistenceAdapter(adaptor); broker.addConnector("tcp://localhost:62001"); broker.setDeleteAllMessagesOnStartup(true); diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaRecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaRecoveryBrokerTest.java index 70cfde766a..b60c015e2e 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaRecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaRecoveryBrokerTest.java @@ -22,7 +22,7 @@ import junit.framework.Test; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.RecoveryBrokerTest; -import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; import org.apache.activemq.xbean.BrokerFactoryBean; import org.springframework.core.io.ClassPathResource; @@ -41,7 +41,7 @@ public class KahaRecoveryBrokerTest extends RecoveryBrokerTest { protected BrokerService createRestartedBroker() throws Exception { BrokerService broker = new BrokerService(); - KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db")); + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db")); broker.setPersistenceAdapter(adaptor); broker.addConnector("tcp://localhost:0"); return broker; diff --git a/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java b/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java index 6b3ae3cd70..9e70f278af 100644 --- a/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/broker/store/KahaXARecoveryBrokerTest.java @@ -22,7 +22,7 @@ import junit.framework.Test; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.XARecoveryBrokerTest; -import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; import org.apache.activemq.xbean.BrokerFactoryBean; import org.springframework.core.io.ClassPathResource; @@ -50,7 +50,7 @@ public class KahaXARecoveryBrokerTest extends XARecoveryBrokerTest { protected BrokerService createRestartedBroker() throws Exception { BrokerService broker = new BrokerService(); - KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db")); + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File( System.getProperty("basedir", ".")+"/target/activemq-data/kaha-store.db")); broker.setPersistenceAdapter(adaptor); broker.addConnector("tcp://localhost:0"); return broker; diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDurableTopicTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDurableTopicTest.java index 5a649b945c..99535b2c09 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/KahaDurableTopicTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/KahaDurableTopicTest.java @@ -19,7 +19,7 @@ package org.apache.activemq.perf; import java.io.File; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; /** * @version $Revision: 1.3 $ */ @@ -37,7 +37,7 @@ public class KahaDurableTopicTest extends SimpleDurableTopicTest { */ protected void configureBroker(BrokerService answer) throws Exception{ - KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File("activemq-data/perfTest")); + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/perfTest")); answer.setPersistenceAdapter(adaptor); answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); @@ -47,4 +47,4 @@ public class KahaDurableTopicTest extends SimpleDurableTopicTest { -} \ No newline at end of file +} diff --git a/activemq-core/src/test/java/org/apache/activemq/perf/KahaQueueTest.java b/activemq-core/src/test/java/org/apache/activemq/perf/KahaQueueTest.java index 32fcb1c63a..b0384bcc48 100644 --- a/activemq-core/src/test/java/org/apache/activemq/perf/KahaQueueTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/perf/KahaQueueTest.java @@ -22,7 +22,7 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Session; import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.store.kahadaptor.KahaPersistentAdaptor; +import org.apache.activemq.store.kahadaptor.KahaPersistenceAdapter; /** * @version $Revision: 1.3 $ */ @@ -30,10 +30,10 @@ public class KahaQueueTest extends SimpleQueueTest{ protected void configureBroker(BrokerService answer) throws Exception{ - KahaPersistentAdaptor adaptor = new KahaPersistentAdaptor(new File("activemq-data/perfTest")); + KahaPersistenceAdapter adaptor = new KahaPersistenceAdapter(new File("activemq-data/perfTest")); answer.setPersistenceAdapter(adaptor); answer.addConnector(bindAddress); answer.setDeleteAllMessagesOnStartup(true); } -} \ No newline at end of file +} diff --git a/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml b/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml index ca8fc7b16d..c5abf58c17 100755 --- a/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/broker/ft/master.xml @@ -24,7 +24,7 @@ - + diff --git a/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml b/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml index 5ae5687b5b..fd75a4ef43 100755 --- a/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/broker/ft/slave.xml @@ -26,7 +26,7 @@ - + diff --git a/activemq-core/src/test/resources/org/apache/activemq/broker/store/kahabroker.xml b/activemq-core/src/test/resources/org/apache/activemq/broker/store/kahabroker.xml index 161a8cbb5b..3febcf323c 100755 --- a/activemq-core/src/test/resources/org/apache/activemq/broker/store/kahabroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/broker/store/kahabroker.xml @@ -24,7 +24,7 @@ - + diff --git a/activemq-core/src/test/resources/org/apache/activemq/perf/kahaBroker.xml b/activemq-core/src/test/resources/org/apache/activemq/perf/kahaBroker.xml index 551217ef4f..97e327b830 100644 --- a/activemq-core/src/test/resources/org/apache/activemq/perf/kahaBroker.xml +++ b/activemq-core/src/test/resources/org/apache/activemq/perf/kahaBroker.xml @@ -23,7 +23,7 @@ - + diff --git a/activemq-perftest/src/main/resources/broker-conf/kaha.xml b/activemq-perftest/src/main/resources/broker-conf/kaha.xml index fe8cd25cd6..230569329b 100644 --- a/activemq-perftest/src/main/resources/broker-conf/kaha.xml +++ b/activemq-perftest/src/main/resources/broker-conf/kaha.xml @@ -3,7 +3,7 @@ - +