mirror of https://github.com/apache/activemq.git
Updated to resolve over -eager clearing of old data files
http://www.nabble.com/ActiveMQ-4.1-hangs-when-using-kaha-persistence-tf2465546.html#a6874103 git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@465391 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
8cf0a390c5
commit
4e98feca92
|
@ -160,14 +160,25 @@ public interface MapContainer extends Map{
|
|||
/**
|
||||
* Get the Key object from it's location
|
||||
* @param keyLocation
|
||||
* @return
|
||||
* @return the key for the entry
|
||||
*/
|
||||
public Object getKey(StoreEntry keyLocation);
|
||||
|
||||
/**
|
||||
* Get the value from it's location
|
||||
* @param Valuelocation
|
||||
* @return
|
||||
* @return the Object
|
||||
*/
|
||||
public Object getValue(StoreEntry Valuelocation);
|
||||
|
||||
/**
|
||||
* Set the internal index map
|
||||
* @param map
|
||||
*/
|
||||
public void setIndexMap(Map map);
|
||||
|
||||
/**
|
||||
* @return the index map
|
||||
*/
|
||||
public Map getIndexMap();
|
||||
}
|
|
@ -60,7 +60,7 @@ class IndexRootContainer {
|
|||
while(nextItem!=Item.POSITION_NOT_SET){
|
||||
StoreEntry item=indexManager.getIndex(nextItem);
|
||||
StoreLocation data=item.getKeyDataItem();
|
||||
Object key=dataManager.readItem(rootMarshaller,data);
|
||||
Object key = dataManager.readItem(rootMarshaller,data);
|
||||
map.put(key,item);
|
||||
list.add(item);
|
||||
nextItem=item.getNextItem();
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.activemq.kaha.impl.container.ContainerId;
|
|||
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
|
||||
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
|
||||
import org.apache.activemq.kaha.impl.data.DataManager;
|
||||
import org.apache.activemq.kaha.impl.data.Item;
|
||||
import org.apache.activemq.kaha.impl.data.RedoListener;
|
||||
import org.apache.activemq.kaha.impl.index.IndexItem;
|
||||
import org.apache.activemq.kaha.impl.index.IndexManager;
|
||||
|
@ -407,6 +408,11 @@ public class KahaStore implements Store{
|
|||
lock();
|
||||
mapsContainer=new IndexRootContainer(mapRoot,rootIndexManager,defaultDM);
|
||||
listsContainer=new IndexRootContainer(listRoot,rootIndexManager,defaultDM);
|
||||
/**
|
||||
* Add interest in data files - then consolidate them
|
||||
*/
|
||||
generateInterestInMapDataFiles();
|
||||
generateInterestInListDataFiles();
|
||||
for(Iterator i=dataManagers.values().iterator();i.hasNext();){
|
||||
DataManager dm=(DataManager)i.next();
|
||||
dm.consolidateDataFiles();
|
||||
|
@ -472,6 +478,50 @@ public class KahaStore implements Store{
|
|||
}
|
||||
return lockSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* scans the directory and builds up the IndexManager and DataManager
|
||||
* @throws IOException
|
||||
*/
|
||||
private void generateInterestInListDataFiles() throws IOException {
|
||||
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
|
||||
ContainerId id = (ContainerId)i.next();
|
||||
DataManager dm = getDataManager(id.getDataContainerName());
|
||||
IndexManager im = getIndexManager(dm,id.getDataContainerName());
|
||||
IndexItem theRoot=listsContainer.getRoot(im,id);
|
||||
long nextItem=theRoot.getNextItem();
|
||||
while(nextItem!=Item.POSITION_NOT_SET){
|
||||
IndexItem item=im.getIndex(nextItem);
|
||||
item.setOffset(nextItem);
|
||||
dm.addInterestInFile(item.getKeyFile());
|
||||
dm.addInterestInFile(item.getValueFile());
|
||||
nextItem=item.getNextItem();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* scans the directory and builds up the IndexManager and DataManager
|
||||
* @throws IOException
|
||||
*/
|
||||
private void generateInterestInMapDataFiles() throws IOException {
|
||||
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
|
||||
ContainerId id = (ContainerId)i.next();
|
||||
DataManager dm = getDataManager(id.getDataContainerName());
|
||||
IndexManager im = getIndexManager(dm,id.getDataContainerName());
|
||||
IndexItem theRoot=mapsContainer.getRoot(im,id);
|
||||
long nextItem=theRoot.getNextItem();
|
||||
while(nextItem!=Item.POSITION_NOT_SET){
|
||||
IndexItem item=im.getIndex(nextItem);
|
||||
item.setOffset(nextItem);
|
||||
dm.addInterestInFile(item.getKeyFile());
|
||||
dm.addInterestInFile(item.getValueFile());
|
||||
nextItem=item.getNextItem();
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ public abstract class BaseContainerImpl{
|
|||
protected boolean loaded=false;
|
||||
protected boolean closed=false;
|
||||
protected boolean initialized=false;
|
||||
private String indexType;
|
||||
protected String indexType;
|
||||
|
||||
protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,
|
||||
DataManager dataManager,String indexType){
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.HashSet;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.activemq.kaha.IndexTypes;
|
||||
import org.apache.activemq.kaha.MapContainer;
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.kaha.RuntimeStoreException;
|
||||
|
@ -47,14 +48,25 @@ import org.apache.commons.logging.LogFactory;
|
|||
public final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
|
||||
|
||||
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
|
||||
protected Map map=new HashMap();
|
||||
protected Map valueToKeyMap=new HashMap();
|
||||
protected Map indexMap;
|
||||
protected Marshaller keyMarshaller=Store.ObjectMarshaller;
|
||||
protected Marshaller valueMarshaller=Store.ObjectMarshaller;
|
||||
|
||||
public MapContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType){
|
||||
super(id,root,indexManager,dataManager,indexType);
|
||||
}
|
||||
|
||||
public synchronized void init(){
|
||||
super.init();
|
||||
if(indexMap == null){
|
||||
if(indexType.equals(IndexTypes.DISK_INDEX)){
|
||||
this.indexMap = new HashMap();
|
||||
}else{
|
||||
this.indexMap = new HashMap();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/*
|
||||
* (non-Javadoc)
|
||||
|
@ -73,8 +85,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
IndexItem item=indexManager.getIndex(nextItem);
|
||||
StoreLocation data=item.getKeyDataItem();
|
||||
Object key=dataManager.readItem(keyMarshaller,data);
|
||||
map.put(key,item);
|
||||
valueToKeyMap.put(item,key);
|
||||
indexMap.put(key,item);
|
||||
indexList.add(item);
|
||||
nextItem=item.getNextItem();
|
||||
}
|
||||
|
@ -95,8 +106,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
checkClosed();
|
||||
if(loaded){
|
||||
loaded=false;
|
||||
map.clear();
|
||||
valueToKeyMap.clear();
|
||||
indexMap.clear();
|
||||
indexList.clear();
|
||||
}
|
||||
}
|
||||
|
@ -118,7 +128,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
*/
|
||||
public synchronized int size(){
|
||||
load();
|
||||
return map.size();
|
||||
return indexMap.size();
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -128,7 +138,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
*/
|
||||
public synchronized boolean isEmpty(){
|
||||
load();
|
||||
return map.isEmpty();
|
||||
return indexMap.isEmpty();
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -138,7 +148,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
*/
|
||||
public synchronized boolean containsKey(Object key){
|
||||
load();
|
||||
return map.containsKey(key);
|
||||
return indexMap.containsKey(key);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -150,7 +160,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
load();
|
||||
Object result=null;
|
||||
StoreEntry item=null;
|
||||
item=(StoreEntry)map.get(key);
|
||||
item=(StoreEntry)indexMap.get(key);
|
||||
if(item!=null){
|
||||
result=getValue(item);
|
||||
}
|
||||
|
@ -232,13 +242,9 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
*/
|
||||
public synchronized Object put(Object key,Object value){
|
||||
load();
|
||||
Object result=null;
|
||||
if(map.containsKey(key)){
|
||||
result=remove(key);
|
||||
}
|
||||
Object result=remove(key);;
|
||||
IndexItem item=write(key,value);
|
||||
map.put(key,item);
|
||||
valueToKeyMap.put(item,key);
|
||||
indexMap.put(key,item);
|
||||
indexList.add(item);
|
||||
return result;
|
||||
}
|
||||
|
@ -251,12 +257,11 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
public synchronized Object remove(Object key){
|
||||
load();
|
||||
Object result=null;
|
||||
IndexItem item=(IndexItem)map.get(key);
|
||||
IndexItem item=(IndexItem)indexMap.get(key);
|
||||
if(item!=null){
|
||||
//refresh the index
|
||||
item = (IndexItem)indexList.refreshEntry(item);
|
||||
map.remove(key);
|
||||
valueToKeyMap.remove(item);
|
||||
indexMap.remove(key);
|
||||
result=getValue(item);
|
||||
IndexItem prev=indexList.getPrevEntry(item);
|
||||
IndexItem next=indexList.getNextEntry(item);
|
||||
|
@ -276,7 +281,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
if(value!=null&&value.equals(o)){
|
||||
result=true;
|
||||
// find the key
|
||||
Object key=valueToKeyMap.get(item);
|
||||
Object key=getKey(item);
|
||||
if(key!=null){
|
||||
remove(key);
|
||||
}
|
||||
|
@ -289,7 +294,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
}
|
||||
|
||||
protected void remove(IndexItem item){
|
||||
Object key=valueToKeyMap.get(item);
|
||||
Object key=getKey(item);
|
||||
if(key!=null){
|
||||
remove(key);
|
||||
}
|
||||
|
@ -303,8 +308,9 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
public synchronized void clear(){
|
||||
checkClosed();
|
||||
loaded=true;
|
||||
map.clear();
|
||||
valueToKeyMap.clear();
|
||||
if(indexMap!=null){
|
||||
indexMap.clear();
|
||||
}
|
||||
super.clear();
|
||||
doClear();
|
||||
}
|
||||
|
@ -317,12 +323,11 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
*/
|
||||
public StoreEntry place(Object key, Object value) {
|
||||
load();
|
||||
if(map.containsKey(key)){
|
||||
if(indexMap.containsKey(key)){
|
||||
remove(key);
|
||||
}
|
||||
IndexItem item=write(key,value);
|
||||
map.put(key,item);
|
||||
valueToKeyMap.put(item,key);
|
||||
indexMap.put(key,item);
|
||||
indexList.add(item);
|
||||
return item;
|
||||
}
|
||||
|
@ -336,8 +341,8 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
IndexItem item=(IndexItem)entry;
|
||||
if(item!=null){
|
||||
|
||||
Object key = valueToKeyMap.remove(item);
|
||||
map.remove(key);
|
||||
Object key = getKey(item);
|
||||
indexMap.remove(key);
|
||||
IndexItem prev=indexList.getPrevEntry(item);
|
||||
IndexItem next=indexList.getNextEntry(item);
|
||||
indexList.remove(item);
|
||||
|
@ -347,8 +352,8 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
|
||||
/**
|
||||
* Get the value from it's location
|
||||
* @param Valuelocation
|
||||
* @return
|
||||
* @param item
|
||||
* @return the value associated with the store entry
|
||||
*/
|
||||
public synchronized Object getValue(StoreEntry item){
|
||||
load();
|
||||
|
@ -369,8 +374,8 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
|
||||
/**
|
||||
* Get the Key object from it's location
|
||||
* @param keyLocation
|
||||
* @return
|
||||
* @param item
|
||||
* @return the Key Object associated with the StoreEntry
|
||||
*/
|
||||
public synchronized Object getKey(StoreEntry item){
|
||||
load();
|
||||
|
@ -389,7 +394,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
|
||||
|
||||
protected Set getInternalKeySet(){
|
||||
return new HashSet(map.keySet());
|
||||
return new HashSet(indexMap.keySet());
|
||||
}
|
||||
|
||||
protected IndexLinkedList getItemList(){
|
||||
|
@ -426,4 +431,21 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return
|
||||
* @see org.apache.activemq.kaha.MapContainer#getIndexMap()
|
||||
*/
|
||||
public Map getIndexMap(){
|
||||
return indexMap;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param map
|
||||
* @see org.apache.activemq.kaha.MapContainer#setIndexMap(java.util.Map)
|
||||
*/
|
||||
public void setIndexMap(Map map){
|
||||
indexMap = map;
|
||||
|
||||
}
|
||||
}
|
|
@ -252,9 +252,7 @@ public final class DataManager{
|
|||
}
|
||||
for(int i=0;i<purgeList.size();i++){
|
||||
DataFile dataFile=(DataFile) purgeList.get(i);
|
||||
fileMap.remove(dataFile.getNumber());
|
||||
boolean result=dataFile.delete();
|
||||
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
|
||||
removeDataFile(dataFile);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -311,8 +311,8 @@ import org.apache.activemq.kaha.impl.data.Item;
|
|||
}
|
||||
|
||||
public boolean equals(Object obj){
|
||||
boolean result = false;
|
||||
if (obj != null && obj instanceof IndexItem){
|
||||
boolean result = obj == this;
|
||||
if (!result && obj != null && obj instanceof IndexItem){
|
||||
IndexItem other = (IndexItem)obj;
|
||||
result = other.offset == this.offset;
|
||||
}
|
||||
|
|
|
@ -35,8 +35,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public final class IndexManager{
|
||||
public static final String NAME_PREFIX="index-";
|
||||
private static final Log log=LogFactory.getLog(IndexManager.class);
|
||||
private static final String NAME_PREFIX="index-";
|
||||
private final String name;
|
||||
private File directory;
|
||||
private File file;
|
||||
|
|
Loading…
Reference in New Issue