git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@453123 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-10-05 07:19:35 +00:00
parent 44aece5020
commit 17946e5325
28 changed files with 1611 additions and 869 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.List;
import java.util.NoSuchElementException;
/**
@ -116,4 +117,40 @@ public interface ListContainer extends List{
*/
public void clearCache();
/**
* add an Object to the list but get a StoreEntry of its position
* @param object
* @return the entry in the Store
*/
public StoreEntry placeLast(Object object);
/**
* insert an Object in first position int the list but get a StoreEntry of its position
* @param object
* @return the location in the Store
*/
public StoreEntry placeFirst(Object object);
/**
* Advanced feature = must ensure the object written doesn't overwrite other objects in the container
* @param entry
* @param object
*/
public void update(StoreEntry entry, Object object);
/**
* Retrieve an Object from the Store by its location
* @param entry
* @return the Object at that entry
*/
public Object get(StoreEntry entry);
/**
* remove the Object at the StoreEntry
* @param entry
* @return true if successful
*/
public boolean remove(StoreEntry entry);
}

View File

@ -142,4 +142,32 @@ public interface MapContainer extends Map{
* empty the container
*/
public void clear();
/**
* Add an entry to the Store Map
* @param key
* @param Value
* @return the StoreEntry associated with the entry
*/
public StoreEntry place(Object key, Object Value);
/**
* Remove an Entry from ther Map
* @param entry
*/
public void remove(StoreEntry entry);
/**
* Get the Key object from it's location
* @param keyLocation
* @return
*/
public Object getKey(StoreEntry keyLocation);
/**
* Get the value from it's location
* @param Valuelocation
* @return
*/
public Object getValue(StoreEntry Valuelocation);
}

View File

@ -26,10 +26,19 @@ import java.util.Set;
*/
public interface Store{
/**
* Byte Marshaller
*/
public final static Marshaller BytesMarshaller = new BytesMarshaller();
/**
* Object Marshaller
*/
public final static Marshaller ObjectMarshaller = new ObjectMarshaller();
/**
* String Marshaller
*/
public final static Marshaller StringMarshaller = new StringMarshaller();
/**
* close the store
@ -61,7 +70,7 @@ public interface Store{
public boolean delete() throws IOException;
/**
* Checks if a MapContainer exists
* Checks if a MapContainer exists in the default container
*
* @param id
* @return new MapContainer
@ -69,6 +78,16 @@ public interface Store{
*/
public boolean doesMapContainerExist(Object id) throws IOException;
/**
* Checks if a MapContainer exists in the named container
*
* @param id
* @param containerName
* @return new MapContainer
* @throws IOException
*/
public boolean doesMapContainerExist(Object id,String containerName) throws IOException;
/**
* Get a MapContainer with the given id - the MapContainer is created if needed
*
@ -89,13 +108,33 @@ public interface Store{
public MapContainer getMapContainer(Object id,String containerName) throws IOException;
/**
* delete a container
* Get a MapContainer with the given id - the MapContainer is created if needed
*
* @param id
* @param containerName
* @param indexType
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
public MapContainer getMapContainer(Object id,String containerName,String indexType) throws IOException;
/**
* delete a container from the default container
*
* @param id
* @throws IOException
*/
public void deleteMapContainer(Object id) throws IOException;
/**
* delete a MapContainer from the name container
*
* @param id
* @param containerName
* @throws IOException
*/
public void deleteMapContainer(Object id,String containerName) throws IOException;
/**
* Get a Set of call MapContainer Ids
*
@ -105,7 +144,7 @@ public interface Store{
public Set getMapContainerIds() throws IOException;
/**
* Checks if a ListContainer exists
* Checks if a ListContainer exists in the default container
*
* @param id
* @return new MapContainer
@ -113,6 +152,16 @@ public interface Store{
*/
public boolean doesListContainerExist(Object id) throws IOException;
/**
* Checks if a ListContainer exists in the named container
*
* @param id
* @param containerName
* @return new MapContainer
* @throws IOException
*/
public boolean doesListContainerExist(Object id,String containerName) throws IOException;
/**
* Get a ListContainer with the given id and creates it if it doesn't exist
*
@ -133,13 +182,34 @@ public interface Store{
public ListContainer getListContainer(Object id,String containerName) throws IOException;
/**
* delete a ListContainer
* Get a ListContainer with the given id and creates it if it doesn't exist
*
* @param id
* @param containerName
* @param indexType
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
public ListContainer getListContainer(Object id,String containerName,String indexType) throws IOException;
/**
* delete a ListContainer from the default container
*
* @param id
* @throws IOException
*/
public void deleteListContainer(Object id) throws IOException;
/**
* delete a ListContainer from the named container
*
* @param id
* @param containerName
* @throws IOException
*/
public void deleteListContainer(Object id,String containerName) throws IOException;
/**
* Get a Set of call ListContainer Ids
*
@ -157,4 +227,17 @@ public interface Store{
* @param maxDataFileLength the maxDataFileLength to set
*/
public void setMaxDataFileLength(long maxDataFileLength);
/**
* @see org.apache.activemq.kaha.IndexTypes
* @return the default index type
*/
public String getIndexType();
/**
* Set the default index type
* @param type
* @see org.apache.activemq.kaha.IndexTypes
*/
public void setIndexType(String type);
}

View File

@ -24,7 +24,9 @@ import java.util.Set;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.impl.data.DataItem;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
@ -56,8 +58,8 @@ class IndexRootContainer {
this.dataManager=dfm;
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
DataItem data=item.getKeyDataItem();
StoreEntry item=indexManager.getIndex(nextItem);
StoreLocation data=item.getKeyDataItem();
Object key=dataManager.readItem(rootMarshaller,data);
map.put(key,item);
list.add(item);
@ -72,60 +74,68 @@ class IndexRootContainer {
IndexItem addRoot(Object key) throws IOException{
IndexItem addRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
if (map.containsKey(key)){
removeRoot(key);
removeRoot(containerIndexManager,key);
}
DataItem data = dataManager.storeDataItem(rootMarshaller, key);
IndexItem index = indexManager.createNewIndex();
index.setKeyData(data);
StoreLocation data = dataManager.storeDataItem(rootMarshaller, key);
IndexItem newRoot = indexManager.createNewIndex();
indexManager.updateIndex(newRoot);
index.setValueOffset(newRoot.getOffset());
newRoot.setKeyData(data);
IndexItem containerRoot = containerIndexManager.createNewIndex();
containerIndexManager.storeIndex(containerRoot);
newRoot.setValueOffset(containerRoot.getOffset());
IndexItem last=list.isEmpty()?null:(IndexItem) list.getLast();
last=last==null?root:last;
long prev=last.getOffset();
index.setPreviousItem(prev);
indexManager.updateIndex(index);
last.setNextItem(index.getOffset());
indexManager.updateIndex(last);
map.put(key, index);
list.add(index);
return newRoot;
newRoot.setPreviousItem(prev);
indexManager.storeIndex(newRoot);
last.setNextItem(newRoot.getOffset());
indexManager.storeIndex(last);
map.put(key, newRoot);
list.add(newRoot);
return containerRoot;
}
void removeRoot(Object key) throws IOException{
IndexItem item = (IndexItem) map.remove(key);
if (item != null){
dataManager.removeInterestInFile(item.getKeyFile());
IndexItem rootIndex = indexManager.getIndex(item.getValueOffset());
indexManager.freeIndex(rootIndex);
int index=list.indexOf(item);
IndexItem prev=index>0?(IndexItem) list.get(index-1):root;
void removeRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
StoreEntry oldRoot=(StoreEntry)map.remove(key);
if(oldRoot!=null){
dataManager.removeInterestInFile(oldRoot.getKeyFile());
// get the container root
IndexItem containerRoot=containerIndexManager.getIndex(oldRoot.getValueOffset());
if(containerRoot!=null){
containerIndexManager.freeIndex(containerRoot);
}
int index=list.indexOf(oldRoot);
IndexItem prev=index>0?(IndexItem)list.get(index-1):root;
prev=prev==null?root:prev;
IndexItem next=index<(list.size()-1)?(IndexItem) list.get(index+1):null;
IndexItem next=index<(list.size()-1)?(IndexItem)list.get(index+1):null;
if(next!=null){
prev.setNextItem(next.getOffset());
next.setPreviousItem(prev.getOffset());
indexManager.updateIndex(next);
indexManager.updateIndexes(next);
}else{
prev.setNextItem(Item.POSITION_NOT_SET);
}
indexManager.updateIndex(prev);
list.remove(item);
indexManager.updateIndexes(prev);
list.remove(oldRoot);
indexManager.freeIndex((IndexItem)oldRoot);
}
}
IndexItem getRoot(Object key) throws IOException{
IndexItem index = (IndexItem) map.get(key);
IndexItem getRoot(IndexManager containerIndexManager,ContainerId key) throws IOException{
StoreEntry index = (StoreEntry) map.get(key);
if (index != null){
return indexManager.getIndex(index.getValueOffset());
return containerIndexManager.getIndex(index.getValueOffset());
}
return null;
}
boolean doesRootExist(Object key){
return map.containsKey(key);
}
}

View File

@ -1,36 +1,42 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.activemq.kaha.impl.data.DataItem;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.RedoListener;
import org.apache.activemq.kaha.impl.index.IndexItem;
@ -39,35 +45,39 @@ import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Optimized Store writer
* Store Implementation
*
* @version $Revision: 1.1.1.1 $
*/
public class KahaStore implements Store{
private static final String DEFAULT_CONTAINER_NAME = "kaha";
private static final String LOCK_FILE_NAME="store.lock";
private static final String DEFAULT_CONTAINER_NAME="kaha";
private final static String PROPERTY_PREFIX="org.apache.activemq.kaha.Store";
private final static boolean brokenFileLock="true".equals(System.getProperty(PROPERTY_PREFIX+".broken","false"));
private final static boolean disableLocking="true".equals(System.getProperty(PROPERTY_PREFIX+"DisableLocking",
"false"));
private static Set lockSet;
private static final Log log=LogFactory.getLog(KahaStore.class);
private File directory;
protected IndexRootContainer mapsContainer;
public IndexRootContainer listsContainer;
private IndexRootContainer mapsContainer;
private IndexRootContainer listsContainer;
private Map lists=new ConcurrentHashMap();
private Map maps=new ConcurrentHashMap();
private Map dataManagers = new ConcurrentHashMap();
private Map indexManagers = new ConcurrentHashMap();
protected IndexManager rootIndexManager; //contains all the root indexes
private Map dataManagers=new ConcurrentHashMap();
private Map indexManagers=new ConcurrentHashMap();
private IndexManager rootIndexManager; // contains all the root indexes
private boolean closed=false;
private String name;
private String mode;
private boolean initialized;
private boolean logIndexChanges=false;
private long maxDataFileLength = DataManager.MAX_FILE_LENGTH;
private long maxDataFileLength=DataManager.MAX_FILE_LENGTH;
private FileLock lock;
private String indexType=IndexTypes.DISK_INDEX;
public KahaStore(String name,String mode) throws IOException{
this.name=name;
this.mode=mode;
directory=new File(name);
directory.mkdirs();
@ -77,33 +87,29 @@ public class KahaStore implements Store{
if(!closed){
closed=true;
if(initialized){
for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = (IndexManager) iter.next();
unlock();
for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
IndexManager im=(IndexManager)iter.next();
im.close();
iter.remove();
}
for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = (DataManager) iter.next();
for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
DataManager dm=(DataManager)iter.next();
dm.close();
iter.remove();
}
}
}
}
public synchronized void force() throws IOException{
if(initialized){
for (Iterator iter = indexManagers.values().iterator(); iter.hasNext();) {
IndexManager im = (IndexManager) iter.next();
for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
IndexManager im=(IndexManager)iter.next();
im.force();
}
for (Iterator iter = dataManagers.values().iterator(); iter.hasNext();) {
DataManager dm = (DataManager) iter.next();
for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
DataManager dm=(DataManager)iter.next();
dm.force();
}
}
@ -112,34 +118,30 @@ public class KahaStore implements Store{
public synchronized void clear() throws IOException{
initialize();
for(Iterator i=maps.values().iterator();i.hasNext();){
BaseContainerImpl container=(BaseContainerImpl) i.next();
BaseContainerImpl container=(BaseContainerImpl)i.next();
container.clear();
}
for(Iterator i=lists.values().iterator();i.hasNext();){
BaseContainerImpl container=(BaseContainerImpl) i.next();
BaseContainerImpl container=(BaseContainerImpl)i.next();
container.clear();
}
lists.clear();
maps.clear();
}
public synchronized boolean delete() throws IOException{
boolean result=true;
if (initialized){
if(initialized){
clear();
for(Iterator iter=indexManagers.values().iterator();iter.hasNext();){
IndexManager im=(IndexManager) iter.next();
IndexManager im=(IndexManager)iter.next();
result&=im.delete();
iter.remove();
}
for(Iterator iter=dataManagers.values().iterator();iter.hasNext();){
DataManager dm=(DataManager) iter.next();
DataManager dm=(DataManager)iter.next();
result&=dm.delete();
iter.remove();
}
}
if(directory!=null&&directory.isDirectory()){
File[] files=directory.listFiles();
if(files!=null){
@ -150,146 +152,166 @@ public class KahaStore implements Store{
}
}
}
log.info("Kaha Store deleted data directory "+directory);
}
initialized=false;
return result;
}
public boolean doesMapContainerExist(Object id) throws IOException{
return doesMapContainerExist(id,DEFAULT_CONTAINER_NAME);
}
public boolean doesMapContainerExist(Object id,String containerName) throws IOException{
initialize();
return maps.containsKey(id);
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
return maps.containsKey(containerId)||mapsContainer.doesRootExist(containerId);
}
public MapContainer getMapContainer(Object id) throws IOException{
return getMapContainer(id, DEFAULT_CONTAINER_NAME);
return getMapContainer(id,DEFAULT_CONTAINER_NAME);
}
public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{
public MapContainer getMapContainer(Object id,String containerName) throws IOException{
return getMapContainer(id,containerName,indexType);
}
public synchronized MapContainer getMapContainer(Object id,String containerName,String indexType)
throws IOException{
initialize();
MapContainerImpl result=(MapContainerImpl) maps.get(id);
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
MapContainerImpl result=(MapContainerImpl)maps.get(containerId);
if(result==null){
DataManager dm = getDataManager(dataContainerName);
IndexManager im = getIndexManager(dm, dataContainerName);
ContainerId containerId = new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(dataContainerName);
IndexItem root=mapsContainer.getRoot(containerId);
if( root == null ) {
root=mapsContainer.addRoot(containerId);
DataManager dm=getDataManager(containerName);
IndexManager im=getIndexManager(dm,containerName);
IndexItem root=mapsContainer.getRoot(im,containerId);
if(root==null){
root=mapsContainer.addRoot(im,containerId);
}
result=new MapContainerImpl(containerId,root,rootIndexManager,im,dm);
result.expressDataInterest();
maps.put(containerId.getKey(),result);
result=new MapContainerImpl(containerId,root,im,dm,indexType);
maps.put(containerId,result);
}
return result;
}
public void deleteMapContainer(Object id) throws IOException{
deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
}
public void deleteMapContainer(Object id,String containerName) throws IOException{
initialize();
MapContainerImpl container=(MapContainerImpl) maps.remove(id);
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
MapContainerImpl container=(MapContainerImpl)maps.remove(containerId);
if(container!=null){
container.clear();
mapsContainer.removeRoot(container.getContainerId());
mapsContainer.removeRoot(container.getIndexManager(),containerId);
}
}
public Set getMapContainerIds() throws IOException{
initialize();
return maps.keySet();
Set set = new HashSet();
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
set.add(id.getKey());
}
return set;
}
public boolean doesListContainerExist(Object id) throws IOException{
return doesListContainerExist(id,DEFAULT_CONTAINER_NAME);
}
public boolean doesListContainerExist(Object id,String containerName) throws IOException{
initialize();
return lists.containsKey(id);
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
return lists.containsKey(containerId)||listsContainer.doesRootExist(containerId);
}
public ListContainer getListContainer(Object id) throws IOException{
return getListContainer(id,DEFAULT_CONTAINER_NAME);
}
public synchronized ListContainer getListContainer(Object id, String containerName) throws IOException{
initialize();
public ListContainer getListContainer(Object id,String containerName) throws IOException{
return getListContainer(id,containerName,indexType);
}
ListContainerImpl result=(ListContainerImpl) lists.get(id);
public synchronized ListContainer getListContainer(Object id,String containerName,String indexType)
throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
ListContainerImpl result=(ListContainerImpl)lists.get(containerId);
if(result==null){
DataManager dm = getDataManager(containerName);
IndexManager im = getIndexManager(dm, containerName);
ContainerId containerId = new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
IndexItem root=listsContainer.getRoot(containerId);
if( root == null ) {
root=listsContainer.addRoot(containerId);
DataManager dm=getDataManager(containerName);
IndexManager im=getIndexManager(dm,containerName);
IndexItem root=listsContainer.getRoot(im,containerId);
if(root==null){
root=listsContainer.addRoot(im,containerId);
}
result=new ListContainerImpl(containerId,root,rootIndexManager,im,dm);
result.expressDataInterest();
lists.put(containerId.getKey(),result);
result=new ListContainerImpl(containerId,root,im,dm,indexType);
lists.put(containerId,result);
}
return result;
}
public void deleteListContainer(Object id) throws IOException{
deleteListContainer(id,DEFAULT_CONTAINER_NAME);
}
public void deleteListContainer(Object id,String containerName) throws IOException{
initialize();
ListContainerImpl container=(ListContainerImpl) lists.remove(id);
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
ListContainerImpl container=(ListContainerImpl)lists.remove(containerId);
if(container!=null){
listsContainer.removeRoot(container.getIndexManager(),containerId);
container.clear();
listsContainer.removeRoot(container.getContainerId());
}
}
public Set getListContainerIds() throws IOException{
initialize();
return lists.keySet();
}
protected void checkClosed(){
if(closed){
throw new RuntimeStoreException("The store is closed");
Set set = new HashSet();
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
set.add(id.getKey());
}
return set;
}
public synchronized void initialize() throws IOException{
if( closed )
throw new IOException("Store has been closed.");
if(!initialized){
initialized=true;
log.info("Kaha Store using data directory " + directory);
DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
IndexItem mapRoot=new IndexItem();
IndexItem listRoot=new IndexItem();
if(rootIndexManager.isEmpty()){
mapRoot.setOffset(0);
rootIndexManager.updateIndex(mapRoot);
listRoot.setOffset(IndexItem.INDEX_SIZE);
rootIndexManager.updateIndex(listRoot);
rootIndexManager.setLength(IndexItem.INDEX_SIZE*2);
}else{
mapRoot=rootIndexManager.getIndex(0);
listRoot=rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
}
mapsContainer=new IndexRootContainer(mapRoot,rootIndexManager,defaultDM);
listsContainer=new IndexRootContainer(listRoot,rootIndexManager,defaultDM);
for (Iterator i = dataManagers.values().iterator(); i.hasNext();){
DataManager dm = (DataManager) i.next();
dm.consolidateDataFiles();
}
}
/**
* @return the listsContainer
*/
public IndexRootContainer getListsContainer(){
return this.listsContainer;
}
public DataManager getDataManager(String name) throws IOException {
DataManager dm = (DataManager) dataManagers.get(name);
if (dm == null){
dm = new DataManager(directory,name);
/**
* @return the mapsContainer
*/
public IndexRootContainer getMapsContainer(){
return this.mapsContainer;
}
public DataManager getDataManager(String name) throws IOException{
DataManager dm=(DataManager)dataManagers.get(name);
if(dm==null){
dm=new DataManager(directory,name);
dm.setMaxFileLength(maxDataFileLength);
recover(dm);
dataManagers.put(name,dm);
@ -297,32 +319,33 @@ public class KahaStore implements Store{
return dm;
}
public IndexManager getIndexManager(DataManager dm, String name) throws IOException {
IndexManager im = (IndexManager) indexManagers.get(name);
if( im == null ) {
im = new IndexManager(directory,name,mode, logIndexChanges?dm:null);
public IndexManager getIndexManager(DataManager dm,String name) throws IOException{
IndexManager im=(IndexManager)indexManagers.get(name);
if(im==null){
im=new IndexManager(directory,name,mode,logIndexChanges?dm:null);
indexManagers.put(name,im);
}
return im;
}
private void recover(final DataManager dm) throws IOException {
dm.recoverRedoItems( new RedoListener() {
public void onRedoItem(DataItem item, Object o) throws Exception {
RedoStoreIndexItem redo = (RedoStoreIndexItem) o;
//IndexManager im = getIndexManager(dm, redo.getIndexName());
IndexManager im = getIndexManager(dm, dm.getName());
private void recover(final DataManager dm) throws IOException{
dm.recoverRedoItems(new RedoListener(){
public void onRedoItem(StoreLocation item,Object o) throws Exception{
RedoStoreIndexItem redo=(RedoStoreIndexItem)o;
// IndexManager im = getIndexManager(dm, redo.getIndexName());
IndexManager im=getIndexManager(dm,dm.getName());
im.redo(redo);
}
});
}
public boolean isLogIndexChanges() {
public boolean isLogIndexChanges(){
return logIndexChanges;
}
public void setLogIndexChanges(boolean logIndexChanges) {
this.logIndexChanges = logIndexChanges;
public void setLogIndexChanges(boolean logIndexChanges){
this.logIndexChanges=logIndexChanges;
}
/**
@ -333,10 +356,123 @@ public class KahaStore implements Store{
}
/**
* @param maxDataFileLength the maxDataFileLength to set
* @param maxDataFileLength
* the maxDataFileLength to set
*/
public void setMaxDataFileLength(long maxDataFileLength){
this.maxDataFileLength=maxDataFileLength;
}
/**
* @see org.apache.activemq.kaha.IndexTypes
* @return the default index type
*/
public String getIndexType(){
return indexType;
}
/**
* Set the default index type
*
* @param type
* @see org.apache.activemq.kaha.IndexTypes
*/
public void setIndexType(String type){
if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
throw new RuntimeException("Unknown IndexType: "+type);
}
this.indexType=indexType;
}
public synchronized void initialize() throws IOException{
if(closed)
throw new IOException("Store has been closed.");
if(!initialized){
initialized=true;
log.info("Kaha Store using data directory "+directory);
DataManager defaultDM=getDataManager(DEFAULT_CONTAINER_NAME);
rootIndexManager=getIndexManager(defaultDM,DEFAULT_CONTAINER_NAME);
IndexItem mapRoot=new IndexItem();
IndexItem listRoot=new IndexItem();
if(rootIndexManager.isEmpty()){
mapRoot.setOffset(0);
rootIndexManager.storeIndex(mapRoot);
listRoot.setOffset(IndexItem.INDEX_SIZE);
rootIndexManager.storeIndex(listRoot);
rootIndexManager.setLength(IndexItem.INDEX_SIZE*2);
}else{
mapRoot=rootIndexManager.getIndex(0);
listRoot=rootIndexManager.getIndex(IndexItem.INDEX_SIZE);
}
lock();
mapsContainer=new IndexRootContainer(mapRoot,rootIndexManager,defaultDM);
listsContainer=new IndexRootContainer(listRoot,rootIndexManager,defaultDM);
for(Iterator i=dataManagers.values().iterator();i.hasNext();){
DataManager dm=(DataManager)i.next();
dm.consolidateDataFiles();
}
}
}
private synchronized void lock() throws IOException{
if(!disableLocking&&directory!=null&&lock==null){
Set set=getVmLockSet();
synchronized(set){
if(lock==null){
if(!set.add(directory.getCanonicalPath())){
throw new StoreLockedExcpetion("Kaha Store "+directory.getName()
+" is already opened by this application.");
}
if(!brokenFileLock){
lock=rootIndexManager.getLock();
if(lock==null){
set.remove(directory.getCanonicalPath());
throw new StoreLockedExcpetion("Kaha Store "+directory.getName()
+" is already opened by another application");
}
}
}
}
}
}
private void unlock() throws IOException{
if(!disableLocking&&directory!=null){
Set set=getVmLockSet();
synchronized(set){
if(lock!=null){
set.remove(directory.getCanonicalPath());
if(lock.isValid()){
lock.release();
}
lock=null;
}
}
}
}
private void checkClosed(){
if(closed){
throw new RuntimeStoreException("The store is closed");
}
}
static private Set getVmLockSet(){
if(lockSet==null){
Properties properties=System.getProperties();
synchronized(properties){
lockSet=(Set)properties.get("org.apache.activemq.kaha.impl.KahaStore");
if(lockSet==null){
lockSet=new HashSet();
}
properties.put(PROPERTY_PREFIX,lockSet);
}
}
return lockSet;
}
}

View File

@ -1,85 +1,94 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.kaha.impl.container;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.DiskIndexLinkedList;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.kaha.impl.index.VMIndexLinkedList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Implementation of a ListContainer
*
* @version $Revision: 1.2 $
*/
public abstract class BaseContainerImpl{
private static final Log log=LogFactory.getLog(BaseContainerImpl.class);
protected IndexItem root;
protected IndexLinkedList indexList;
protected IndexManager rootIndexManager; // IndexManager that contains the root
protected IndexManager indexManager;
protected DataManager dataManager;
protected ContainerId containerId;
protected boolean loaded=false;
protected boolean closed=false;
protected boolean initialized = false;
protected final Object mutex=new Object();
protected boolean initialized=false;
private String indexType;
protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,DataManager dataManager){
protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,
DataManager dataManager,String indexType){
this.containerId=id;
this.root=root;
this.rootIndexManager = rootIndexManager;
this.indexManager=indexManager;
this.dataManager=dataManager;
this.indexType = indexType;
if (indexType == null || (!indexType.equals(IndexTypes.DISK_INDEX) && !indexType.equals(IndexTypes.IN_MEMORY_INDEX))) {
throw new RuntimeException("Unknown IndexType: " + indexType);
}
}
public ContainerId getContainerId(){
return containerId;
}
public void init(){
if (!initialized){
synchronized(mutex){
if (!initialized){
initialized= true;
if (this.indexList == null){
public synchronized void init(){
if(!initialized){
if(!initialized){
initialized=true;
if(this.indexList==null){
if(indexType.equals(IndexTypes.DISK_INDEX)){
this.indexList=new DiskIndexLinkedList(indexManager,root);
}else{
this.indexList=new VMIndexLinkedList(root);
}
}
}
}
}
public void clear(){
if (indexList != null){
public synchronized void clear(){
if(indexList!=null){
indexList.clear();
}
}
/**
* @return the indexList
*/
@ -100,16 +109,15 @@ public abstract class BaseContainerImpl{
public abstract int size();
protected abstract Object getValue(IndexItem currentItem);
protected abstract Object getValue(StoreEntry currentItem);
protected abstract void remove(IndexItem currentItem);
protected final IndexLinkedList getInternalList(){
protected synchronized final IndexLinkedList getInternalList(){
return indexList;
}
public final void close(){
public synchronized final void close(){
unload();
closed=true;
}
@ -119,7 +127,7 @@ public abstract class BaseContainerImpl{
*
* @see org.apache.activemq.kaha.ListContainer#isLoaded()
*/
public final boolean isLoaded(){
public synchronized final boolean isLoaded(){
checkClosed();
return loaded;
}
@ -134,7 +142,16 @@ public abstract class BaseContainerImpl{
return containerId.getKey();
}
public final void expressDataInterest() throws IOException{
public DataManager getDataManager(){
return dataManager;
}
public IndexManager getIndexManager(){
return indexManager;
}
public synchronized final void expressDataInterest() throws IOException{
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
@ -147,32 +164,29 @@ public abstract class BaseContainerImpl{
protected final void doClear(){
checkClosed();
synchronized(mutex){
loaded=true;
synchronized(mutex){
List indexList=new ArrayList();
try{
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=new IndexItem();
item.setOffset(nextItem);
indexList.add(item);
nextItem=item.getNextItem();
}
root.setNextItem(Item.POSITION_NOT_SET);
updateIndex(root);
for(int i=0;i<indexList.size();i++){
IndexItem item=(IndexItem) indexList.get(i);
dataManager.removeInterestInFile(item.getKeyFile());
dataManager.removeInterestInFile(item.getValueFile());
indexManager.freeIndex(item);
}
indexList.clear();
}catch(IOException e){
log.error("Failed to clear Container "+getId(),e);
throw new RuntimeStoreException(e);
}
loaded=true;
List indexList=new ArrayList();
try{
init();
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=new IndexItem();
item.setOffset(nextItem);
indexList.add(item);
nextItem=item.getNextItem();
}
root.setNextItem(Item.POSITION_NOT_SET);
storeIndex(root);
for(int i=0;i<indexList.size();i++){
IndexItem item=(IndexItem)indexList.get(i);
dataManager.removeInterestInFile(item.getKeyFile());
dataManager.removeInterestInFile(item.getValueFile());
indexManager.freeIndex(item);
}
indexList.clear();
}catch(IOException e){
log.error("Failed to clear Container "+getId(),e);
throw new RuntimeStoreException(e);
}
}
@ -185,11 +199,11 @@ public abstract class BaseContainerImpl{
if(next!=null){
prev.setNextItem(next.getOffset());
next.setPreviousItem(prev.getOffset());
updateIndex(next);
updateIndexes(next);
}else{
prev.setNextItem(Item.POSITION_NOT_SET);
}
updateIndex(prev);
updateIndexes(prev);
indexManager.freeIndex(key);
}catch(IOException e){
log.error("Failed to delete "+key,e);
@ -203,15 +217,17 @@ public abstract class BaseContainerImpl{
}
}
protected void updateIndex(IndexItem item) throws IOException{
IndexManager im = isRoot(item) ? rootIndexManager : indexManager;
im.updateIndex(item);
protected void storeIndex(IndexItem item) throws IOException{
indexManager.storeIndex(item);
}
protected final boolean isRoot(IndexItem item){
// return item != null && root != null && (root == item || root.getOffset() == item.getOffset());
return item != null && root != null && root == item;
protected void updateIndexes(IndexItem item) throws IOException{
indexManager.updateIndexes(item);
}
protected final boolean isRoot(StoreEntry item){
return item!=null&&root!=null&&(root==item||root.getOffset()==item.getOffset());
// return item != null && indexRoot != null && indexRoot == item;
}

View File

@ -19,7 +19,7 @@ package org.apache.activemq.kaha.impl.container;
import java.util.ListIterator;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
/**
@ -31,7 +31,7 @@ public class CachedContainerListIterator implements ListIterator{
protected IndexLinkedList list;
protected int pos = 0;
protected int nextPos =0;
protected IndexItem currentItem;
protected StoreEntry currentItem;
protected CachedContainerListIterator(ListContainerImpl container,int start){
this.container=container;

View File

@ -19,6 +19,7 @@ package org.apache.activemq.kaha.impl.container;
import java.util.ListIterator;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
@ -61,7 +62,7 @@ public class ContainerListIterator extends ContainerValueCollectionIterator impl
public int nextIndex(){
int result = -1;
if (nextItem != null){
IndexItem next = list.getNextEntry(nextItem);
StoreEntry next = list.getNextEntry(nextItem);
if (next != null){
result = container.getInternalList().indexOf(next);
}
@ -79,7 +80,7 @@ public class ContainerListIterator extends ContainerValueCollectionIterator impl
public int previousIndex(){
int result = -1;
if (nextItem != null){
IndexItem prev = list.getPrevEntry(nextItem);
StoreEntry prev = list.getPrevEntry(nextItem);
if (prev != null){
result = container.getInternalList().indexOf(prev);
}

View File

@ -1,16 +1,21 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.kaha.impl.container;
import java.io.IOException;
@ -20,24 +25,26 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.impl.data.DataItem;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Implementation of a ListContainer
*
* @version $Revision: 1.2 $
*/
public class ListContainerImpl extends BaseContainerImpl implements ListContainer{
private static final Log log=LogFactory.getLog(ListContainerImpl.class);
protected Marshaller marshaller=Store.ObjectMarshaller;
protected LinkedList cacheList=new LinkedList();
@ -45,9 +52,9 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
protected int maximumCacheSize=100;
protected IndexItem lastCached;
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,
DataManager dataManager) throws IOException{
super(id,root,rootIndexManager,indexManager,dataManager);
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType)
throws IOException{
super(id,root,indexManager,dataManager,indexType);
}
/*
@ -55,25 +62,23 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#load()
*/
public void load(){
public synchronized void load(){
checkClosed();
if(!loaded){
synchronized(mutex){
if(!loaded){
loaded=true;
if(!loaded){
loaded=true;
try{
init();
try{
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
indexList.add(item);
itemAdded(item,indexList.size()-1,getValue(item));
nextItem=item.getNextItem();
}
}catch(IOException e){
log.error("Failed to load container "+getId(),e);
throw new RuntimeStoreException(e);
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
indexList.add(item);
itemAdded(item,indexList.size()-1,getValue(item));
nextItem=item.getNextItem();
}
}catch(IOException e){
log.error("Failed to load container "+getId(),e);
throw new RuntimeStoreException(e);
}
}
}
@ -84,7 +89,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#unload()
*/
public void unload(){
public synchronized void unload(){
checkClosed();
if(loaded){
loaded=false;
@ -98,26 +103,24 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#setKeyMarshaller(org.apache.activemq.kaha.Marshaller)
*/
public void setMarshaller(Marshaller marshaller){
public synchronized void setMarshaller(Marshaller marshaller){
checkClosed();
this.marshaller=marshaller;
}
public boolean equals(Object obj){
public synchronized boolean equals(Object obj){
load();
boolean result=false;
if(obj!=null&&obj instanceof List){
List other=(List) obj;
synchronized(mutex){
result=other.size()==size();
if(result){
for(int i=0;i<indexList.size();i++){
Object o1=other.get(i);
Object o2=get(i);
result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
if(!result){
break;
}
List other=(List)obj;
result=other.size()==size();
if(result){
for(int i=0;i<indexList.size();i++){
Object o1=other.get(i);
Object o2=get(i);
result=o1==o2||(o1!=null&&o2!=null&&o1.equals(o2));
if(!result){
break;
}
}
}
@ -130,7 +133,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#size()
*/
public int size(){
public synchronized int size(){
load();
return indexList.size();
}
@ -140,13 +143,8 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
*/
public void addFirst(Object o){
load();
IndexItem item=writeFirst(o);
synchronized(mutex){
indexList.addFirst(item);
itemAdded(item,0,o);
}
public synchronized void addFirst(Object o){
internalAddFirst(o);
}
/*
@ -154,13 +152,8 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
*/
public void addLast(Object o){
load();
IndexItem item=writeLast(o);
synchronized(mutex){
indexList.addLast(item);
itemAdded(item,indexList.size()-1,o);
}
public synchronized void addLast(Object o){
internalAddLast(o);
}
/*
@ -168,21 +161,18 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#removeFirst()
*/
public Object removeFirst(){
public synchronized Object removeFirst(){
load();
Object result=null;
synchronized(mutex){
IndexItem item=(IndexItem) indexList.getFirst();
if(item!=null){
itemRemoved(0);
result=getValue(item);
int index=indexList.indexOf(item);
IndexItem prev=index>0?(IndexItem) indexList.get(index-1):root;
IndexItem next=index<(indexList.size()-1)?(IndexItem) indexList.get(index+1):null;
indexList.removeFirst();
delete(item,prev,next);
item=null;
}
IndexItem item=(IndexItem)indexList.getFirst();
if(item!=null){
itemRemoved(0);
result=getValue(item);
IndexItem prev=root;
IndexItem next=indexList.size()>1?(IndexItem)indexList.get(1):null;
indexList.removeFirst();
delete(item,prev,next);
item=null;
}
return result;
}
@ -192,19 +182,17 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#removeLast()
*/
public Object removeLast(){
public synchronized Object removeLast(){
load();
Object result=null;
synchronized(mutex){
IndexItem last=indexList.getLast();
if(last!=null){
itemRemoved(indexList.size()-1);
result=getValue(last);
IndexItem prev=indexList.getPrevEntry(last);
IndexItem next=null;
indexList.removeLast();
delete(last,prev,next);
}
IndexItem last=indexList.getLast();
if(last!=null){
itemRemoved(indexList.size()-1);
result=getValue(last);
IndexItem prev=indexList.getPrevEntry(last);
IndexItem next=null;
indexList.removeLast();
delete(last,prev,next);
}
return result;
}
@ -214,7 +202,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#isEmpty()
*/
public boolean isEmpty(){
public synchronized boolean isEmpty(){
load();
return indexList.isEmpty();
}
@ -224,20 +212,18 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#contains(java.lang.Object)
*/
public boolean contains(Object o){
public synchronized boolean contains(Object o){
load();
boolean result=false;
if(o!=null){
synchronized(mutex){
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=true;
break;
}
next=indexList.getNextEntry(next);
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=true;
break;
}
next=indexList.getNextEntry(next);
}
}
return result;
@ -248,7 +234,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#iterator()
*/
public Iterator iterator(){
public synchronized Iterator iterator(){
load();
return listIterator();
}
@ -258,16 +244,14 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#toArray()
*/
public Object[] toArray(){
public synchronized Object[] toArray(){
load();
List tmp=new ArrayList(indexList.size());
synchronized(mutex){
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
tmp.add(value);
next=indexList.getNextEntry(next);
}
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
tmp.add(value);
next=indexList.getNextEntry(next);
}
return tmp.toArray();
}
@ -277,16 +261,14 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#toArray(T[])
*/
public Object[] toArray(Object[] a){
public synchronized Object[] toArray(Object[] a){
load();
List tmp=new ArrayList(indexList.size());
synchronized(mutex){
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
tmp.add(value);
next=indexList.getNextEntry(next);
}
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
tmp.add(value);
next=indexList.getNextEntry(next);
}
return tmp.toArray(a);
}
@ -296,7 +278,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#add(E)
*/
public boolean add(Object o){
public synchronized boolean add(Object o){
load();
addLast(o);
return true;
@ -307,34 +289,30 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#remove(java.lang.Object)
*/
public boolean remove(Object o){
public synchronized boolean remove(Object o){
load();
boolean result=false;
synchronized(mutex){
int pos=0;
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
remove(next);
itemRemoved(pos);
result=true;
break;
}
next=indexList.getNextEntry(next);
pos++;
int pos=0;
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
remove(next);
itemRemoved(pos);
result=true;
break;
}
next=indexList.getNextEntry(next);
pos++;
}
return result;
}
protected void remove(IndexItem item){
synchronized(mutex){
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
/*
@ -342,16 +320,14 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#containsAll(java.util.Collection)
*/
public boolean containsAll(Collection c){
public synchronized boolean containsAll(Collection c){
load();
boolean result=false;
synchronized(mutex){
for(Iterator i=c.iterator();i.hasNext();){
Object obj=i.next();
if(!(result=contains(obj))){
result=false;
break;
}
for(Iterator i=c.iterator();i.hasNext();){
Object obj=i.next();
if(!(result=contains(obj))){
result=false;
break;
}
}
return result;
@ -362,9 +338,8 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#addAll(java.util.Collection)
*/
public boolean addAll(Collection c){
public synchronized boolean addAll(Collection c){
load();
boolean result=false;
for(Iterator i=c.iterator();i.hasNext();){
add(i.next());
}
@ -376,7 +351,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#addAll(int, java.util.Collection)
*/
public boolean addAll(int index,Collection c){
public synchronized boolean addAll(int index,Collection c){
load();
boolean result=false;
ListIterator e1=listIterator(index);
@ -393,7 +368,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#removeAll(java.util.Collection)
*/
public boolean removeAll(Collection c){
public synchronized boolean removeAll(Collection c){
load();
boolean result=true;
for(Iterator i=c.iterator();i.hasNext();){
@ -408,18 +383,16 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#retainAll(java.util.Collection)
*/
public boolean retainAll(Collection c){
public synchronized boolean retainAll(Collection c){
load();
List tmpList=new ArrayList();
synchronized(mutex){
IndexItem next=indexList.getFirst();
while(next!=null){
Object o=getValue(next);
if(!c.contains(o)){
tmpList.add(o);
}
next=indexList.getNextEntry(next);
IndexItem next=indexList.getFirst();
while(next!=null){
Object o=getValue(next);
if(!c.contains(o)){
tmpList.add(o);
}
next=indexList.getNextEntry(next);
}
for(Iterator i=tmpList.iterator();i.hasNext();){
remove(i.next());
@ -432,13 +405,11 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#clear()
*/
public void clear(){
public synchronized void clear(){
checkClosed();
synchronized(mutex){
super.clear();
doClear();
clearCache();
}
super.clear();
doClear();
clearCache();
}
/*
@ -446,7 +417,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#get(int)
*/
public Object get(int index){
public synchronized Object get(int index){
load();
return getCachedItem(index);
}
@ -459,29 +430,25 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
public Object set(int index,Object element){
load();
Object result=null;
synchronized(mutex){
IndexItem replace=indexList.isEmpty()?null:(IndexItem) indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem) indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem) indexList.get(index+1);
result=getValue(replace);
indexList.remove(index);
delete(replace,prev,next);
itemRemoved(index);
add(index,element);
}
IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem)indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem)indexList.get(index+1);
result=getValue(replace);
indexList.remove(index);
delete(replace,prev,next);
itemRemoved(index);
add(index,element);
return result;
}
protected IndexItem internalSet(int index,Object element){
synchronized(mutex){
IndexItem replace=indexList.isEmpty()?null:(IndexItem) indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem) indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem) indexList.get(index+1);
indexList.remove(index);
delete(replace,prev,next);
itemRemoved(index);
return internalAdd(index,element);
}
IndexItem replace=indexList.isEmpty()?null:(IndexItem)indexList.get(index);
IndexItem prev=(indexList.isEmpty()||(index-1)<0)?null:(IndexItem)indexList.get(index-1);
IndexItem next=(indexList.isEmpty()||(index+1)>=size())?null:(IndexItem)indexList.get(index+1);
indexList.remove(index);
delete(replace,prev,next);
itemRemoved(index);
return internalAdd(index,element);
}
/*
@ -489,29 +456,43 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#add(int, E)
*/
public void add(int index,Object element){
public synchronized void add(int index,Object element){
load();
synchronized(mutex){
IndexItem item=insert(index,element);
indexList.add(index,item);
itemAdded(item,index,element);
}
IndexItem item=insert(index,element);
indexList.add(index,item);
itemAdded(item,index,element);
}
protected StoreEntry internalAddLast(Object o) {
load();
IndexItem item=writeLast(o);
indexList.addLast(item);
itemAdded(item,indexList.size()-1,o);
return item;
}
protected StoreEntry internalAddFirst(Object o){
load();
IndexItem item=writeFirst(o);
indexList.addFirst(item);
itemAdded(item,0,o);
return item;
}
protected IndexItem internalAdd(int index,Object element){
synchronized(mutex){
IndexItem item=insert(index,element);
indexList.add(index,item);
itemAdded(item,index,element);
return item;
}
load();
IndexItem item=insert(index,element);
indexList.add(index,item);
itemAdded(item,index,element);
return item;
}
protected IndexItem internalGet(int index){
synchronized(mutex){
if(index>=0&&index<indexList.size()){
return indexList.get(index);
}
protected StoreEntry internalGet(int index){
load();
if(index>=0&&index<indexList.size()){
return indexList.get(index);
}
return null;
}
@ -521,20 +502,18 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see org.apache.activemq.kaha.ListContainer#doRemove(int)
*/
public boolean doRemove(int index){
public synchronized boolean doRemove(int index){
load();
boolean result=false;
synchronized(mutex){
IndexItem item=indexList.get(index);
if(item!=null){
result=true;
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=indexList.getNextEntry(prev);
indexList.remove(index);
itemRemoved(index);
delete(item,prev,next);
}
IndexItem item=indexList.get(index);
if(item!=null){
result=true;
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=indexList.getNextEntry(prev);
indexList.remove(index);
itemRemoved(index);
delete(item,prev,next);
}
return result;
}
@ -544,20 +523,18 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#remove(int)
*/
public Object remove(int index){
public synchronized Object remove(int index){
load();
Object result=null;
synchronized(mutex){
IndexItem item=indexList.get(index);
if(item!=null){
itemRemoved(index);
result=getValue(item);
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=indexList.getNextEntry(item);
indexList.remove(index);
delete(item,prev,next);
}
IndexItem item=indexList.get(index);
if(item!=null){
itemRemoved(index);
result=getValue(item);
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=indexList.getNextEntry(item);
indexList.remove(index);
delete(item,prev,next);
}
return result;
}
@ -567,22 +544,20 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#indexOf(java.lang.Object)
*/
public int indexOf(Object o){
public synchronized int indexOf(Object o){
load();
int result=-1;
if(o!=null){
synchronized(mutex){
int count=0;
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=count;
break;
}
count++;
next=indexList.getNextEntry(next);
int count=0;
IndexItem next=indexList.getFirst();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=count;
break;
}
count++;
next=indexList.getNextEntry(next);
}
}
return result;
@ -593,22 +568,20 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#lastIndexOf(java.lang.Object)
*/
public int lastIndexOf(Object o){
public synchronized int lastIndexOf(Object o){
load();
int result=-1;
if(o!=null){
synchronized(mutex){
int count=indexList.size()-1;
IndexItem next=indexList.getLast();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=count;
break;
}
count--;
next=indexList.getPrevEntry(next);
int count=indexList.size()-1;
IndexItem next=indexList.getLast();
while(next!=null){
Object value=getValue(next);
if(value!=null&&value.equals(o)){
result=count;
break;
}
count--;
next=indexList.getPrevEntry(next);
}
}
return result;
@ -619,7 +592,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#listIterator()
*/
public ListIterator listIterator(){
public synchronized ListIterator listIterator(){
load();
return new CachedContainerListIterator(this,0);
}
@ -629,7 +602,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#listIterator(int)
*/
public ListIterator listIterator(int index){
public synchronized ListIterator listIterator(int index){
load();
return new CachedContainerListIterator(this,index);
}
@ -639,7 +612,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*
* @see java.util.List#subList(int, int)
*/
public List subList(int fromIndex,int toIndex){
public synchronized List subList(int fromIndex,int toIndex){
load();
List result=new ArrayList();
int count=fromIndex;
@ -651,11 +624,74 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
return result;
}
/**
* add an Object to the list but get a StoreEntry of its position
* @param object
* @return the entry in the Store
*/
public synchronized StoreEntry placeLast(Object object) {
StoreEntry item = internalAddLast(object);
return item;
}
/**
* insert an Object in first position int the list but get a StoreEntry of its position
* @param object
* @return the location in the Store
*/
public synchronized StoreEntry placeFirst(Object object) {
StoreEntry item = internalAddFirst(object);
return item;
}
/**
* @param entry
* @param object
* @see org.apache.activemq.kaha.ListContainer#update(org.apache.activemq.kaha.StoreEntry, java.lang.Object)
*/
public void update(StoreEntry entry,Object object){
try{
dataManager.updateItem(entry.getValueDataItem(),marshaller, object);
}catch(IOException e){
throw new RuntimeException(e);
}
}
/**
* Retrieve an Object from the Store by its location
* @param entry
* @return the Object at that entry
*/
public synchronized Object get(StoreEntry entry) {
load();
return getValue(entry);
}
/**
* remove the Object at the StoreEntry
* @param entry
* @return true if successful
*/
public synchronized boolean remove(StoreEntry entry) {
IndexItem item = (IndexItem)entry;
load();
boolean result = false;
if(item!=null){
clearCache();
IndexItem prev=indexList.getPrevEntry(item);
prev=prev!=null?prev:root;
IndexItem next=indexList.getNextEntry(item);
delete(item,prev,next);
}
return result;
}
protected IndexItem writeLast(Object value){
IndexItem index=null;
try{
if(value!=null){
DataItem data=dataManager.storeDataItem(marshaller,value);
StoreLocation data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=indexList.getLast();
@ -663,13 +699,13 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);
updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
updateIndex(next);
updateIndexes(next);
}
updateIndex(index);
storeIndex(index);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
@ -682,20 +718,20 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
IndexItem index=null;
try{
if(value!=null){
DataItem data=dataManager.storeDataItem(marshaller,value);
StoreLocation data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=root;
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);
updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
updateIndex(next);
updateIndexes(next);
}
updateIndex(index);
storeIndex(index);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
@ -705,11 +741,10 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
protected IndexItem insert(int insertPos,Object value){
long pos=Item.POSITION_NOT_SET;
IndexItem index=null;
try{
if(value!=null){
DataItem data=dataManager.storeDataItem(marshaller,value);
StoreLocation data=dataManager.storeDataItem(marshaller,value);
index=indexManager.createNewIndex();
index.setValueData(data);
IndexItem prev=null;
@ -727,13 +762,13 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);
updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
updateIndex(next);
updateIndexes(next);
}
updateIndex(index);
storeIndex(index);
}
}catch(IOException e){
log.error("Failed to insert "+value,e);
@ -742,11 +777,13 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
return index;
}
protected Object getValue(IndexItem item){
protected Object getValue(StoreEntry item){
Object result=null;
if(item!=null){
try{
DataItem data=item.getValueDataItem();
// ensure it's up to date
//item=indexList.getEntry(item);
StoreLocation data=item.getValueDataItem();
result=dataManager.readItem(marshaller,data);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
@ -759,7 +796,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
/**
* @return a string representation of this collection.
*/
public String toString(){
public synchronized String toString(){
StringBuffer result=new StringBuffer();
result.append("[");
Iterator i=iterator();
@ -852,7 +889,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
/**
* clear any cached values
*/
public void clearCache(){
public synchronized void clearCache(){
cacheList.clear();
offset=0;
lastCached=null;
@ -861,56 +898,60 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
/**
* @return the cacheList
*/
public LinkedList getCacheList(){
public synchronized LinkedList getCacheList(){
return cacheList;
}
/**
* @param cacheList the cacheList to set
* @param cacheList
* the cacheList to set
*/
public void setCacheList(LinkedList cacheList){
public synchronized void setCacheList(LinkedList cacheList){
this.cacheList=cacheList;
}
/**
* @return the lastCached
*/
public IndexItem getLastCached(){
public synchronized StoreEntry getLastCached(){
return lastCached;
}
/**
* @param lastCached the lastCached to set
* @param lastCached
* the lastCached to set
*/
public void setLastCached(IndexItem lastCached){
public synchronized void setLastCached(IndexItem lastCached){
this.lastCached=lastCached;
}
/**
* @return the maximumCacheSize
*/
public int getMaximumCacheSize(){
public synchronized int getMaximumCacheSize(){
return maximumCacheSize;
}
/**
* @param maximumCacheSize the maximumCacheSize to set
* @param maximumCacheSize
* the maximumCacheSize to set
*/
public void setMaximumCacheSize(int maximumCacheSize){
public synchronized void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
}
/**
* @return the offset
*/
public int getOffset(){
public synchronized int getOffset(){
return offset;
}
/**
* @param offset the offset to set
* @param offset
* the offset to set
*/
public void setOffset(int offset){
public synchronized void setOffset(int offset){
this.offset=offset;
}

View File

@ -1,20 +1,21 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.kaha.impl.container;
import java.io.IOException;
@ -26,10 +27,10 @@ import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.impl.data.DataItem;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
@ -37,20 +38,22 @@ import org.apache.activemq.kaha.impl.index.IndexLinkedList;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Implementation of a MapContainer
*
* @version $Revision: 1.2 $
*/
public final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
protected Map map=new HashMap();
protected Map valueToKeyMap=new HashMap();
protected Marshaller keyMarshaller= Store.ObjectMarshaller;
protected Marshaller keyMarshaller=Store.ObjectMarshaller;
protected Marshaller valueMarshaller=Store.ObjectMarshaller;
public MapContainerImpl(ContainerId id,IndexItem root,IndexManager rootIndexManager,IndexManager indexManager,DataManager dataManager){
super(id,root,rootIndexManager,indexManager,dataManager);
public MapContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,String indexType){
super(id,root,indexManager,dataManager,indexType);
}
/*
@ -58,32 +61,29 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#load()
*/
public void load(){
public synchronized void load(){
checkClosed();
if(!loaded){
synchronized(mutex){
if(!loaded){
if(!loaded){
loaded=true;
try{
init();
loaded=true;
try{
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
DataItem data=item.getKeyDataItem();
Object key=dataManager.readItem(keyMarshaller,data);
map.put(key,item);
valueToKeyMap.put(item,key);
indexList.add(item);
nextItem=item.getNextItem();
}
}catch(IOException e){
log.error("Failed to load container "+getId(),e);
throw new RuntimeStoreException(e);
long nextItem=root.getNextItem();
while(nextItem!=Item.POSITION_NOT_SET){
IndexItem item=indexManager.getIndex(nextItem);
StoreLocation data=item.getKeyDataItem();
Object key=dataManager.readItem(keyMarshaller,data);
map.put(key,item);
valueToKeyMap.put(item,key);
indexList.add(item);
nextItem=item.getNextItem();
}
}catch(IOException e){
log.error("Failed to load container "+getId(),e);
throw new RuntimeStoreException(e);
}
}
}
}
/*
@ -91,24 +91,22 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#unload()
*/
public void unload(){
public synchronized void unload(){
checkClosed();
if(loaded){
loaded=false;
synchronized(mutex){
map.clear();
valueToKeyMap.clear();
indexList.clear();
}
map.clear();
valueToKeyMap.clear();
indexList.clear();
}
}
public void setKeyMarshaller(Marshaller keyMarshaller){
public synchronized void setKeyMarshaller(Marshaller keyMarshaller){
checkClosed();
this.keyMarshaller=keyMarshaller;
}
public void setValueMarshaller(Marshaller valueMarshaller){
public synchronized void setValueMarshaller(Marshaller valueMarshaller){
checkClosed();
this.valueMarshaller=valueMarshaller;
}
@ -118,7 +116,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#size()
*/
public int size(){
public synchronized int size(){
load();
return map.size();
}
@ -128,7 +126,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#isEmpty()
*/
public boolean isEmpty(){
public synchronized boolean isEmpty(){
load();
return map.isEmpty();
}
@ -138,11 +136,9 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
*/
public boolean containsKey(Object key){
public synchronized boolean containsKey(Object key){
load();
synchronized(mutex){
return map.containsKey(key);
}
return map.containsKey(key);
}
/*
@ -150,13 +146,11 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
*/
public Object get(Object key){
public synchronized Object get(Object key){
load();
Object result=null;
IndexItem item=null;
synchronized(mutex){
item=(IndexItem) map.get(key);
}
StoreEntry item=null;
item=(StoreEntry)map.get(key);
if(item!=null){
result=getValue(item);
}
@ -168,20 +162,18 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
*/
public boolean containsValue(Object o){
public synchronized boolean containsValue(Object o){
load();
boolean result=false;
if(o!=null){
synchronized(indexList){
IndexItem item=indexList.getFirst();
while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
break;
}
item=indexList.getNextEntry(item);
IndexItem item=indexList.getFirst();
while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
break;
}
item=indexList.getNextEntry(item);
}
}
return result;
@ -192,14 +184,12 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
*/
public void putAll(Map t){
public synchronized void putAll(Map t){
load();
if(t!=null){
synchronized(mutex){
for(Iterator i=t.entrySet().iterator();i.hasNext();){
Map.Entry entry=(Map.Entry) i.next();
put(entry.getKey(),entry.getValue());
}
for(Iterator i=t.entrySet().iterator();i.hasNext();){
Map.Entry entry=(Map.Entry)i.next();
put(entry.getKey(),entry.getValue());
}
}
}
@ -209,7 +199,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#keySet()
*/
public Set keySet(){
public synchronized Set keySet(){
load();
return new ContainerKeySet(this);
}
@ -219,7 +209,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#values()
*/
public Collection values(){
public synchronized Collection values(){
load();
return new ContainerValueCollection(this);
}
@ -229,7 +219,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#entrySet()
*/
public Set entrySet(){
public synchronized Set entrySet(){
load();
return new ContainerEntrySet(this);
}
@ -237,20 +227,19 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
* @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object,
* java.lang.Object)
*/
public Object put(Object key,Object value){
public synchronized Object put(Object key,Object value){
load();
Object result=null;
synchronized(mutex){
if(map.containsKey(key)){
result=remove(key);
}
IndexItem item=write(key,value);
map.put(key,item);
valueToKeyMap.put(item,key);
indexList.add(item);
if(map.containsKey(key)){
result=remove(key);
}
IndexItem item=write(key,value);
map.put(key,item);
valueToKeyMap.put(item,key);
indexList.add(item);
return result;
}
@ -259,45 +248,41 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
*/
public Object remove(Object key){
public synchronized Object remove(Object key){
load();
Object result=null;
synchronized(mutex){
IndexItem item=(IndexItem) map.get(key);
if(item!=null){
map.remove(key);
valueToKeyMap.remove(item);
// ensure we have the upto date item
item=indexList.getEntry(item);
result=getValue(item);
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
IndexItem item=(IndexItem)map.get(key);
if(item!=null){
//refresh the index
item = (IndexItem)indexList.refreshEntry(item);
map.remove(key);
valueToKeyMap.remove(item);
result=getValue(item);
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
return result;
}
public boolean removeValue(Object o){
public synchronized boolean removeValue(Object o){
load();
boolean result=false;
if(o!=null){
synchronized(mutex){
IndexItem item=indexList.getFirst();
while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
// find the key
Object key=valueToKeyMap.get(item);
if(key!=null){
remove(key);
}
break;
IndexItem item=indexList.getFirst();
while(item!=null){
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
// find the key
Object key=valueToKeyMap.get(item);
if(key!=null){
remove(key);
}
item=indexList.getNextEntry(item);
break;
}
item=indexList.getNextEntry(item);
}
}
return result;
@ -315,32 +300,64 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
*
* @see org.apache.activemq.kaha.MapContainer#clear()
*/
public void clear(){
public synchronized void clear(){
checkClosed();
synchronized(mutex){
loaded=true;
synchronized(mutex){
map.clear();
valueToKeyMap.clear();
super.clear();
doClear();
}
loaded=true;
map.clear();
valueToKeyMap.clear();
super.clear();
doClear();
}
/**
* Add an entry to the Store Map
* @param key
* @param value
* @return the StoreEntry associated with the entry
*/
public StoreEntry place(Object key, Object value) {
load();
if(map.containsKey(key)){
remove(key);
}
IndexItem item=write(key,value);
map.put(key,item);
valueToKeyMap.put(item,key);
indexList.add(item);
return item;
}
/**
* Remove an Entry from ther Map
* @param entry
*/
public void remove(StoreEntry entry) {
load();
IndexItem item=(IndexItem)entry;
if(item!=null){
Object key = valueToKeyMap.remove(item);
map.remove(key);
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
indexList.remove(item);
delete(item,prev,next);
}
}
protected Set getInternalKeySet(){
return new HashSet(map.keySet());
}
protected IndexLinkedList getItemList(){
return indexList;
}
protected Object getValue(IndexItem item){
/**
* Get the value from it's location
* @param Valuelocation
* @return
*/
public synchronized Object getValue(StoreEntry item){
load();
Object result=null;
if(item!=null){
try{
DataItem data=item.getValueDataItem();
// ensure this value is up to date
//item=indexList.getEntry(item);
StoreLocation data=item.getValueDataItem();
result=dataManager.readItem(valueMarshaller,data);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
@ -350,13 +367,17 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
return result;
}
protected Object getKey(IndexItem item){
/**
* Get the Key object from it's location
* @param keyLocation
* @return
*/
public synchronized Object getKey(StoreEntry item){
load();
Object result=null;
if(item!=null){
try{
DataItem data=item.getKeyDataItem();
StoreLocation data=item.getKeyDataItem();
result=dataManager.readItem(keyMarshaller,data);
}catch(IOException e){
log.error("Failed to get key for "+item,e);
@ -366,16 +387,25 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
return result;
}
protected Set getInternalKeySet(){
return new HashSet(map.keySet());
}
protected IndexLinkedList getItemList(){
return indexList;
}
protected IndexItem write(Object key,Object value){
IndexItem index=null;
try{
if(key!=null){
index=indexManager.createNewIndex();
DataItem data=dataManager.storeDataItem(keyMarshaller,key);
StoreLocation data=dataManager.storeDataItem(keyMarshaller,key);
index.setKeyData(data);
}
if(value!=null){
DataItem data=dataManager.storeDataItem(valueMarshaller,value);
StoreLocation data=dataManager.storeDataItem(valueMarshaller,value);
index.setValueData(data);
}
IndexItem prev=indexList.getLast();
@ -383,13 +413,13 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
IndexItem next=indexList.getNextEntry(prev);
prev.setNextItem(index.getOffset());
index.setPreviousItem(prev.getOffset());
updateIndex(prev);
updateIndexes(prev);
if(next!=null){
next.setPreviousItem(index.getOffset());
index.setNextItem(next.getOffset());
updateIndex(next);
updateIndexes(next);
}
updateIndex(index);
storeIndex(index);
}catch(IOException e){
log.error("Failed to write "+key+" , "+value,e);
throw new RuntimeStoreException(e);

View File

@ -17,13 +17,15 @@
*/
package org.apache.activemq.kaha.impl.data;
import org.apache.activemq.kaha.StoreLocation;
/**
* A a wrapper for a data in the store
*
* @version $Revision: 1.2 $
*/
public final class DataItem implements Item{
public final class DataItem implements Item, StoreLocation{
private int file=(int) POSITION_NOT_SET;
private long offset=POSITION_NOT_SET;
@ -42,7 +44,8 @@ public final class DataItem implements Item{
}
/**
* @return Returns the size.
* @return
* @see org.apache.activemq.kaha.StoreLocation#getSize()
*/
public int getSize(){
return size;
@ -56,7 +59,8 @@ public final class DataItem implements Item{
}
/**
* @return Returns the offset.
* @return
* @see org.apache.activemq.kaha.StoreLocation#getOffset()
*/
public long getOffset(){
return offset;
@ -70,7 +74,8 @@ public final class DataItem implements Item{
}
/**
* @return Returns the file.
* @return
* @see org.apache.activemq.kaha.StoreLocation#getFile()
*/
public int getFile(){
return file;

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.index.RedoStoreIndexItem;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.commons.logging.Log;
@ -41,6 +42,7 @@ public final class DataManager{
private static final Log log=LogFactory.getLog(DataManager.class);
public static long MAX_FILE_LENGTH=1024*1024*32;
private static final String NAME_PREFIX="data-";
private final File dir;
private final String name;
private StoreDataReader reader;
@ -62,7 +64,7 @@ public final class DataManager{
this.reader=new StoreDataReader(this);
this.writer=new StoreDataWriter(this);
dataFilePrefix = "data-"+name+"-";
dataFilePrefix = NAME_PREFIX+name+"-";
// build up list of current dataFiles
File[] files=dir.listFiles(new FilenameFilter(){
public boolean accept(File dir,String n){
@ -109,27 +111,32 @@ public final class DataManager{
return currentWriteFile;
}
RandomAccessFile getDataFile(DataItem item) throws IOException{
RandomAccessFile getDataFile(StoreLocation item) throws IOException{
Integer key=new Integer(item.getFile());
DataFile dataFile=(DataFile) fileMap.get(key);
if(dataFile!=null){
return dataFile.getRandomAccessFile();
}
throw new IOException("Could not locate data file "+name+item.getFile());
log.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file "+NAME_PREFIX+name+"-"+item.getFile());
}
public synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
public synchronized Object readItem(Marshaller marshaller, StoreLocation item) throws IOException{
return reader.readItem(marshaller,item);
}
public synchronized DataItem storeDataItem(Marshaller marshaller, Object payload) throws IOException{
public synchronized StoreLocation storeDataItem(Marshaller marshaller, Object payload) throws IOException{
return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
}
public synchronized DataItem storeRedoItem(Object payload) throws IOException{
public synchronized StoreLocation storeRedoItem(Object payload) throws IOException{
return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
}
public synchronized void updateItem(StoreLocation location,Marshaller marshaller, Object payload) throws IOException {
writer.updateItem(location,marshaller,payload,DATA_ITEM_TYPE);
}
public synchronized void recoverRedoItems(RedoListener listener) throws IOException{
// Nothing to recover if there is no current file.
@ -188,6 +195,7 @@ public final class DataManager{
}
}
public synchronized boolean delete() throws IOException{
boolean result=true;
for(Iterator i=fileMap.values().iterator();i.hasNext();){
@ -198,6 +206,7 @@ public final class DataManager{
return result;
}
public synchronized void addInterestInFile(int file) throws IOException{
if(file>=0){
Integer key=new Integer(file);
@ -276,4 +285,8 @@ public final class DataManager{
public void setMaxFileLength(long maxFileLength){
this.maxFileLength=maxFileLength;
}
public String toString(){
return "DataManager:("+NAME_PREFIX+name+")";
}
}

View File

@ -17,9 +17,11 @@
*/
package org.apache.activemq.kaha.impl.data;
import org.apache.activemq.kaha.StoreLocation;
public interface RedoListener {
void onRedoItem(DataItem item, Object object) throws Exception;
void onRedoItem(StoreLocation item, Object object) throws Exception;
}

View File

@ -20,6 +20,7 @@ package org.apache.activemq.kaha.impl.data;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
/**
* Optimized Store reader
*
@ -58,7 +59,7 @@ final class StoreDataReader{
return rc;
}
protected Object readItem(Marshaller marshaller,DataItem item) throws IOException{
protected Object readItem(Marshaller marshaller,StoreLocation item) throws IOException{
RandomAccessFile file=dataManager.getDataFile(item);
// TODO: we could reuse the buffer in dataIn if it's big enough to avoid

View File

@ -19,8 +19,10 @@ package org.apache.activemq.kaha.impl.data;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreLocation;
/**
* Optimized Store writer
*
@ -50,7 +52,7 @@ final class StoreDataWriter{
* @throws IOException
* @throws FileNotFoundException
*/
DataItem storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
StoreLocation storeItem(Marshaller marshaller, Object payload, byte type) throws IOException {
// Write the packet our internal buffer.
buffer.reset();
@ -75,4 +77,19 @@ final class StoreDataWriter{
dataManager.addInterestInFile(dataFile);
return item;
}
void updateItem(StoreLocation location,Marshaller marshaller, Object payload, byte type) throws IOException {
//Write the packet our internal buffer.
buffer.reset();
buffer.position(DataManager.ITEM_HEAD_SIZE);
marshaller.writePayload(payload,buffer);
int size=buffer.size();
int payloadSize=size-DataManager.ITEM_HEAD_SIZE;
buffer.reset();
buffer.writeByte(type);
buffer.writeInt(payloadSize);
RandomAccessFile dataFile = dataManager.getDataFile(location);
dataFile.seek(location.getOffset());
dataFile.write(buffer.getData(),0,size);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.activemq.kaha.impl.index;
import java.io.IOException;
import org.apache.activemq.kaha.StoreEntry;
/**
* A linked list used by IndexItems
*
@ -72,7 +73,7 @@ public class DiskIndexLinkedList implements IndexLinkedList{
*
* @return the first element from this list.
*/
public IndexItem removeFirst(){
public StoreEntry removeFirst(){
if(size==0){
return null;
}
@ -89,7 +90,7 @@ public class DiskIndexLinkedList implements IndexLinkedList{
public Object removeLast(){
if(size==0)
return null;
IndexItem result=last;
StoreEntry result=last;
remove(last);
return result;
}
@ -142,7 +143,7 @@ public class DiskIndexLinkedList implements IndexLinkedList{
* @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
*/
public boolean add(IndexItem item){
size++;
addLast(item);
return true;
}
@ -224,7 +225,7 @@ public class DiskIndexLinkedList implements IndexLinkedList{
* @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
* contain this element.
*/
public int indexOf(IndexItem o){
public int indexOf(StoreEntry o){
int index=0;
if(size>0){
for(IndexItem e=getNextEntry(root);e!=null;e=getNextEntry(e)){
@ -271,19 +272,19 @@ public class DiskIndexLinkedList implements IndexLinkedList{
try{
result=indexManager.getIndex(current.getPreviousItem());
}catch(IOException e){
throw new RuntimeException("Failed to index",e);
throw new RuntimeException("Failed to get current index for "+current,e);
}
}
//essential root get's updated consistently
if(result != null &&root!=null && root.equals(result)){
return root;
// essential root get's updated consistently
if(result!=null&&root!=null&&root.equals(result)){
return root;
}
return result;
}
public IndexItem getEntry(IndexItem current){
IndexItem result=null;
if(current!=null&&current.getOffset()>=0){
public StoreEntry getEntry(StoreEntry current){
StoreEntry result=null;
if(current != null && current.getOffset() >= 0){
try{
result=indexManager.getIndex(current.getOffset());
}catch(IOException e){
@ -297,6 +298,26 @@ public class DiskIndexLinkedList implements IndexLinkedList{
return result;
}
/**
* Update the indexes of a StoreEntry
* @param current
*/
public StoreEntry refreshEntry(StoreEntry current){
StoreEntry result=null;
if(current != null && current.getOffset() >= 0){
try{
result=indexManager.refreshIndex((IndexItem)current);
}catch(IOException e){
throw new RuntimeException("Failed to index",e);
}
}
//essential root get's updated consistently
if(result != null &&root!=null && root.equals(result)){
return root;
}
return result;
}
public void remove(IndexItem e){
if(e==root||e.equals(root))
return;

View File

@ -21,6 +21,8 @@ import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.data.DataItem;
import org.apache.activemq.kaha.impl.data.Item;
/**
@ -28,9 +30,10 @@ import org.apache.activemq.kaha.impl.data.Item;
*
* @version $Revision: 1.2 $
*/
public class IndexItem implements Item{
public class IndexItem implements Item, StoreEntry{
public static final int INDEX_SIZE=51;
public static final int INDEXES_ONLY_SIZE=19;
//used by linked list
IndexItem next;
IndexItem prev;
@ -66,7 +69,11 @@ import org.apache.activemq.kaha.impl.data.Item;
active=true;
}
public DataItem getKeyDataItem(){
/**
* @return
* @see org.apache.activemq.kaha.StoreEntry#getKeyDataItem()
*/
public StoreLocation getKeyDataItem(){
DataItem result=new DataItem();
result.setOffset(keyOffset);
result.setFile(keyFile);
@ -74,7 +81,11 @@ import org.apache.activemq.kaha.impl.data.Item;
return result;
}
public DataItem getValueDataItem(){
/**
* @return
* @see org.apache.activemq.kaha.StoreEntry#getValueDataItem()
*/
public StoreLocation getValueDataItem(){
DataItem result=new DataItem();
result.setOffset(valueOffset);
result.setFile(valueFile);
@ -82,13 +93,13 @@ import org.apache.activemq.kaha.impl.data.Item;
return result;
}
public void setValueData(DataItem item){
public void setValueData(StoreLocation item){
valueOffset=item.getOffset();
valueFile=item.getFile();
valueSize=item.getSize();
}
public void setKeyData(DataItem item){
public void setKeyData(StoreLocation item){
keyOffset=item.getOffset();
keyFile=item.getFile();
keySize=item.getSize();
@ -98,7 +109,7 @@ import org.apache.activemq.kaha.impl.data.Item;
* @param dataOut
* @throws IOException
*/
void write(DataOutput dataOut) throws IOException{
public void write(DataOutput dataOut) throws IOException{
dataOut.writeShort(MAGIC);
dataOut.writeBoolean(active);
dataOut.writeLong(previousItem);
@ -111,11 +122,18 @@ import org.apache.activemq.kaha.impl.data.Item;
dataOut.writeInt(valueSize);
}
void updateIndexes(DataOutput dataOut) throws IOException{
dataOut.writeShort(MAGIC);
dataOut.writeBoolean(active);
dataOut.writeLong(previousItem);
dataOut.writeLong(nextItem);
}
/**
* @param dataIn
* @throws IOException
*/
void read(DataInput dataIn) throws IOException{
public void read(DataInput dataIn) throws IOException{
if(dataIn.readShort()!=MAGIC){
throw new BadMagicException();
}
@ -130,6 +148,15 @@ import org.apache.activemq.kaha.impl.data.Item;
valueSize=dataIn.readInt();
}
void readIndexes(DataInput dataIn) throws IOException{
if(dataIn.readShort()!=MAGIC){
throw new BadMagicException();
}
active=dataIn.readBoolean();
previousItem=dataIn.readLong();
nextItem=dataIn.readLong();
}
/**
* @param newPrevEntry
*/
@ -152,7 +179,8 @@ import org.apache.activemq.kaha.impl.data.Item;
}
/**
* @return next item
* @return
* @see org.apache.activemq.kaha.StoreEntry#getNextItem()
*/
public long getNextItem(){
return nextItem;
@ -173,7 +201,8 @@ import org.apache.activemq.kaha.impl.data.Item;
}
/**
* @return Returns the keyFile.
* @return
* @see org.apache.activemq.kaha.StoreEntry#getKeyFile()
*/
public int getKeyFile(){
return keyFile;
@ -187,7 +216,8 @@ import org.apache.activemq.kaha.impl.data.Item;
}
/**
* @return Returns the valueFile.
* @return
* @see org.apache.activemq.kaha.StoreEntry#getValueFile()
*/
public int getValueFile(){
return valueFile;
@ -201,7 +231,8 @@ import org.apache.activemq.kaha.impl.data.Item;
}
/**
* @return Returns the valueOffset.
* @return
* @see org.apache.activemq.kaha.StoreEntry#getValueOffset()
*/
public long getValueOffset(){
return valueOffset;
@ -229,7 +260,8 @@ import org.apache.activemq.kaha.impl.data.Item;
}
/**
* @return Returns the offset.
* @return
* @see org.apache.activemq.kaha.StoreEntry#getOffset()
*/
public long getOffset(){
return offset;
@ -242,6 +274,10 @@ import org.apache.activemq.kaha.impl.data.Item;
this.offset=offset;
}
/**
* @return
* @see org.apache.activemq.kaha.StoreEntry#getKeySize()
*/
public int getKeySize() {
return keySize;
}
@ -250,6 +286,10 @@ import org.apache.activemq.kaha.impl.data.Item;
this.keySize = keySize;
}
/**
* @return
* @see org.apache.activemq.kaha.StoreEntry#getValueSize()
*/
public int getValueSize() {
return valueSize;
}

View File

@ -17,6 +17,13 @@
*/
package org.apache.activemq.kaha.impl.index;
import org.apache.activemq.kaha.StoreEntry;
/**
* Inteface to LinkedList of Indexes
*
* @version $Revision$
*/
public interface IndexLinkedList{
/**
@ -43,7 +50,7 @@ public interface IndexLinkedList{
*
* @return the first element from this list.
*/
public IndexItem removeFirst();
public StoreEntry removeFirst();
/**
* Removes and returns the last element from this list.
@ -54,16 +61,14 @@ public interface IndexLinkedList{
/**
* Inserts the given element at the beginning of this list.
*
* @param o the element to be inserted at the beginning of this list.
* @param item
*/
public void addFirst(IndexItem item);
/**
* Appends the given element to the end of this list. (Identical in function to the <tt>add</tt> method; included
* only for consistency.)
*
* @param o the element to be inserted at the end of this list.
* @param item
*/
public void addLast(IndexItem item);
@ -83,8 +88,8 @@ public interface IndexLinkedList{
/**
* Appends the specified element to the end of this list.
* @param item
*
* @param o element to be appended to this list.
* @return <tt>true</tt> (as per the general contract of <tt>Collection.add</tt>).
*/
public boolean add(IndexItem item);
@ -137,7 +142,7 @@ public interface IndexLinkedList{
* @return the index in this list of the first occurrence of the specified element, or -1 if the list does not
* contain this element.
*/
public int indexOf(IndexItem o);
public int indexOf(StoreEntry o);
/**
* Retrieve the next entry after this entry
@ -164,8 +169,15 @@ public interface IndexLinkedList{
/**
* Ensure we have the up to date entry
* @param current
* @param entry
* @return the entry
*/
public IndexItem getEntry(IndexItem current);
public StoreEntry getEntry(StoreEntry entry);
/**
* Update the indexes of a StoreEntry
* @param current
* @return update StoreEntry
*/
public StoreEntry refreshEntry(StoreEntry current);
}

View File

@ -18,10 +18,14 @@
package org.apache.activemq.kaha.impl.index;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileLock;
import java.util.LinkedList;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.data.DataManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,29 +38,22 @@ public final class IndexManager{
private static final Log log=LogFactory.getLog(IndexManager.class);
private static final String NAME_PREFIX="index-";
private final String name;
private File directory;
private File file;
private RandomAccessFile indexFile;
private StoreIndexReader reader;
private StoreIndexWriter writer;
private LinkedList freeList=new LinkedList();
private DataManager redoLog;
private String mode;
private long length=0;
public IndexManager(File directory,String name,String mode,DataManager redoLog) throws IOException{
this.directory = directory;
this.name=name;
file=new File(directory,NAME_PREFIX+name);
indexFile=new RandomAccessFile(file,mode);
reader=new StoreIndexReader(indexFile);
writer=new StoreIndexWriter(indexFile,name,redoLog);
long offset=0;
while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
IndexItem index=reader.readItem(offset);
if(!index.isActive()){
index.reset();
freeList.add(index);
}
offset+=IndexItem.INDEX_SIZE;
}
length=offset;
this.mode = mode;
this.redoLog = redoLog;
initialize();
}
public synchronized boolean isEmpty(){
@ -67,18 +64,27 @@ public final class IndexManager{
return reader.readItem(offset);
}
public synchronized IndexItem refreshIndex(IndexItem item) throws IOException{
reader.updateIndexes(item);
return item;
}
public synchronized void freeIndex(IndexItem item) throws IOException{
item.reset();
//item.reset();
item.setActive(false);
writer.storeItem(item);
writer.updateIndexes(item);
freeList.add(item);
}
public synchronized void updateIndex(IndexItem index) throws IOException{
public synchronized void storeIndex(IndexItem index) throws IOException{
writer.storeItem(index);
}
public void redo(final RedoStoreIndexItem redo) throws IOException{
public synchronized void updateIndexes(IndexItem index) throws IOException{
writer.updateIndexes(index);
}
public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{
writer.redoStoreItem(redo);
}
@ -106,6 +112,7 @@ public final class IndexManager{
}
}
public synchronized boolean delete() throws IOException{
freeList.clear();
if(indexFile!=null){
@ -115,7 +122,7 @@ public final class IndexManager{
return file.delete();
}
private IndexItem getNextFreeIndex(){
private synchronized IndexItem getNextFreeIndex(){
IndexItem result=null;
if(!freeList.isEmpty()){
result=(IndexItem) freeList.removeLast();
@ -128,12 +135,33 @@ public final class IndexManager{
return length;
}
public void setLength(long value){
public synchronized void setLength(long value){
this.length=value;
}
public synchronized FileLock getLock() throws IOException {
return indexFile.getChannel().tryLock();
}
public String toString(){
return "IndexManager:("+NAME_PREFIX+name+")";
}
protected void initialize() throws IOException {
file=new File(directory,NAME_PREFIX+name);
indexFile=new RandomAccessFile(file,mode);
reader=new StoreIndexReader(indexFile);
writer=new StoreIndexWriter(indexFile,name,redoLog);
long offset=0;
while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
IndexItem index=reader.readItem(offset);
if(!index.isActive()){
index.reset();
freeList.add(index);
}
offset+=IndexItem.INDEX_SIZE;
}
length=offset;
}
}

View File

@ -50,4 +50,13 @@ class StoreIndexReader{
result.read(dataIn);
return result;
}
void updateIndexes(IndexItem indexItem) throws IOException{
if (indexItem != null){
file.seek(indexItem.getOffset());
file.readFully(buffer,0,IndexItem.INDEXES_ONLY_SIZE);
dataIn.restart(buffer);
indexItem.readIndexes(dataIn);
}
}
}

View File

@ -62,6 +62,18 @@ class StoreIndexWriter{
file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
}
void updateIndexes(IndexItem indexItem) throws IOException{
if( redoLog!=null ) {
RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
redoLog.storeRedoItem(redo);
}
dataOut.reset();
indexItem.updateIndexes(dataOut);
file.seek(indexItem.getOffset());
file.write(dataOut.getData(),0,IndexItem.INDEXES_ONLY_SIZE);
}
public void redoStoreItem(RedoStoreIndexItem redo) throws IOException {
dataOut.reset();
redo.getIndexItem().write(dataOut);

View File

@ -17,12 +17,14 @@
*/
package org.apache.activemq.kaha.impl.index;
import org.apache.activemq.kaha.StoreEntry;
/**
* A linked list used by IndexItems
*
* @version $Revision: 1.2 $
*/
final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
public final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
private transient IndexItem root;
private transient int size=0;
@ -30,7 +32,7 @@ final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
/**
* Constructs an empty list.
*/
VMIndexLinkedList(IndexItem header){
public VMIndexLinkedList(IndexItem header){
this.root = header;
this.root.next=root.prev=root;
}
@ -62,11 +64,11 @@ final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.IndexLinkedList#removeFirst()
*/
public IndexItem removeFirst(){
public StoreEntry removeFirst(){
if(size==0){
return null;
}
IndexItem result=root.next;
StoreEntry result=root.next;
remove(root.next);
return result;
}
@ -77,7 +79,7 @@ final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
public Object removeLast(){
if(size==0)
return null;
IndexItem result=root.prev;
StoreEntry result=root.prev;
remove(root.prev);
return result;
}
@ -171,7 +173,7 @@ final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.IndexLinkedList#indexOf(org.apache.activemq.kaha.impl.IndexItem)
*/
public int indexOf(IndexItem o){
public int indexOf(StoreEntry o){
int index=0;
for(IndexItem e=root.next;e!=root;e=e.next){
if(o==e){
@ -228,7 +230,15 @@ final class VMIndexLinkedList implements Cloneable, IndexLinkedList{
return clone;
}
public IndexItem getEntry(IndexItem current){
public StoreEntry getEntry(StoreEntry current){
return current;
}
/**
* Update the indexes of a StoreEntry
* @param current
*/
public StoreEntry refreshEntry(StoreEntry current){
return current;
}
}

View File

@ -211,9 +211,9 @@ public class Statements {
public String getFindDurableSubMessagesStatement(){
if(findDurableSubMessagesStatement==null){
findDurableSubMessagesStatement="SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+getFullAckTableName()+" D "+" WHERE ? >= ( select count(*) from "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > ?"+" ORDER BY M.ID)";
+getFullAckTableName()+" D "+" WHERE ?>= ( SELECT COUNT(*) FROM "
+getFullMessageTableName()+" M, " + getFullAckTableName() + " D WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > ?)"+" ORDER BY M.ID)";
}
return findDurableSubMessagesStatement;
}
@ -230,9 +230,9 @@ public class Statements {
public String getNextDurableSubscriberMessageStatement(){
if (nextDurableSubscriberMessageStatement == null){
nextDurableSubscriberMessageStatement = "SELECT M.ID, M.MSG FROM "+getFullMessageTableName()+" M, "
+getFullAckTableName()+" D "+" WHERE 1 >= ( select count(*) from "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+" ORDER BY M.ID)";
+getFullAckTableName()+" D "+" WHERE 1 >= ( SELECT COUNT(*) FROM "
+getFullMessageTableName()+" M, WHERE (D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"+") ORDER BY M.ID)";
}
return nextDurableSubscriberMessageStatement;
}
@ -242,7 +242,7 @@ public class Statements {
*/
public String getDurableSubscriberMessageCountStatement(){
if (durableSubscriberMessageCountStatement==null){
durableSubscriberMessageCountStatement = "select count(*) from "
durableSubscriberMessageCountStatement = "SELECT COUNT(*) FROM "
+getFullMessageTableName()+" M, where D.CONTAINER=? AND D.CLIENT_ID=? AND D.SUB_NAME=?"
+" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID";
}

View File

@ -414,13 +414,14 @@ public class DefaultJDBCAdapter implements JDBCAdapter {
PreparedStatement s = null;
ResultSet rs = null;
try {
System.err.println("VANILLA STATEMENT = " + statements.getFindDurableSubMessagesStatement());
s = c.getConnection().prepareStatement(statements.getFindDurableSubMessagesStatement());
s.setString(1, destination.getQualifiedName());
s.setString(2, clientId);
s.setString(3, subscriptionName);
s.setLong(4,seq);
s.setInt(5,maxReturned);
System.err.println("STATEMENT = " + s);
rs = s.executeQuery();
if( statements.isUseExternalMessageReferences() ) {

View File

@ -1,20 +1,21 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.File;
@ -26,6 +27,8 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
@ -39,12 +42,14 @@ import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
*
* @version $Revision: 1.4 $
*/
public class KahaPersistenceAdapter implements PersistenceAdapter{
private static final Log log=LogFactory.getLog(KahaPersistenceAdapter.class);
static final String PREPARED_TRANSACTIONS_NAME="PreparedTransactions";
KahaTransactionStore transactionStore;
@ -53,39 +58,39 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
ConcurrentHashMap messageStores=new ConcurrentHashMap();
private boolean useExternalMessageReferences;
private OpenWireFormat wireFormat=new OpenWireFormat();
private long maxDataFileLength = 32 * 1024 * 1024;
Store store;
private long maxDataFileLength=32*1024*1024;
private String indexType=IndexTypes.DISK_INDEX;
private File dir;
private Store theStore;
public KahaPersistenceAdapter(File dir) throws IOException{
if(!dir.exists()){
dir.mkdirs();
}
String name=dir.getAbsolutePath()+File.separator+"kaha.db";
store=StoreFactory.open(name,"rw");
store.setMaxDataFileLength(maxDataFileLength);
this.dir=dir;
}
public Set getDestinations(){
Set rc=new HashSet();
try {
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add(obj);
try{
Store store=getStore();
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add(obj);
}
}
}
}catch(IOException e){
log.error("Failed to get destinations " ,e);
log.error("Failed to get destinations ",e);
}
return rc;
}
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
MessageStore rc=(MessageStore) queues.get(destination);
MessageStore rc=(MessageStore)queues.get(destination);
if(rc==null){
rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
messageStores.put(destination, rc);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
@ -95,31 +100,31 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
TopicMessageStore rc=(TopicMessageStore)topics.get(destination);
if(rc==null){
MapContainer messageContainer=getMapContainer(destination,"topic-data");
Store store=getStore();
ListContainer messageContainer=getListContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks");
ackContainer.setKeyMarshaller(new StringMarshaller());
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination, rc);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
topics.put(destination,rc);
}
return rc;
}
protected MessageStore retrieveMessageStore(Object id){
MessageStore result = (MessageStore) messageStores.get(id);
MessageStore result=(MessageStore)messageStores.get(id);
return result;
}
public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){
Store store=getStore();
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
@ -129,18 +134,25 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
return transactionStore;
}
public void beginTransaction(ConnectionContext context){}
public void commitTransaction(ConnectionContext context) throws IOException{
store.force();
public void beginTransaction(ConnectionContext context){
}
public void rollbackTransaction(ConnectionContext context){}
public void commitTransaction(ConnectionContext context) throws IOException{
if(theStore!=null){
theStore.force();
}
}
public void start() throws Exception{}
public void rollbackTransaction(ConnectionContext context){
}
public void start() throws Exception{
}
public void stop() throws Exception{
store.close();
if(theStore!=null){
theStore.close();
}
}
public long getLastMessageBrokerSequenceId() throws IOException{
@ -148,8 +160,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
public void deleteAllMessages() throws IOException{
if(store!=null){
store.delete();
if(theStore!=null){
theStore.clear();
}
}
@ -162,6 +174,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller());
if(useExternalMessageReferences){
@ -173,11 +186,25 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
return container;
}
protected ListContainer getListContainer(Object id,String containerName) throws IOException{
Store store=getStore();
ListContainer container=store.getListContainer(id,containerName);
if(useExternalMessageReferences){
container.setMarshaller(new StringMarshaller());
}else{
container.setMarshaller(new CommandMarshaller(wireFormat));
}
container.load();
return container;
}
/**
* @param usageManager
* The UsageManager that is controlling the broker's memory usage.
* The UsageManager that is controlling the broker's memory
* usage.
*/
public void setUsageManager(UsageManager usageManager){}
public void setUsageManager(UsageManager usageManager){
}
/**
* @return the maxDataFileLength
@ -187,11 +214,36 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
/**
* @param maxDataFileLength the maxDataFileLength to set
* @param maxDataFileLength
* the maxDataFileLength to set
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
public void setMaxDataFileLength(long maxDataFileLength){
this.maxDataFileLength=maxDataFileLength;
}
/**
* @return the indexType
*/
public String getIndexType(){
return this.indexType;
}
/**
* @param indexType the indexTypes to set
*/
public void setIndexType(String indexType){
this.indexType=indexType;
}
protected synchronized Store getStore() throws IOException{
if(theStore==null){
String name=dir.getAbsolutePath()+File.separator+"kaha.db";
theStore=StoreFactory.open(name,"rw");
theStore.setMaxDataFileLength(maxDataFileLength);
theStore.setIndexType(indexType);
}
return theStore;
}
}

View File

@ -21,15 +21,19 @@ import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
@ -37,15 +41,18 @@ import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* @version $Revision: 1.5 $
*/
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
private Map ackContainer;
public class KahaTopicMessageStore implements TopicMessageStore{
private ActiveMQDestination destination;
private ListContainer ackContainer;
private ListContainer messageContainer;
private Map subscriberContainer;
private Store store;
private Map subscriberAcks=new ConcurrentHashMap();
public KahaTopicMessageStore(Store store,MapContainer messageContainer,MapContainer ackContainer,
public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
super(messageContainer,destination);
this.messageContainer = messageContainer;
this.destination = destination;
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
@ -59,32 +66,35 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberAcks.size();
if(subscriberCount>0){
String id=message.getMessageId().toString();
ackContainer.put(id,new AtomicInteger(subscriberCount));
StoreEntry entry = messageContainer.placeLast(message);
TopicSubAck tsa = new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setStoreEntry(entry);
StoreEntry ackEntry = ackContainer.placeLast(tsa);
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
Object key=i.next();
ListContainer container=store.getListContainer(key,"durable-subs");
container.add(id);
container.add(ackEntry);
}
super.addMessage(context,message);
}
}
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
MessageId messageId) throws IOException{
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
String id=messageId.toString();
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
ListContainer container=(ListContainer)subscriberAcks.get(subcriberId);
if(container!=null){
//container.remove(id);
container.removeFirst();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so remove it
super.removeMessage(messageId);
StoreEntry ackEntry=(StoreEntry)container.removeFirst();
if(ackEntry!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ackEntry);
messageContainer.remove(tsa.getStoreEntry());
}else {
ackContainer.update(ackEntry,tsa);
}
}
}
}
@ -115,14 +125,16 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
subscriberContainer.remove(key);
ListContainer list=(ListContainer) subscriberAcks.get(key);
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so remove it
messageContainer.remove(id);
StoreEntry ackEntry=(StoreEntry)i.next();
if(ackEntry!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ackEntry);
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ackEntry);
messageContainer.remove(tsa.getStoreEntry());
}else {
ackContainer.update(ackEntry,tsa);
}
}
}
}
@ -134,7 +146,8 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
ListContainer list=(ListContainer) subscriberAcks.get(key);
if(list!=null){
for(Iterator i=list.iterator();i.hasNext();){
Object msg=messageContainer.get(i.next());
TopicSubAck tsa = (TopicSubAck)i.next();
Object msg=messageContainer.get(tsa.getStoreEntry());
if(msg!=null){
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
@ -157,7 +170,8 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
boolean startFound=false;
int count = 0;
for(Iterator i=list.iterator();i.hasNext() && count < maxReturned;){
Object msg=messageContainer.get(i.next());
TopicSubAck tsa = (TopicSubAck)i.next();
Object msg=messageContainer.get(tsa.getStoreEntry());
if(msg!=null){
if(msg.getClass()==String.class){
String ref=msg.toString();
@ -186,7 +200,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
}
public void delete(){
super.delete();
messageContainer.clear();
ackContainer.clear();
subscriberContainer.clear();
}
@ -204,7 +218,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
protected void addSubscriberAckContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs");
Marshaller marshaller=new StringMarshaller();
Marshaller marshaller=new StoreEntryMarshaller();
container.setMarshaller(marshaller);
subscriberAcks.put(key,container);
}
@ -213,8 +227,9 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
Iterator iter = list.iterator();
return (Message) (iter.hasNext() ? iter.next() : null);
TopicSubAck tsa = (TopicSubAck)list.get(0);
Message msg=(Message)messageContainer.get(tsa.getStoreEntry());
return msg;
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
@ -223,5 +238,125 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
return list.size();
}
/**
* @param context
* @param messageId
* @param expirationTime
* @param messageRef
* @throws IOException
* @see org.apache.activemq.store.MessageStore#addMessageReference(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageId, long, java.lang.String)
*/
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef) throws IOException{
messageContainer.add(messageRef);
}
/**
* @return the destination
* @see org.apache.activemq.store.MessageStore#getDestination()
*/
public ActiveMQDestination getDestination(){
return destination;
}
/**
* @param identity
* @return the Message
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessage(org.apache.activemq.command.MessageId)
*/
public Message getMessage(MessageId identity) throws IOException{
Message result = null;
for (Iterator i = messageContainer.iterator(); i.hasNext();){
Message msg = (Message)i.next();
if (msg.getMessageId().equals(identity)) {
result = msg;
break;
}
}
return result;
}
/**
* @param identity
* @return String
* @throws IOException
* @see org.apache.activemq.store.MessageStore#getMessageReference(org.apache.activemq.command.MessageId)
*/
public String getMessageReference(MessageId identity) throws IOException{
return null;
}
/**
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recover(org.apache.activemq.store.MessageRecoveryListener)
*/
public void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
Object msg=iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}
listener.finished();
}
/**
* @param context
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeAllMessages(org.apache.activemq.broker.ConnectionContext)
*/
public void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
}
/**
* @param context
* @param ack
* @throws IOException
* @see org.apache.activemq.store.MessageStore#removeMessage(org.apache.activemq.broker.ConnectionContext, org.apache.activemq.command.MessageAck)
*/
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
for (Iterator i = messageContainer.iterator(); i.hasNext();){
Message msg = (Message)i.next();
if (msg.getMessageId().equals(ack.getLastMessageId())) {
i.remove();
break;
}
}
}
/**
* @param usageManager
* @see org.apache.activemq.store.MessageStore#setUsageManager(org.apache.activemq.memory.UsageManager)
*/
public void setUsageManager(UsageManager usageManager){
// TODO Auto-generated method stub
}
/**
* @throws Exception
* @see org.apache.activemq.Service#start()
*/
public void start() throws Exception{
// TODO Auto-generated method stub
}
/**
* @throws Exception
* @see org.apache.activemq.Service#stop()
*/
public void stop() throws Exception{
// TODO Auto-generated method stub
}
}

View File

@ -19,6 +19,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.kaha.IndexTypes;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.impl.KahaStore;
import org.apache.activemq.kaha.impl.container.ContainerId;
@ -131,8 +132,8 @@ public class CachedListContainerImplTest extends TestCase{
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
IndexItem root=store.listsContainer.addRoot(containerId);
ListContainerImpl result=new ListContainerImpl(containerId,root,store.rootIndexManager,im,dm);
IndexItem root=store.getListsContainer().addRoot(im,containerId);
ListContainerImpl result=new ListContainerImpl(containerId,root,im,dm,IndexTypes.DISK_INDEX);
result.expressDataInterest();
result.setMaximumCacheSize(MAX_CACHE_SIZE);
return result;

View File

@ -22,6 +22,7 @@ import java.util.List;
import junit.framework.TestCase;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexLinkedList;
@ -200,7 +201,7 @@ public class VMIndexLinkedListTest extends TestCase{
list.add(i,(IndexItem) testData.get(i));
}
for (int i =0; i < testData.size(); i++){
assertTrue(list.indexOf((IndexItem) testData.get(i))==i);
assertTrue(list.indexOf((StoreEntry) testData.get(i))==i);
}
}