diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java index 36e7a2e850..26507558b4 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java @@ -2,36 +2,36 @@ package org.apache.activemq.kaha; import java.io.IOException; import java.util.Set; - /** -* A Store is holds persistent containers -* -* @version $Revision: 1.2 $ -*/ + * A Store is holds persistent containers + * + * @version $Revision: 1.2 $ + */ public interface Store{ - /** * close the store + * * @throws IOException */ public void close() throws IOException; - - + /** * Force all writes to disk + * * @throws IOException */ public void force() throws IOException; - + /** * empty all the contents of the store + * * @throws IOException */ public void clear() throws IOException; - - + /** * delete the store + * * @return true if the delete was successful * @throws IOException */ @@ -39,22 +39,35 @@ public interface Store{ /** * Checks if a MapContainer exists + * * @param id * @return new MapContainer - * @throws IOException + * @throws IOException */ public boolean doesMapContainerExist(Object id) throws IOException; /** * Get a MapContainer with the given id - the MapContainer is created if needed + * * @param id * @return container for the associated id or null if it doesn't exist - * @throws IOException + * @throws IOException */ - public MapContainer getMapContainer(Object id) throws IOException; + public MapContainer getMapContainer(Object id) throws IOException; + + /** + * Get a MapContainer with the given id - the MapContainer is created if needed + * + * @param id + * @param containerName + * @return container for the associated id or null if it doesn't exist + * @throws IOException + */ + public MapContainer getMapContainer(Object id,String containerName) throws IOException; /** * delete a container + * * @param id * @throws IOException */ @@ -62,40 +75,53 @@ public interface Store{ /** * Get a Set of call MapContainer Ids + * * @return the set of ids - * @throws IOException + * @throws IOException */ public Set getMapContainerIds() throws IOException; - + /** * Checks if a ListContainer exists + * * @param id * @return new MapContainer - * @throws IOException + * @throws IOException */ public boolean doesListContainerExist(Object id) throws IOException; - /** - * Get a ListContainer with the given id and creates it if it doesn't exist - * @param id - * @return container for the associated id or null if it doesn't exist - * @throws IOException - */ - public ListContainer getListContainer(Object id) throws IOException; + /** + * Get a ListContainer with the given id and creates it if it doesn't exist + * + * @param id + * @return container for the associated id or null if it doesn't exist + * @throws IOException + */ + public ListContainer getListContainer(Object id) throws IOException; - /** - * delete a ListContainer - * @param id - * @throws IOException - */ - public void deleteListContainer(Object id) throws IOException; + /** + * Get a ListContainer with the given id and creates it if it doesn't exist + * + * @param id + * @param containerName + * @return container for the associated id or null if it doesn't exist + * @throws IOException + */ + public ListContainer getListContainer(Object id,String containerName) throws IOException; - /** - * Get a Set of call ListContainer Ids - * @return the set of ids - * @throws IOException - */ - public Set getListContainerIds() throws IOException; - - + /** + * delete a ListContainer + * + * @param id + * @throws IOException + */ + public void deleteListContainer(Object id) throws IOException; + + /** + * Get a Set of call ListContainer Ids + * + * @return the set of ids + * @throws IOException + */ + public Set getListContainerIds() throws IOException; } \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java index 510f2759a8..c07783d879 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/BaseContainerImpl.java @@ -30,18 +30,22 @@ public abstract class BaseContainerImpl{ protected IndexLinkedList list; protected IndexManager indexManager; protected DataManager dataManager; - protected Object id; + protected ContainerId containerId; protected boolean loaded=false; protected boolean closed=false; protected final Object mutex=new Object(); - protected BaseContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){ - this.id=id; + protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager){ + this.containerId=id; this.root=root; this.indexManager=indexManager; this.dataManager=dataManager; this.list=new IndexLinkedList(root); } + + ContainerId getContainerId(){ + return containerId; + } public abstract void unload(); @@ -81,7 +85,7 @@ public abstract class BaseContainerImpl{ */ public final Object getId(){ checkClosed(); - return id; + return containerId.getKey(); } protected final void expressDataInterest() throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java new file mode 100644 index 0000000000..6915fb3961 --- /dev/null +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ContainerId.java @@ -0,0 +1,80 @@ +/** + * + * Copyright 2005-2006 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.activemq.kaha.impl; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +/** + * Used by RootContainers + * + * @version $Revision: 1.1.1.1 $ + */ +public class ContainerId implements Externalizable{ + private static final long serialVersionUID=-8883779541021821943L; + private Object key; + private String dataContainerPrefix; + + /** + * @return Returns the dataContainerPrefix. + */ + public String getDataContainerPrefix(){ + return dataContainerPrefix; + } + + /** + * @param dataContainerPrefix The dataContainerPrefix to set. + */ + public void setDataContainerPrefix(String dataContainerPrefix){ + this.dataContainerPrefix=dataContainerPrefix; + } + + /** + * @return Returns the key. + */ + public Object getKey(){ + return key; + } + + /** + * @param key The key to set. + */ + public void setKey(Object key){ + this.key=key; + } + + public int hashCode(){ + return key.hashCode(); + } + + public boolean equals(Object obj){ + boolean result = false; + if (obj != null && obj instanceof ContainerId){ + ContainerId other = (ContainerId) obj; + result = other.key.equals(this.key); + } + return result; + } + + public void writeExternal(ObjectOutput out) throws IOException{ + out.writeUTF(getDataContainerPrefix()); + out.writeObject(key); + } + + public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{ + dataContainerPrefix=in.readUTF(); + key=in.readObject(); + } +} \ No newline at end of file diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java index 1733e7d2cb..337fedd809 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/DataManager.java @@ -65,6 +65,10 @@ final class DataManager{ } } } + + public String getPrefix(){ + return prefix; + } DataFile findSpaceForData(DataItem item) throws IOException{ if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>MAX_FILE_LENGTH)){ @@ -168,7 +172,7 @@ final class DataManager{ DataFile dataFile=(DataFile) purgeList.get(i); fileMap.remove(dataFile.getNumber()); boolean result=dataFile.delete(); - log.info("discarding data file "+dataFile+(result?"successful ":"failed")); + log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); } } @@ -183,6 +187,6 @@ final class DataManager{ private void removeDataFile(DataFile dataFile) throws IOException{ fileMap.remove(dataFile.getNumber()); boolean result=dataFile.delete(); - log.info("discarding data file "+dataFile+(result?"successful ":"failed")); + log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java index d944261e54..36189806de 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java @@ -34,13 +34,16 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap; * @version $Revision: 1.1.1.1 $ */ public class KahaStore implements Store{ - DataManager rootData; - DataManager containersData; - IndexManager indexManager; + private static final String DEFAULT_CONTAINER_NAME = "data-container"; + private File directory; + private DataManager rootData; + private DataManager defaultContainerManager; + private IndexManager indexManager; private IndexRootContainer mapsContainer; private IndexRootContainer listsContainer; private Map lists=new ConcurrentHashMap(); private Map maps=new ConcurrentHashMap(); + private Map dataManagers = new ConcurrentHashMap(); private boolean closed=false; private String name; private String mode; @@ -58,7 +61,7 @@ public class KahaStore implements Store{ if(initialized){ indexManager.close(); rootData.close(); - containersData.close(); + defaultContainerManager.close(); } } } @@ -67,7 +70,7 @@ public class KahaStore implements Store{ if(initialized){ indexManager.force(); rootData.force(); - containersData.force(); + defaultContainerManager.force(); } } @@ -90,7 +93,7 @@ public class KahaStore implements Store{ clear(); boolean result=indexManager.delete(); result&=rootData.delete(); - result&=containersData.delete(); + result&=defaultContainerManager.delete(); initialized=false; return result; } @@ -101,22 +104,31 @@ public class KahaStore implements Store{ } public MapContainer getMapContainer(Object id) throws IOException{ + return getMapContainer(id, DEFAULT_CONTAINER_NAME); + } + + public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{ initialize(); + MapContainer result=(MapContainer) maps.get(id); if(result==null){ - IndexItem root=mapsContainer.addRoot(id); - result=new MapContainerImpl(id,root,indexManager,containersData); - maps.put(id,result); + DataManager dm = getDataManager(dataContainerName); + ContainerId containerId = new ContainerId(); + containerId.setKey(id); + containerId.setDataContainerPrefix(dataContainerName); + IndexItem root=mapsContainer.addRoot(containerId); + result=new MapContainerImpl(containerId,root,indexManager,dm); + maps.put(containerId.getKey(),result); } return result; } public void deleteMapContainer(Object id) throws IOException{ initialize(); - MapContainer container=(MapContainer) maps.remove(id); + MapContainerImpl container=(MapContainerImpl) maps.remove(id); if(container!=null){ container.clear(); - mapsContainer.removeRoot(id); + mapsContainer.removeRoot(container.getContainerId()); } } @@ -131,22 +143,31 @@ public class KahaStore implements Store{ } public ListContainer getListContainer(Object id) throws IOException{ + return getListContainer(id,DEFAULT_CONTAINER_NAME); + } + + public synchronized ListContainer getListContainer(Object id, String dataContainerName) throws IOException{ initialize(); + ListContainer result=(ListContainer) lists.get(id); if(result==null){ - IndexItem root=listsContainer.addRoot(id); - result=new ListContainerImpl(id,root,indexManager,containersData); - lists.put(id,result); + DataManager dm = getDataManager(dataContainerName); + ContainerId containerId = new ContainerId(); + containerId.setKey(id); + containerId.setDataContainerPrefix(dataContainerName); + IndexItem root=listsContainer.addRoot(containerId); + result=new ListContainerImpl(containerId,root,indexManager,dm); + lists.put(containerId.getKey(),result); } return result; } public void deleteListContainer(Object id) throws IOException{ initialize(); - ListContainer container=(ListContainer) lists.remove(id); + ListContainerImpl container=(ListContainerImpl) lists.remove(id); if(container!=null){ container.clear(); - listsContainer.removeRoot(id); + listsContainer.removeRoot(container.getContainerId()); } } @@ -164,12 +185,13 @@ public class KahaStore implements Store{ protected synchronized void initialize() throws IOException{ if(!initialized){ initialized=true; - File dir=new File(name); - dir.mkdirs(); - File ifile=new File(dir,"kaha.idx"); + directory=new File(name); + directory.mkdirs(); + File ifile=new File(directory,"kaha.idx"); indexManager=new IndexManager(ifile,mode); - rootData=new DataManager(dir,"roots-data"); - containersData=new DataManager(dir,"containers-data"); + rootData=new DataManager(directory,"roots-data"); + defaultContainerManager=new DataManager(directory,DEFAULT_CONTAINER_NAME); + dataManagers.put(DEFAULT_CONTAINER_NAME, defaultContainerManager); IndexItem mapRoot=new IndexItem(); IndexItem listRoot=new IndexItem(); if(indexManager.isEmpty()){ @@ -186,20 +208,34 @@ public class KahaStore implements Store{ listsContainer=new IndexRootContainer(listRoot,indexManager,rootData); rootData.consolidateDataFiles(); for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){ - Object key=i.next(); + ContainerId key=(ContainerId) i.next(); + DataManager dm = getDataManager(key.getDataContainerPrefix()); IndexItem root=mapsContainer.getRoot(key); - BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,containersData); + BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,dm); container.expressDataInterest(); - maps.put(key,container); + maps.put(key.getKey(),container); } for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){ - Object key=i.next(); + ContainerId key=(ContainerId) i.next(); + DataManager dm = getDataManager(key.getDataContainerPrefix()); IndexItem root=listsContainer.getRoot(key); - BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,containersData); + BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,dm); container.expressDataInterest(); - lists.put(key,container); + lists.put(key.getKey(),container); + } + for (Iterator i = dataManagers.values().iterator(); i.hasNext();){ + DataManager dm = (DataManager) i.next(); + dm.consolidateDataFiles(); } - containersData.consolidateDataFiles(); } } + + protected DataManager getDataManager(String prefix){ + DataManager dm = (DataManager) dataManagers.get(prefix); + if (dm == null){ + dm = new DataManager(directory,prefix); + dataManagers.put(prefix,dm); + } + return dm; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java index b26a461e8f..3f5e6e1fdf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/ListContainerImpl.java @@ -34,7 +34,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer private static final Log log=LogFactory.getLog(ListContainerImpl.class); protected Marshaller marshaller=new ObjectMarshaller(); - protected ListContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager) + protected ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager) throws IOException{ super(id,root,indexManager,dataManager); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java index 5cc836a71a..455728dd6c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/MapContainerImpl.java @@ -39,7 +39,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{ protected Marshaller keyMarshaller=new ObjectMarshaller(); protected Marshaller valueMarshaller=new ObjectMarshaller(); - protected MapContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){ + protected MapContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager){ super(id,root,indexManager,dataManager); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java index 083ab4e5ca..b3dde6cb91 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistentAdaptor.java @@ -78,7 +78,7 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{ public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{ MessageStore rc=(MessageStore) queues.get(destination); if(rc==null){ - rc=new KahaMessageStore(getMapContainer(destination),destination); + rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination); messageStores.put(destination, rc); if(transactionStore!=null){ rc=transactionStore.proxy(rc); @@ -91,9 +91,9 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{ public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{ TopicMessageStore rc=(TopicMessageStore) topics.get(destination); if(rc==null){ - MapContainer messageContainer=getMapContainer(destination); - MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions"); - MapContainer ackContainer=store.getMapContainer(destination.toString()+"-Acks"); + MapContainer messageContainer=getMapContainer(destination,"topic-data"); + MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs"); + MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks"); ackContainer.setKeyMarshaller(new StringMarshaller()); ackContainer.setValueMarshaller(new AtomicIntegerMarshaller()); rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination); @@ -114,7 +114,7 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{ public TransactionStore createTransactionStore() throws IOException{ if(transactionStore==null){ - MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME); + MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions"); container.setKeyMarshaller(new CommandMarshaller(wireFormat)); container.setValueMarshaller(new TransactionMarshaller(wireFormat)); container.load(); @@ -155,8 +155,8 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{ this.useExternalMessageReferences=useExternalMessageReferences; } - protected MapContainer getMapContainer(Object id) throws IOException{ - MapContainer container=store.getMapContainer(id); + protected MapContainer getMapContainer(Object id,String containerName) throws IOException{ + MapContainer container=store.getMapContainer(id,containerName); container.setKeyMarshaller(new StringMarshaller()); if(useExternalMessageReferences){ container.setValueMarshaller(new StringMarshaller()); diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index 5dd6044daf..c7c29537a1 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -59,7 +59,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess ackContainer.put(id,new AtomicInteger(subscriberCount)); for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){ Object key=i.next(); - ListContainer container=store.getListContainer(key); + ListContainer container=store.getListContainer(key,"durable-subs"); container.add(id); } super.addMessage(context,message); @@ -163,7 +163,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess } protected void addSubscriberAckContainer(Object key) throws IOException{ - ListContainer container=store.getListContainer(key); + ListContainer container=store.getListContainer(key,"topic-subs"); Marshaller marshaller=new StringMarshaller(); container.setMarshaller(marshaller); subscriberAcks.put(key,container);