Changed the KahaStore so that it works with set of data and index managers.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@409300 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-05-25 04:30:46 +00:00
parent aa026f1f6a
commit 129f879607
1 changed files with 81 additions and 33 deletions

View File

@ -23,10 +23,12 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Optimized Store writer
@ -34,16 +36,20 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
* @version $Revision: 1.1.1.1 $
*/
public class KahaStore implements Store{
private static final String DEFAULT_CONTAINER_NAME = "data-container";
private static final String DEFAULT_DATA_CONTAINER_NAME = "kaha-data.";
private static final String DEFAULT_INDEX_CONTAINER_NAME = "kaha-index.";
private File directory;
private DataManager rootData;
private DataManager defaultContainerManager;
private IndexManager indexManager;
private IndexRootContainer mapsContainer;
private IndexRootContainer listsContainer;
private Map lists=new ConcurrentHashMap();
private Map maps=new ConcurrentHashMap();
private Map dataManagers = new ConcurrentHashMap();
private Map indexManagers = new ConcurrentHashMap();
private boolean closed=false;
private String name;
private String mode;
@ -59,18 +65,37 @@ public class KahaStore implements Store{
if(!closed){
closed=true;
if(initialized){
indexManager.close();
rootData.close();
defaultContainerManager.close();
// indexManager.close();
for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = (IndexManager) iter.next();
im.close();
iter.remove();
}
for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = (DataManager) iter.next();
dm.close();
iter.remove();
}
}
}
}
public synchronized void force() throws IOException{
if(initialized){
indexManager.force();
rootData.force();
defaultContainerManager.force();
// indexManager.force();
for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = (IndexManager) iter.next();
im.force();
}
for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = (DataManager) iter.next();
dm.force();
}
}
}
@ -91,9 +116,20 @@ public class KahaStore implements Store{
public synchronized boolean delete() throws IOException{
initialize();
clear();
boolean result=indexManager.delete();
result&=rootData.delete();
result&=defaultContainerManager.delete();
boolean result=true; //indexManager.delete();
for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = (IndexManager) iter.next();
result &= im.delete();
iter.remove();
}
for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = (DataManager) iter.next();
result &= dm.delete();
iter.remove();
}
initialized=false;
return result;
}
@ -104,7 +140,7 @@ public class KahaStore implements Store{
}
public MapContainer getMapContainer(Object id) throws IOException{
return getMapContainer(id, DEFAULT_CONTAINER_NAME);
return getMapContainer(id, DEFAULT_DATA_CONTAINER_NAME);
}
public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{
@ -113,11 +149,12 @@ public class KahaStore implements Store{
MapContainer result=(MapContainer) maps.get(id);
if(result==null){
DataManager dm = getDataManager(dataContainerName);
IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
ContainerId containerId = new ContainerId();
containerId.setKey(id);
containerId.setDataContainerPrefix(dataContainerName);
IndexItem root=mapsContainer.addRoot(containerId);
result=new MapContainerImpl(containerId,root,indexManager,dm);
result=new MapContainerImpl(containerId,root,im,dm);
maps.put(containerId.getKey(),result);
}
return result;
@ -143,7 +180,7 @@ public class KahaStore implements Store{
}
public ListContainer getListContainer(Object id) throws IOException{
return getListContainer(id,DEFAULT_CONTAINER_NAME);
return getListContainer(id,DEFAULT_DATA_CONTAINER_NAME);
}
public synchronized ListContainer getListContainer(Object id, String dataContainerName) throws IOException{
@ -152,11 +189,12 @@ public class KahaStore implements Store{
ListContainer result=(ListContainer) lists.get(id);
if(result==null){
DataManager dm = getDataManager(dataContainerName);
IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
ContainerId containerId = new ContainerId();
containerId.setKey(id);
containerId.setDataContainerPrefix(dataContainerName);
IndexItem root=listsContainer.addRoot(containerId);
result=new ListContainerImpl(containerId,root,indexManager,dm);
result=new ListContainerImpl(containerId,root,im,dm);
lists.put(containerId.getKey(),result);
}
return result;
@ -189,31 +227,30 @@ public class KahaStore implements Store{
initialized=true;
directory=new File(name);
directory.mkdirs();
File ifile=new File(directory,"kaha.idx");
indexManager=new IndexManager(ifile,mode);
rootData=new DataManager(directory,"roots-data");
defaultContainerManager=new DataManager(directory,DEFAULT_CONTAINER_NAME);
dataManagers.put(DEFAULT_CONTAINER_NAME, defaultContainerManager);
DataManager rootData = getDataManager(DEFAULT_DATA_CONTAINER_NAME);
IndexManager rootIndex = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME);
IndexItem mapRoot=new IndexItem();
IndexItem listRoot=new IndexItem();
if(indexManager.isEmpty()){
if(rootIndex.isEmpty()){
mapRoot.setOffset(0);
indexManager.updateIndex(mapRoot);
rootIndex.updateIndex(mapRoot);
listRoot.setOffset(IndexItem.INDEX_SIZE);
indexManager.updateIndex(listRoot);
indexManager.setLength(IndexItem.INDEX_SIZE*2);
rootIndex.updateIndex(listRoot);
rootIndex.setLength(IndexItem.INDEX_SIZE*2);
}else{
mapRoot=indexManager.getIndex(0);
listRoot=indexManager.getIndex(IndexItem.INDEX_SIZE);
mapRoot=rootIndex.getIndex(0);
listRoot=rootIndex.getIndex(IndexItem.INDEX_SIZE);
}
mapsContainer=new IndexRootContainer(mapRoot,indexManager,rootData);
listsContainer=new IndexRootContainer(listRoot,indexManager,rootData);
mapsContainer=new IndexRootContainer(mapRoot,rootIndex,rootData);
listsContainer=new IndexRootContainer(listRoot,rootIndex,rootData);
rootData.consolidateDataFiles();
for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){
ContainerId key=(ContainerId) i.next();
DataManager dm = getDataManager(key.getDataContainerPrefix());
IndexItem root=mapsContainer.getRoot(key);
BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,dm);
BaseContainerImpl container=new MapContainerImpl(key,root,rootIndex,dm);
container.expressDataInterest();
maps.put(key.getKey(),container);
}
@ -221,7 +258,7 @@ public class KahaStore implements Store{
ContainerId key=(ContainerId) i.next();
DataManager dm = getDataManager(key.getDataContainerPrefix());
IndexItem root=listsContainer.getRoot(key);
BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,dm);
BaseContainerImpl container=new ListContainerImpl(key,root,rootIndex,dm);
container.expressDataInterest();
lists.put(key.getKey(),container);
}
@ -232,7 +269,7 @@ public class KahaStore implements Store{
}
}
protected DataManager getDataManager(String prefix){
protected DataManager getDataManager(String prefix) throws IOException {
DataManager dm = (DataManager) dataManagers.get(prefix);
if (dm == null){
dm = new DataManager(directory,prefix);
@ -240,4 +277,15 @@ public class KahaStore implements Store{
}
return dm;
}
protected IndexManager getIndexManager(String index_name) throws IOException {
IndexManager im = (IndexManager) indexManagers.get(index_name);
if( im == null ) {
File ifile=new File(directory,index_name+".idx");
im = new IndexManager(ifile,mode);
indexManagers.put(index_name,im);
}
return im;
}
}