dumping first cut of kaha in SVN

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@387586 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-03-21 17:21:33 +00:00
parent 8697cc71b8
commit d56eda03db
47 changed files with 6211 additions and 0 deletions

View File

@ -0,0 +1,51 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* Implementation of a Marshaller for byte arrays
*
* @version $Revision: 1.2 $
*/
public class BytesMarshaller implements Marshaller{
/**
* Write the payload of this entry to the RawContainer
*
* @param object
* @param dataOut
* @throws IOException
*/
public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
byte[] data=(byte[]) object;
dataOut.writeInt(data.length);
dataOut.write(data);
}
/**
* Read the entry from the RawContainer
*
* @param dataIn
* @return unmarshalled object
* @throws IOException
*/
public Object readPayload(DataInputStream dataIn) throws IOException{
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
return data;
}
}

View File

@ -0,0 +1,104 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.util.List;
import java.util.NoSuchElementException;
/**
*Represents a container of persistent objects in the store
*Acts as a map, but values can be retrieved in insertion order
*
* @version $Revision: 1.2 $
*/
public interface ListContainer extends List{
/**
* The container is created or retrieved in
* an unloaded state.
* load populates the container will all the indexes used etc
* and should be called before any operations on the container
*/
public void load();
/**
* unload indexes from the container
*
*/
public void unload();
/**
* @return true if the indexes are loaded
*/
public boolean isLoaded();
/**
* For homogenous containers can set a custom marshaller for loading values
* The default uses Object serialization
* @param marshaller
*/
public void setMarshaller(Marshaller marshaller);
/**
* @return the id the MapContainer was create with
*/
public Object getId();
/**
* @return the number of values in the container
*/
public int size();
/**
* Inserts the given element at the beginning of this list.
*
* @param o the element to be inserted at the beginning of this list.
*/
public void addFirst(Object o);
/**
* Appends the given element to the end of this list. (Identical in
* function to the <tt>add</tt> method; included only for consistency.)
*
* @param o the element to be inserted at the end of this list.
*/
public void addLast(Object o);
/**
* Removes and returns the first element from this list.
*
* @return the first element from this list.
* @throws NoSuchElementException if this list is empty.
*/
public Object removeFirst();
/**
* Removes and returns the last element from this list.
*
* @return the last element from this list.
* @throws NoSuchElementException if this list is empty.
*/
public Object removeLast();
/**
* remove an objecr from the list without retrieving the old value from the store
* @param position
* @return true if successful
*/
public boolean doRemove(int position);
}

View File

@ -0,0 +1,144 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
/**
*Represents a container of persistent objects in the store
*Acts as a map, but values can be retrieved in insertion order
*
* @version $Revision: 1.2 $
*/
public interface MapContainer extends Map{
/**
* The container is created or retrieved in
* an unloaded state.
* load populates the container will all the indexes used etc
* and should be called before any operations on the container
*/
public void load();
/**
* unload indexes from the container
*
*/
public void unload();
/**
* @return true if the indexes are loaded
*/
public boolean isLoaded();
/**
* For homogenous containers can set a custom marshaller for loading keys
* The default uses Object serialization
* @param keyMarshaller
*/
public void setKeyMarshaller(Marshaller keyMarshaller);
/**
* For homogenous containers can set a custom marshaller for loading values
* The default uses Object serialization
* @param valueMarshaller
*/
public void setValueMarshaller(Marshaller valueMarshaller);
/**
* @return the id the MapContainer was create with
*/
public Object getId();
/**
* @return the number of values in the container
*/
public int size();
/**
* @return true if there are no values stored in the container
*/
public boolean isEmpty();
/**
* @param key
* @return true if the container contains the key
*/
public boolean containsKey(Object key);
/**
* Get the value associated with the key
* @param key
* @return the value associated with the key from the store
*/
public Object get(Object key);
/**
* @param o
* @return true if the MapContainer contains the value o
*/
public boolean containsValue(Object o);
/**
* Add add entries in the supplied Map
* @param map
*/
public void putAll(Map map);
/**
* @return a Set of all the keys
*/
public Set keySet();
/**
* @return a collection of all the values - the values will be lazily pulled out of the
* store if iterated etc.
*/
public Collection values();
/**
* @return a Set of all the Map.Entry instances - the values will be lazily pulled out of the
* store if iterated etc.
*/
public Set entrySet();
/**
* Add an entry
* @param key
* @param value
* @return the old value for the key
*/
public Object put(Object key,Object value);
/**
* remove an entry associated with the key
* @param key
* @return the old value assocaited with the key or null
*/
public Object remove(Object key);
/**
* empty the container
*/
public void clear();
}

View File

@ -0,0 +1,49 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
*Marshaller for marshalling in/out objects to a RawContainer
*
* @version $Revision: 1.2 $
*/
public interface Marshaller {
/**
* Write the payload of this entry to the RawContainer
* @param object
* @param dataOut
* @throws IOException
*/
public void writePayload(Object object, DataOutputStream dataOut) throws IOException;
/**
* Read the entry from the RawContainer
* @param dataIn
* @return unmarshalled object
* @throws IOException
*/
public Object readPayload(DataInputStream dataIn) throws IOException;
}

View File

@ -0,0 +1,65 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
* Implementation of a Marshaller for Objects
*
* @version $Revision: 1.2 $
*/
public class ObjectMarshaller implements Marshaller{
/**
* Write the payload of this entry to the RawContainer
*
* @param object
* @param dataOut
* @throws IOException
*/
public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
ObjectOutputStream objectOut=new ObjectOutputStream(bytesOut);
objectOut.writeObject(object);
objectOut.close();
byte[] data = bytesOut.toByteArray();
dataOut.writeInt(data.length);
dataOut.write(data);
}
/**
* Read the entry from the RawContainer
*
* @param dataIn
* @return unmarshalled object
* @throws IOException
*/
public Object readPayload(DataInputStream dataIn) throws IOException{
int size = dataIn.readInt();
byte[] data = new byte[size];
dataIn.readFully(data);
ByteArrayInputStream bytesIn = new ByteArrayInputStream(data);
ObjectInputStream objectIn=new ObjectInputStream(bytesIn);
try{
return objectIn.readObject();
}catch(ClassNotFoundException e){
throw new IOException(e.getMessage());
}
}
}

View File

@ -0,0 +1,59 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
/**
*Runtime exception for the Store
*
* @version $Revision: 1.2 $
*/
public class RuntimeStoreException extends RuntimeException{
private static final long serialVersionUID=8807084681372365173L;
/**
* Constructor
*/
public RuntimeStoreException(){
super();
}
/**
* Constructor
* @param message
*/
public RuntimeStoreException(String message){
super(message);
}
/**
* Constructor
* @param message
* @param cause
*/
public RuntimeStoreException(String message,Throwable cause){
super(message,cause);
}
/**
* Constructor
* @param cause
*/
public RuntimeStoreException(Throwable cause){
super(cause);
}
}

View File

@ -0,0 +1,97 @@
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.Set;
/**
* A Store is holds persistent containers
*
* @version $Revision: 1.2 $
*/
public interface Store{
/**
* close the store
* @throws IOException
*/
public void close() throws IOException;
/**
* Force all writes to disk
* @throws IOException
*/
public void force() throws IOException;
/**
* empty all the contents of the store
* @throws IOException
*/
public void clear() throws IOException;
/**
* delete the store
* @return true if the delete was successful
* @throws IOException
*/
public boolean delete() throws IOException;
/**
* Checks if a MapContainer exists
* @param id
* @return new MapContainer
*/
public boolean doesMapContainerExist(Object id);
/**
* Get a MapContainer with the given id - the MapContainer is created if needed
* @param id
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
public MapContainer getMapContainer(Object id) throws IOException;
/**
* delete a container
* @param id
* @throws IOException
*/
public void deleteMapContainer(Object id) throws IOException;
/**
* Get a Set of call MapContainer Ids
* @return the set of ids
*/
public Set getMapContainerIds();
/**
* Checks if a ListContainer exists
* @param id
* @return new MapContainer
*/
public boolean doesListContainerExist(Object id);
/**
* Get a ListContainer with the given id and creates it if it doesn't exist
* @param id
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
public ListContainer getListContainer(Object id) throws IOException;
/**
* delete a ListContainer
* @param id
* @throws IOException
*/
public void deleteListContainer(Object id) throws IOException;
/**
* Get a Set of call ListContainer Ids
* @return the set of ids
*/
public Set getListContainerIds();
}

View File

@ -0,0 +1,47 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.File;
import java.io.IOException;
import org.apache.activemq.kaha.impl.StoreImpl;
/**
* Factory for creating stores
*
* @version $Revision: 1.2 $
*/
public class StoreFactory{
/**
* open or create a Store
* @param name
* @param mode
* @return the opened/created store
* @throws IOException
*/
public static Store open(String name,String mode) throws IOException{
return new StoreImpl(name,mode);
}
/**
* Delete a database
* @param name of the database
* @return true if successful
*/
public static boolean delete(String name){
File file = new File(name);
return file.delete();
}
}

View File

@ -0,0 +1,46 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
/**
* Implementation of a Marshaller for Strings
*
* @version $Revision: 1.2 $
*/
public class StringMarshaller implements Marshaller{
/**
* Write the payload of this entry to the RawContainer
*
* @param object
* @param dataOut
* @throws IOException
*/
public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
dataOut.writeUTF(object.toString());
}
/**
* Read the entry from the RawContainer
*
* @param dataIn
* @return unmarshalled object
* @throws IOException
*/
public Object readPayload(DataInputStream dataIn) throws IOException{
return dataIn.readUTF();
}
}

View File

@ -0,0 +1,45 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.IOException;
/**
* Occurs when bad magic occurs in reading a file
*
* @version $Revision: 1.2 $
*/
public class BadMagicException extends IOException{
/**
*
*/
private static final long serialVersionUID=-570930196733067056L;
/**
* Default Constructor
*
*/
public BadMagicException(){
super();
}
/**
* Construct an Exception with a reason
* @param s
*/
public BadMagicException(String s){
super(s);
}
}

View File

@ -0,0 +1,43 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
/**
* Base class for container collections
*
* @version $Revision: 1.2 $
*/
class ContainerCollectionSupport{
protected MapContainerImpl container;
protected ContainerCollectionSupport(MapContainerImpl container){
this.container = container;
}
public int size(){
return container.size();
}
public boolean isEmpty(){
return container.isEmpty();
}
}

View File

@ -0,0 +1,111 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* Set of Map.Entry objects for a container
*
* @version $Revision: 1.2 $
*/
public class ContainerEntrySet extends ContainerCollectionSupport implements Set{
ContainerEntrySet(MapContainerImpl container){
super(container);
}
public boolean contains(Object o){
return container.entrySet().contains(o);
}
public Iterator iterator(){
return new ContainerEntrySetIterator(container,buildEntrySet().iterator());
}
public Object[] toArray(){
return buildEntrySet().toArray();
}
public Object[] toArray(Object[] a){
return buildEntrySet().toArray(a);
}
public boolean add(Object o){
throw new UnsupportedOperationException("Cannot add here");
}
public boolean remove(Object o){
boolean result=false;
if(buildEntrySet().remove(o)){
ContainerMapEntry entry=(ContainerMapEntry) o;
container.remove(entry.getKey());
}
return result;
}
public boolean containsAll(Collection c){
return buildEntrySet().containsAll(c);
}
public boolean addAll(Collection c){
throw new UnsupportedOperationException("Cannot add here");
}
public boolean retainAll(Collection c){
List tmpList=new ArrayList();
for(Iterator i=c.iterator();i.hasNext();){
Object o=i.next();
if(!contains(o)){
tmpList.add(o);
}
}
boolean result=false;
for(Iterator i=tmpList.iterator();i.hasNext();){
result|=remove(i.next());
}
return result;
}
public boolean removeAll(Collection c){
boolean result=true;
for(Iterator i=c.iterator();i.hasNext();){
if(!remove(i.next())){
result=false;
}
}
return result;
}
public void clear(){
container.clear();
}
protected Set buildEntrySet(){
Set set=new HashSet();
for(Iterator i=container.keySet().iterator();i.hasNext();){
ContainerMapEntry entry=new ContainerMapEntry(container,i.next());
set.add(entry);
}
return set;
}
}

View File

@ -0,0 +1,52 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.Iterator;
/**
* An Iterator for a container entry Set
*
* @version $Revision: 1.2 $
*/
public class ContainerEntrySetIterator implements Iterator{
private MapContainerImpl container;
private Iterator iter;
private ContainerMapEntry currentEntry;
ContainerEntrySetIterator(MapContainerImpl container,Iterator iter){
this.container = container;
this.iter = iter;
}
public boolean hasNext(){
return iter.hasNext();
}
public Object next(){
currentEntry = (ContainerMapEntry) iter.next();
return currentEntry;
}
public void remove(){
if (currentEntry != null){
container.remove(currentEntry.getKey());
}
}
}

View File

@ -0,0 +1,99 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* A Set of keys for the container
*
* @version $Revision: 1.2 $
*/
public class ContainerKeySet extends ContainerCollectionSupport implements Set{
ContainerKeySet(MapContainerImpl container){
super(container);
}
public boolean contains(Object o){
return container.getInternalKeySet().contains(o);
}
public Iterator iterator(){
return new ContainerKeySetIterator(container,container.getInternalKeySet().iterator());
}
public Object[] toArray(){
return container.getInternalKeySet().toArray();
}
public Object[] toArray(Object[] a){
return container.getInternalKeySet().toArray(a);
}
public boolean add(Object o){
throw new UnsupportedOperationException("Cannot add here");
}
public boolean remove(Object o){
return container.remove(o) != null;
}
public boolean containsAll(Collection c){
return container.getInternalKeySet().containsAll(c);
}
public boolean addAll(Collection c){
throw new UnsupportedOperationException("Cannot add here");
}
public boolean retainAll(Collection c){
List tmpList = new ArrayList();
for (Iterator i = c.iterator(); i.hasNext(); ){
Object o = i.next();
if (!contains(o)){
tmpList.add(o);
}
}
for(Iterator i = tmpList.iterator(); i.hasNext();){
remove(i.next());
}
return !tmpList.isEmpty();
}
public boolean removeAll(Collection c){
boolean result = true;
for (Iterator i = c.iterator(); i.hasNext(); ){
if (!remove(i.next())){
result = false;
}
}
return result;
}
public void clear(){
container.clear();
}
}

View File

@ -0,0 +1,52 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.Iterator;
/**
*Iterator for the set of keys for a container
*
* @version $Revision: 1.2 $
*/
public class ContainerKeySetIterator implements Iterator{
private MapContainerImpl container;
private Iterator iter;
private Object currentKey;
ContainerKeySetIterator(MapContainerImpl container,Iterator iter){
this.container = container;
this.iter = iter;
}
public boolean hasNext(){
return iter.hasNext();
}
public Object next(){
currentKey = iter.next();
return currentKey;
}
public void remove(){
if (currentKey != null){
container.remove(currentKey);
}
}
}

View File

@ -0,0 +1,117 @@
/**
*
*/
package org.apache.activemq.kaha.impl;
import java.util.ListIterator;
/**
* @author rajdavies
*
*/
public class ContainerListIterator implements ListIterator{
private ListContainerImpl container;
private ListIterator iterator;
private LocatableItem current;
protected ContainerListIterator(ListContainerImpl container,ListIterator iterator){
this.container=container;
this.iterator=iterator;
this.current = container.internalGet(0);
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#hasNext()
*/
public boolean hasNext(){
return iterator.hasNext();
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#next()
*/
public Object next(){
Object result=null;
current=(LocatableItem) iterator.next();
if(current!=null){
result=container.getValue(current);
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#hasPrevious()
*/
public boolean hasPrevious(){
return iterator.hasPrevious();
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#previous()
*/
public Object previous(){
Object result=null;
current=(LocatableItem) iterator.previous();
if(current!=null){
result=container.getValue(current);
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#nextIndex()
*/
public int nextIndex(){
return iterator.nextIndex();
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#previousIndex()
*/
public int previousIndex(){
return iterator.previousIndex();
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#remove()
*/
public void remove(){
iterator.remove();
if(current!=null){
container.remove(current);
}
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#set(E)
*/
public void set(Object o){
LocatableItem item=container.internalSet(previousIndex()+1,o);
iterator.set(item);
}
/*
* (non-Javadoc)
*
* @see java.util.ListIterator#add(E)
*/
public void add(Object o){
LocatableItem item=container.internalAdd(previousIndex()+1,o);
iterator.set(item);
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.Map;
import org.apache.activemq.kaha.MapContainer;
/**
* Map.Entry implementation for a container
*
* @version $Revision: 1.2 $
*/
class ContainerMapEntry implements Map.Entry {
private MapContainer container;
private Object key;
ContainerMapEntry(MapContainer container,Object key){
this.container = container;
this.key = key;
}
public Object getKey(){
return key;
}
public Object getValue(){
return container.get(key);
}
public Object setValue(Object value){
return container.put(key, value);
}
}

View File

@ -0,0 +1,138 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
/**
* Values collection for the MapContainer
*
* @version $Revision: 1.2 $
*/
class ContainerValueCollection extends ContainerCollectionSupport implements Collection{
ContainerValueCollection(MapContainerImpl container){
super(container);
}
public boolean contains(Object o){
return container.containsValue(o);
}
public Iterator iterator(){
LinkedList list=container.getItemList();
list = (LinkedList) list.clone();
return new ContainerValueCollectionIterator(container,list.iterator());
}
public Object[] toArray(){
Object[] result = null;
List list = container.getItemList();
synchronized(list){
result = new Object[list.size()];
int count = 0;
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=container.getValue(item);
result[count++] = value;
}
}
return result;
}
public Object[] toArray(Object[] result){
List list=container.getItemList();
synchronized(list){
if(result.length<list.size()){
int count=0;
result=(Object[]) java.lang.reflect.Array.newInstance(result.getClass().getComponentType(),list.size());
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=container.getValue(item);
result[count++]=value;
}
}
}
return result;
}
public boolean add(Object o){
throw new UnsupportedOperationException("Can't add an object here");
}
public boolean remove(Object o){
return container.removeValue(o);
}
public boolean containsAll(Collection c){
boolean result = !c.isEmpty();
for (Iterator i = c.iterator(); i.hasNext(); ){
if (!contains(i.next())){
result = false;
break;
}
}
return result;
}
public boolean addAll(Collection c){
throw new UnsupportedOperationException("Can't add everything here!");
}
public boolean removeAll(Collection c){
boolean result = true;
for (Iterator i = c.iterator(); i.hasNext(); ){
Object obj = i.next();
result&=remove(obj);
}
return result;
}
public boolean retainAll(Collection c){
List tmpList = new ArrayList();
for (Iterator i = c.iterator(); i.hasNext(); ){
Object o = i.next();
if (!contains(o)){
tmpList.add(o);
}
}
for(Iterator i = tmpList.iterator(); i.hasNext();){
remove(i.next());
}
return !tmpList.isEmpty();
}
public void clear(){
container.clear();
}
}

View File

@ -0,0 +1,50 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.Iterator;
/**
* Values collection iterator for the MapContainer
*
* @version $Revision: 1.2 $
*/
public class ContainerValueCollectionIterator implements Iterator{
private MapContainerImpl container;
private Iterator iter;
private LocatableItem currentItem;
ContainerValueCollectionIterator(MapContainerImpl container,Iterator iter){
this.container = container;
this.iter = iter;
}
public boolean hasNext(){
return iter.hasNext();
}
public Object next(){
currentItem = (LocatableItem) iter.next();
return container.getValue(currentItem);
}
public void remove(){
if (currentItem != null){
container.remove(currentItem);
}
}
}

View File

@ -0,0 +1,162 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Free space list in the Store
*
* @version $Revision: 1.2 $
*/
final class FreeSpaceManager{
private static final Log log = LogFactory.getLog(FreeSpaceManager.class);
static final int ROOT_SIZE=64;
static final int RESIZE_INCREMENT=4096*1024;
private Map map=new HashMap();
private Map prevMap=new HashMap();
private FreeSpaceTree tree=new FreeSpaceTree();
private StoreWriter writer;
private StoreReader reader;
private long dataEnd=ROOT_SIZE;
private long fileLength=-1;
FreeSpaceManager(StoreWriter writer,StoreReader reader) throws IOException{
this.writer=writer;
this.reader=reader;
this.fileLength=reader.length();
}
final Item getFreeSpace(Item item) throws IOException{
Item result=tree.getNextFreeSpace(item);
if(result==null){
while(dataEnd>=fileLength){
writer.allocateSpace(fileLength+RESIZE_INCREMENT);
fileLength=reader.length();
}
result=new Item();
result.setOffset(dataEnd);
int newSize = ((item.getSize()/8)+1)*8;
result.setSize(newSize);
dataEnd=dataEnd+result.getSize()+Item.HEAD_SIZE;
}else{
removeFreeSpace(result);
}
// reset the item
item.setActive(true);
item.setOffset(result.getOffset());
item.setSize(result.getSize());
return item;
}
final void addFreeSpace(Item item) throws IOException{
long currentOffset=reader.position();
reader.readHeader(item);
item.setActive(false);
// see if we can condense some space together
// first look for free space adjacent up the disk
Long nextKey=new Long(item.getOffset()+item.getSize()+Item.HEAD_SIZE);
Item next=(Item) map.remove(nextKey);
if(next!=null){
tree.removeItem(next);
Long prevKey=new Long(next.getOffset()+next.getSize()+Item.HEAD_SIZE);
prevMap.remove(prevKey);
int newSize=item.getSize()+next.getSize()+Item.HEAD_SIZE;
item.setSize(newSize);
}
// now see if there was a previous item
// in the next map
Long key=new Long(item.getOffset());
Item prev=(Item) prevMap.remove(key);
Long prevKey=prev!=null?new Long(prev.getOffset()):null;
if(prev!=null&&prevKey!=null){
// we can condense the free space
// first we are about to change the item so remove it from the tree
tree.removeItem(prev);
int newSize=prev.getSize()+item.getSize()+Item.HEAD_SIZE;
prev.setSize(newSize);
// update the header
writer.updateHeader(prev);
// put back in the tree
tree.addItem(prev);
}else{
// update the item header
writer.updateHeader(item);
tree.addItem(item);
map.put(key,item);
prevKey=new Long(item.getOffset()+item.getSize()+Item.HEAD_SIZE);
prevMap.put(prevKey,item);
}
reader.position(currentOffset);
}
/**
* validates and builds free list
*
* @throws IOException
*/
final void scanStoredItems() throws IOException{
if(reader.length()>ROOT_SIZE){
long offset=ROOT_SIZE;
while((offset+Item.HEAD_SIZE)<reader.length()){
Item item=new Item();
try{
reader.position(offset);
item.setOffset(offset);
reader.readHeader(item);
}catch(BadMagicException e){
log.error("Got bad magic reading stored items",e);
break;
}
if(item.getSize()>=0){
if(!item.isActive()){
addFreeSpace(item);
}
offset+=item.getSize()+Item.HEAD_SIZE;
}else{
// we've hit free space or end of file
break;
}
}
dataEnd=offset;
}else {
dataEnd = ROOT_SIZE;
}
}
private void removeFreeSpace(Item item){
if(item!=null){
long next=item.getOffset()+item.getSize()+Item.HEAD_SIZE;
Long nextKey=new Long(next);
prevMap.remove(nextKey);
Long key=new Long(item.getOffset());
map.remove(key);
}
}
void dump(PrintWriter printer){
printer.println("FreeSpace: map size = "+map.size()+", tree size = "+tree.size()+", prevMap size = "
+prevMap.size());
for(Iterator i=map.entrySet().iterator();i.hasNext();){
printer.println("map = "+i.next());
}
}
}

View File

@ -0,0 +1,109 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
/**
* A a wrapper for a TreeMap of free Items - sorted by size This enables us to re-use free Items on disk
*
* @version $Revision: 1.2 $
*/
class FreeSpaceTree{
private Map sizeMap=new HashMap();
private TreeMap tree=new TreeMap();
void addItem(Item item){
Long sizeKey=new Long(item.getSize());
Item old=(Item) tree.put(sizeKey,item);
if(old!=null){
// We'll preserve old items to reuse
List list=(List) sizeMap.get(sizeKey);
if(list==null){
list=new ArrayList();
sizeMap.put(sizeKey,list);
}
list.add(old);
}
}
boolean removeItem(Item item){
boolean result=false;
Long sizeKey=new Long(item.getSize());
Item retrieved=(Item) tree.get(sizeKey);
if(retrieved==item){
Object foo=tree.remove(sizeKey);
if(foo!=retrieved){
Thread.dumpStack();
System.exit(0);
}
result=true;
reconfigureTree(sizeKey);
}else{
List list=(List) sizeMap.get(sizeKey);
if(list!=null){
boolean foo=list.remove(item);
if(list.isEmpty()){
sizeMap.remove(sizeKey);
}
}
}
return result;
}
Item getNextFreeSpace(Item item){
Item result=null;
if(!tree.isEmpty()){
Long sizeKey=new Long(item.getSize());
SortedMap map=tree.tailMap(sizeKey);
if(map!=null&&!map.isEmpty()){
Long resultKey=(Long) map.firstKey();
result=(Item) map.get(resultKey);
if(result!=null){
// remove from the tree
tree.remove(resultKey);
reconfigureTree(resultKey);
}
}
}
return result;
}
void reconfigureTree(Long sizeKey){
List list=(List) sizeMap.get(sizeKey);
if(list!=null){
if(!list.isEmpty()){
Object newItem=list.remove(list.size()-1);
tree.put(sizeKey,newItem);
}
if(list.isEmpty()){
sizeMap.remove(sizeKey);
}
}
}
int size(){
int result=0;
for(Iterator i=sizeMap.values().iterator();i.hasNext();){
List list=(List) i.next();
result+=list.size();
}
return result+tree.size();
}
}

View File

@ -0,0 +1,116 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
/**
* A a wrapper for a data in the store
*
* @version $Revision: 1.2 $
*/
public class Item{
static final long POSITION_NOT_SET=-1;
static final short MAGIC=31317;
static final int ACTIVE=22;
static final int FREE=33;
static final int HEAD_SIZE=8; // magic + active + len
static final int LOCATION_SIZE=24;
private long offset=POSITION_NOT_SET;
private int size;
private boolean active;
Item(){}
void writeHeader(DataOutput dataOut) throws IOException{
dataOut.writeShort(MAGIC);
dataOut.writeByte(active?ACTIVE:FREE);
dataOut.writeInt(size);
dataOut.writeByte(0);//padding
}
void readHeader(DataInput dataIn) throws IOException{
int magic=dataIn.readShort();
if(magic==MAGIC){
active=(dataIn.readByte()==ACTIVE);
size=dataIn.readInt();
}else if (magic == 0){
size = -999; //end of data
}else{
throw new BadMagicException("Unexpected Magic value: "+magic);
}
}
void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
marshaller.writePayload(object,dataOut);
}
Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
return marshaller.readPayload(dataIn);
}
void readLocation(DataInput dataIn) throws IOException{}
void writeLocation(DataOutput dataOut) throws IOException{}
/**
* @return Returns the size.
*/
int getSize(){
return size;
}
/**
* @param size
* The size to set.
*/
void setSize(int size){
this.size=size;
}
void setOffset(long pos){
offset=pos;
}
long getOffset(){
return offset;
}
/**
* @return Returns the active.
*/
boolean isActive(){
return active;
}
/**
* @param active
* The active to set.
*/
void setActive(boolean active){
this.active=active;
}
/**
* @return a pretty print
*/
public String toString(){
String result="offset = "+offset+" ,active = "+active+" , size = "+size;
return result;
}
}

View File

@ -0,0 +1,822 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Implementation of a ListContainer
*
* @version $Revision: 1.2 $
*/
public class ListContainerImpl implements ListContainer{
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
protected StoreImpl store;
protected LocatableItem root;
protected Object id;
protected LinkedList list=new LinkedList();
protected boolean loaded=false;
protected Marshaller marshaller=new ObjectMarshaller();
protected boolean closed = false;
protected ListContainerImpl(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
this.id=id;
this.store=rfs;
this.root=root;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#load()
*/
public void load(){
checkClosed();
if(!loaded){
loaded=true;
long start=root.getNextItem();
if(start!=Item.POSITION_NOT_SET){
try{
long nextItem=start;
while(nextItem!=Item.POSITION_NOT_SET){
LocatableItem item=new LocatableItem();
item.setOffset(nextItem);
store.readLocation(item);
list.add(item);
nextItem=item.getNextItem();
}
}catch(IOException e){
log.error("Failed to load container "+getId(),e);
throw new RuntimeStoreException(e);
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#unload()
*/
public void unload(){
checkClosed();
if(loaded){
loaded = false;
list.clear();
}
}
public void close(){
unload();
closed = true;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#isLoaded()
*/
public boolean isLoaded(){
checkClosed();
return loaded;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#setKeyMarshaller(org.apache.activemq.kaha.Marshaller)
*/
public void setMarshaller(Marshaller marshaller){
checkClosed();
this.marshaller=marshaller;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#getId()
*/
public Object getId(){
checkClosed();
return id;
}
public boolean equals(Object obj){
checkLoaded();
checkClosed();
boolean result = false;
if (obj != null && obj instanceof List){
List other = (List) obj;
synchronized(list){
result = other.size() == size();
if (result){
for (int i =0; i < list.size(); i++){
Object o1 = other.get(i);
Object o2 = get(i);
result = o1 == o2 || (o1 != null && o2 != null && o1.equals(o2));
if (!result) break;
}
}
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#size()
*/
public int size(){
checkClosed();
checkLoaded();
return list.size();
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#addFirst(java.lang.Object)
*/
public void addFirst(Object o){
checkClosed();
checkLoaded();
LocatableItem item=writeFirst(o);
synchronized(list){
list.addFirst(item);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#addLast(java.lang.Object)
*/
public void addLast(Object o){
checkClosed();
checkLoaded();
LocatableItem item=writeLast(o);
synchronized(list){
list.addLast(item);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#removeFirst()
*/
public Object removeFirst(){
checkClosed();
checkLoaded();
Object result=null;
synchronized(list){
LocatableItem item=(LocatableItem) list.getFirst();
if(item!=null){
result=getValue(item);
int index=list.indexOf(item);
LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
list.removeFirst();
delete(item,prev,next);
item=null;
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#removeLast()
*/
public Object removeLast(){
checkClosed();
checkLoaded();
Object result=null;
synchronized(list){
LocatableItem item=(LocatableItem) list.getLast();
if(item!=null){
result=getValue(item);
int index=list.indexOf(item);
LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
LocatableItem next=null;
list.removeLast();
delete(item,prev,next);
item=null;
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#isEmpty()
*/
public boolean isEmpty(){
checkClosed();
checkLoaded();
return list.isEmpty();
}
/*
* (non-Javadoc)
*
* @see java.util.List#contains(java.lang.Object)
*/
public boolean contains(Object o){
checkClosed();
checkLoaded();
boolean result=false;
if(o!=null){
synchronized(list){
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
break;
}
}
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#iterator()
*/
public Iterator iterator(){
checkClosed();
checkLoaded();
return listIterator();
}
/*
* (non-Javadoc)
*
* @see java.util.List#toArray()
*/
public Object[] toArray(){
checkClosed();
checkLoaded();
List tmp=new ArrayList(list.size());
synchronized(list){
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=getValue(item);
tmp.add(value);
}
}
return tmp.toArray();
}
/*
* (non-Javadoc)
*
* @see java.util.List#toArray(T[])
*/
public Object[] toArray(Object[] a){
checkClosed();
checkLoaded();
List tmp=new ArrayList(list.size());
synchronized(list){
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=getValue(item);
tmp.add(value);
}
}
return tmp.toArray(a);
}
/*
* (non-Javadoc)
*
* @see java.util.List#add(E)
*/
public boolean add(Object o){
checkClosed();
checkLoaded();
addLast(o);
return true;
}
/*
* (non-Javadoc)
*
* @see java.util.List#remove(java.lang.Object)
*/
public boolean remove(Object o){
checkClosed();
checkLoaded();
boolean result=false;
synchronized(list){
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value = getValue(item);
if (value != null && value.equals(o)){
remove(item);
break;
}
}
}
return result;
}
protected void remove(LocatableItem item){
synchronized(list){
int index=list.indexOf(item);
LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
list.remove(index);
delete(item,prev,next);
}
}
/*
* (non-Javadoc)
*
* @see java.util.List#containsAll(java.util.Collection)
*/
public boolean containsAll(Collection c){
checkClosed();
checkLoaded();
boolean result=false;
synchronized(list){
for(Iterator i=c.iterator();i.hasNext();){
Object obj=i.next();
if(!(result=contains(obj))){
result=false;
break;
}
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#addAll(java.util.Collection)
*/
public boolean addAll(Collection c){
checkClosed();
checkLoaded();
boolean result=false;
for(Iterator i=c.iterator();i.hasNext();){
add(i.next());
result=true;
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#addAll(int, java.util.Collection)
*/
public boolean addAll(int index,Collection c){
checkClosed();
checkLoaded();
boolean result=false;
ListIterator e1=listIterator(index);
Iterator e2=c.iterator();
while(e2.hasNext()){
e1.add(e2.next());
result=true;
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#removeAll(java.util.Collection)
*/
public boolean removeAll(Collection c){
checkClosed();
checkLoaded();
boolean result=true;
for(Iterator i=c.iterator();i.hasNext();){
Object obj=i.next();
result&=remove(obj);
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#retainAll(java.util.Collection)
*/
public boolean retainAll(Collection c){
checkClosed();
checkLoaded();
List tmpList=new ArrayList();
synchronized(list){
for(Iterator i = list.iterator(); i.hasNext();){
LocatableItem item = (LocatableItem) i.next();
Object o = getValue(item);
if(!c.contains(o)){
tmpList.add(o);
}
}
}
for(Iterator i=tmpList.iterator();i.hasNext();){
remove(i.next());
}
return !tmpList.isEmpty();
}
/*
* (non-Javadoc)
*
* @see java.util.List#clear()
*/
public void clear(){
checkClosed();
synchronized(list){
list.clear();
try {
long start=root.getNextItem();
if(start!=Item.POSITION_NOT_SET){
long nextItem=start;
while(nextItem!=Item.POSITION_NOT_SET){
LocatableItem item=new LocatableItem();
item.setOffset(nextItem);
list.add(item);
nextItem=item.getNextItem();
}
}
root.setNextItem(Item.POSITION_NOT_SET);
store.updateItem(root);
for(int i=0;i<list.size();i++){
LocatableItem item=(LocatableItem) list.get(i);
store.removeItem(item);
}
list.clear();
}catch(IOException e){
log.error("Failed to clear ListContainer "+getId(),e);
throw new RuntimeStoreException(e);
}
}
}
/*
* (non-Javadoc)
*
* @see java.util.List#get(int)
*/
public Object get(int index){
checkClosed();
checkLoaded();
Object result=null;
LocatableItem item=(LocatableItem) list.get(index);
if(item!=null){
result=getValue(item);
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#set(int, E)
*/
public Object set(int index,Object element){
checkClosed();
checkLoaded();
Object result=null;
synchronized(list){
LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
result=getValue(replace);
list.remove(index);
delete(replace,prev,next);
add(index,element);
}
return result;
}
protected LocatableItem internalSet(int index,Object element){
synchronized(list){
LocatableItem replace=list.isEmpty()?null:(LocatableItem) list.get(index);
LocatableItem prev=(list.isEmpty() || (index-1) < 0)?null:(LocatableItem) list.get(index-1);
LocatableItem next=(list.isEmpty() || (index+1) >= size())?null:(LocatableItem) list.get(index+1);
list.remove(index);
delete(replace,prev,next);
return internalAdd(index,element);
}
}
/*
* (non-Javadoc)
*
* @see java.util.List#add(int, E)
*/
public void add(int index,Object element){
checkClosed();
checkLoaded();
synchronized(list){
LocatableItem item=insert(index,element);
list.add(index,item);
}
}
protected LocatableItem internalAdd(int index,Object element){
synchronized(list){
LocatableItem item=insert(index,element);
list.add(index,item);
return item;
}
}
protected LocatableItem internalGet(int index){
synchronized(list){
if (index >= 0 && index < list.size()){
return (LocatableItem) list.get(index);
}
}
return null;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.ListContainer#doRemove(int)
*/
public boolean doRemove(int index){
checkClosed();
checkLoaded();
boolean result=false;
synchronized(list){
LocatableItem item=(LocatableItem) list.get(index);
if(item!=null){
LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
list.remove(index);
delete(item,prev,next);
result=true;
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#remove(int)
*/
public Object remove(int index){
checkClosed();
checkLoaded();
Object result=null;
synchronized(list){
LocatableItem item=(LocatableItem) list.get(index);
if(item!=null){
result=getValue(item);
LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
list.remove(index);
delete(item,prev,next);
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#indexOf(java.lang.Object)
*/
public int indexOf(Object o){
checkClosed();
checkLoaded();
int result=-1;
if(o!=null){
synchronized(list){
int count=0;
for(Iterator i=list.iterator();i.hasNext();count++){
LocatableItem item=(LocatableItem) i.next();
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=count;
break;
}
}
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#lastIndexOf(java.lang.Object)
*/
public int lastIndexOf(Object o){
checkClosed();
checkLoaded();
int result=-1;
if(o!=null){
synchronized(list){
int count=list.size()-1;
for(ListIterator i=list.listIterator();i.hasPrevious();count--){
LocatableItem item=(LocatableItem) i.previous();
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=count;
break;
}
}
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see java.util.List#listIterator()
*/
public ListIterator listIterator(){
checkClosed();
checkLoaded();
ListIterator iter = ((List) list.clone()).listIterator();
return new ContainerListIterator(this,iter);
}
/*
* (non-Javadoc)
*
* @see java.util.List#listIterator(int)
*/
public ListIterator listIterator(int index){
checkClosed();
checkLoaded();
List result = (List) list.clone();
ListIterator iter = result.listIterator(index);
return new ContainerListIterator(this,iter);
}
/*
* (non-Javadoc)
*
* @see java.util.List#subList(int, int)
*/
public List subList(int fromIndex,int toIndex){
checkClosed();
checkLoaded();
List tmp = list.subList(fromIndex, toIndex);
LinkedList result = new LinkedList();
for (Iterator i = tmp.iterator(); i.hasNext();){
LocatableItem item = (LocatableItem) i.next();
result.add(getValue(item));
}
return result;
}
protected LocatableItem writeLast(Object value){
long pos=Item.POSITION_NOT_SET;
LocatableItem item=null;
try{
LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
last=last==null?root:last;
long prev=last.getOffset();
long next=Item.POSITION_NOT_SET;
item=new LocatableItem(prev,next,pos);
next=store.storeItem(marshaller,value,item);
if(last!=null){
last.setNextItem(next);
store.updateItem(last);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
throw new RuntimeStoreException(e);
}
return item;
}
protected LocatableItem writeFirst(Object value){
long pos=Item.POSITION_NOT_SET;
LocatableItem item=null;
try{
LocatableItem next=list.isEmpty()?null:(LocatableItem) list.getFirst();
LocatableItem last=root;
long prevPos=last.getOffset();
long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
item=new LocatableItem(prevPos,nextPos,pos);
nextPos=store.storeItem(marshaller,value,item);
if(last!=null){
last.setNextItem(nextPos);
store.updateItem(last);
}
if(next!=null){
next.setPreviousItem(nextPos);
store.updateItem(next);
}
}catch(IOException e){
log.error("Failed to write "+value,e);
throw new RuntimeStoreException(e);
}
return item;
}
protected LocatableItem insert(int insertPos,Object value){
long pos=Item.POSITION_NOT_SET;
LocatableItem item=null;
try{
int lastPos=insertPos-1;
LocatableItem prev=(list.isEmpty() || (insertPos-1) < 0)?null:(LocatableItem) list.get(lastPos);
LocatableItem next=(list.isEmpty() || (insertPos+1) >= size())?null:(LocatableItem) list.get(insertPos+1);
prev=prev==null?root:prev;
long prevPos=prev.getOffset();
long nextPos=next!=null?next.getOffset():Item.POSITION_NOT_SET;
item=new LocatableItem(prevPos,nextPos,pos);
nextPos=store.storeItem(marshaller,value,item);
if(prev!=null){
prev.setNextItem(nextPos);
store.updateItem(prev);
}
if(next!=null){
next.setPreviousItem(nextPos);
store.updateItem(next);
}
}catch(IOException e){
log.error("Failed to insert "+value,e);
throw new RuntimeStoreException(e);
}
return item;
}
protected Object getValue(LocatableItem item){
Object result=null;
if(item!=null){
try{
result=store.readItem(marshaller,item);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
throw new RuntimeStoreException(e);
}
}
return result;
}
protected void delete(LocatableItem item,LocatableItem prev,LocatableItem next){
try{
prev=prev==null?root:prev;
if(next!=null){
prev.setNextItem(next.getOffset());
next.setPreviousItem(prev.getOffset());
store.updateItem(next);
}else{
prev.setNextItem(Item.POSITION_NOT_SET);
}
store.updateItem(prev);
store.removeItem(item);
}catch(IOException e){
log.error("Failed to delete "+item,e);
throw new RuntimeStoreException(e);
}
}
protected final void checkClosed(){
if (closed){
throw new RuntimeStoreException("The store is closed");
}
}
protected final void checkLoaded(){
if (!loaded){
throw new RuntimeStoreException("The container is not loaded");
}
}
}

View File

@ -0,0 +1,126 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.activemq.kaha.Marshaller;
/**
* A an Item with a relative postion and location to other Items in the Store
*
* @version $Revision: 1.2 $
*/
public final class LocatableItem extends Item implements Externalizable{
private static final long serialVersionUID=-6888731361600185708L;
private long previousItem=POSITION_NOT_SET;
private long nextItem=POSITION_NOT_SET;
private long referenceItem=POSITION_NOT_SET;
public LocatableItem(){}
public LocatableItem(long prev,long next,long objOffset) throws IOException{
this.previousItem=prev;
this.nextItem=next;
this.referenceItem=objOffset;
}
public void writePayload(Marshaller marshaller,Object object,DataOutputStream dataOut) throws IOException{
dataOut.writeLong(previousItem);
dataOut.writeLong(nextItem);
dataOut.writeLong(referenceItem);
super.writePayload(marshaller,object,dataOut);
}
public Object readPayload(Marshaller marshaller,DataInputStream dataIn) throws IOException{
previousItem=dataIn.readLong();
nextItem=dataIn.readLong();
referenceItem=dataIn.readLong();
return super.readPayload(marshaller, dataIn);
}
void readLocation(DataInput dataIn) throws IOException{
previousItem=dataIn.readLong();
nextItem=dataIn.readLong();
referenceItem=dataIn.readLong();
}
public void writeLocation(DataOutput dataOut) throws IOException{
dataOut.writeLong(previousItem);
dataOut.writeLong(nextItem);
}
public void setPreviousItem(long newPrevEntry){
previousItem=newPrevEntry;
}
public long getPreviousItem(){
return previousItem;
}
public void setNextItem(long newNextEntry){
nextItem=newNextEntry;
}
public long getNextItem(){
return nextItem;
}
public void setReferenceItem(long newObjectOffset){
referenceItem=newObjectOffset;
}
public long getReferenceItem(){
return referenceItem;
}
/* (non-Javadoc)
* @see org.apache.activemq.kaha.impl.Item#toString()
*/
public String toString(){
String result=super.toString();
result+=" , referenceItem = "+referenceItem+", previousItem = "+previousItem+" , nextItem = "+nextItem;
return result;
}
/* (non-Javadoc)
* @see java.io.Externalizable#writeExternal(java.io.ObjectOutput)
*/
public void writeExternal(ObjectOutput out) throws IOException{
out.writeLong(previousItem);
out.writeLong(nextItem);
out.writeLong(referenceItem);
}
/* (non-Javadoc)
* @see java.io.Externalizable#readExternal(java.io.ObjectInput)
*/
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
previousItem = in.readLong();
nextItem = in.readLong();
referenceItem = in.readLong();
}
}

View File

@ -0,0 +1,476 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Implementation of a MapContainer
*
* @version $Revision: 1.2 $
*/
public class MapContainerImpl implements MapContainer{
private static final Log log=LogFactory.getLog(MapContainerImpl.class);
protected StoreImpl store;
protected LocatableItem root;
protected Object id;
protected Map map=new HashMap();
protected Map valueToKeyMap=new HashMap();
protected LinkedList list=new LinkedList();
protected boolean loaded=false;
protected Marshaller keyMarshaller=new ObjectMarshaller();
protected Marshaller valueMarshaller=new ObjectMarshaller();
protected final Object mutex=new Object();
protected boolean closed=false;
protected MapContainerImpl(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
this.id=id;
this.store=rfs;
this.root=root;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#load()
*/
public void load(){
checkClosed();
if(!loaded){
loaded=true;
synchronized(mutex){
try{
long start=root.getNextItem();
if(start!=Item.POSITION_NOT_SET){
long nextItem=start;
while(nextItem!=Item.POSITION_NOT_SET){
LocatableItem item=new LocatableItem();
item.setOffset(nextItem);
Object key=store.readItem(keyMarshaller,item);
map.put(key,item);
valueToKeyMap.put(item,key);
list.add(item);
nextItem=item.getNextItem();
}
}
}catch(IOException e){
log.error("Failed to load container "+getId(),e);
throw new RuntimeStoreException(e);
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#unload()
*/
public void unload(){
checkClosed();
if(loaded){
loaded=false;
synchronized(mutex){
map.clear();
valueToKeyMap.clear();
list.clear();
}
}
}
public void close(){
unload();
closed=true;
}
public void setKeyMarshaller(Marshaller keyMarshaller){
checkClosed();
this.keyMarshaller=keyMarshaller;
}
public void setValueMarshaller(Marshaller valueMarshaller){
checkClosed();
this.valueMarshaller=valueMarshaller;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#isLoaded()
*/
public boolean isLoaded(){
checkClosed();
return loaded;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#getId()
*/
public Object getId(){
checkClosed();
return id;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#size()
*/
public int size(){
checkClosed();
checkLoaded();
return map.size();
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#isEmpty()
*/
public boolean isEmpty(){
checkClosed();
checkLoaded();
return map.isEmpty();
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#containsKey(java.lang.Object)
*/
public boolean containsKey(Object key){
checkClosed();
checkLoaded();
synchronized(mutex){
return map.containsKey(key);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#get(java.lang.Object)
*/
public Object get(Object key){
checkClosed();
checkLoaded();
Object result=null;
LocatableItem item=null;
synchronized(mutex){
item=(LocatableItem) map.get(key);
}
if(item!=null){
result=getValue(item);
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#containsValue(java.lang.Object)
*/
public boolean containsValue(Object o){
checkClosed();
checkLoaded();
boolean result=false;
if(o!=null){
synchronized(list){
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
break;
}
}
}
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#putAll(java.util.Map)
*/
public void putAll(Map t){
checkClosed();
checkLoaded();
if(t!=null){
synchronized(mutex){
for(Iterator i=t.entrySet().iterator();i.hasNext();){
Map.Entry entry=(Map.Entry) i.next();
put(entry.getKey(),entry.getValue());
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#keySet()
*/
public Set keySet(){
checkClosed();
checkLoaded();
return new ContainerKeySet(this);
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#values()
*/
public Collection values(){
checkClosed();
checkLoaded();
return new ContainerValueCollection(this);
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#entrySet()
*/
public Set entrySet(){
checkClosed();
checkLoaded();
return new ContainerEntrySet(this);
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#put(java.lang.Object, java.lang.Object)
*/
public Object put(Object key,Object value){
checkClosed();
checkLoaded();
Object result=null;
synchronized(mutex){
if(map.containsKey(key)){
result=remove(key);
}
LocatableItem item=write(key,value);
map.put(key,item);
valueToKeyMap.put(item,key);
list.add(item);
}
return result;
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#remove(java.lang.Object)
*/
public Object remove(Object key){
checkClosed();
checkLoaded();
Object result=null;
synchronized(mutex){
LocatableItem item=(LocatableItem) map.get(key);
if(item!=null){
map.remove(key);
valueToKeyMap.remove(item);
result=getValue(item);
int index=list.indexOf(item);
LocatableItem prev=index>0?(LocatableItem) list.get(index-1):root;
LocatableItem next=index<(list.size()-1)?(LocatableItem) list.get(index+1):null;
list.remove(index);
{
delete(item,prev,next);
}
item=null;
}
}
return result;
}
public boolean removeValue(Object o){
checkClosed();
checkLoaded();
boolean result=false;
if(o!=null){
synchronized(list){
for(Iterator i=list.iterator();i.hasNext();){
LocatableItem item=(LocatableItem) i.next();
Object value=getValue(item);
if(value!=null&&value.equals(o)){
result=true;
// find the key
Object key=valueToKeyMap.get(item);
if(key!=null){
remove(key);
}
break;
}
}
}
}
return result;
}
protected void remove(LocatableItem item){
Object key=valueToKeyMap.get(item);
if(key!=null){
remove(key);
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.MapContainer#clear()
*/
public void clear(){
checkClosed();
synchronized(mutex){
loaded=true;
synchronized(mutex){
map.clear();
valueToKeyMap.clear();
list.clear();// going to re-use this
try{
long start=root.getNextItem();
if(start!=Item.POSITION_NOT_SET){
long nextItem=start;
while(nextItem!=Item.POSITION_NOT_SET){
LocatableItem item=new LocatableItem();
item.setOffset(nextItem);
list.add(item);
nextItem=item.getNextItem();
}
}
root.setNextItem(Item.POSITION_NOT_SET);
store.updateItem(root);
for(int i=0;i<list.size();i++){
LocatableItem item=(LocatableItem) list.get(i);
if(item.getReferenceItem()!=Item.POSITION_NOT_SET){
Item value=new Item();
value.setOffset(item.getReferenceItem());
store.removeItem(value);
}
store.removeItem(item);
}
list.clear();
}catch(IOException e){
log.error("Failed to clear MapContainer "+getId(),e);
throw new RuntimeStoreException(e);
}
}
}
}
protected Set getInternalKeySet(){
return new HashSet(map.keySet());
}
protected LinkedList getItemList(){
return list;
}
protected Object getValue(LocatableItem item){
Object result=null;
if(item!=null&&item.getReferenceItem()!=Item.POSITION_NOT_SET){
Item rec=new Item();
rec.setOffset(item.getReferenceItem());
try{
result=store.readItem(valueMarshaller,rec);
}catch(IOException e){
log.error("Failed to get value for "+item,e);
throw new RuntimeStoreException(e);
}
}
return result;
}
protected LocatableItem write(Object key,Object value){
long pos=Item.POSITION_NOT_SET;
LocatableItem item=null;
try{
if(value!=null){
Item valueItem=new Item();
pos=store.storeItem(valueMarshaller,value,valueItem);
}
LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
last=last==null?root:last;
long prev=last.getOffset();
long next=Item.POSITION_NOT_SET;
item=new LocatableItem(prev,next,pos);
next=store.storeItem(keyMarshaller,key,item);
if(last!=null){
last.setNextItem(next);
store.updateItem(last);
}
}catch(IOException e){
e.printStackTrace();
log.error("Failed to write "+key+" , "+value,e);
throw new RuntimeStoreException(e);
}
return item;
}
protected void delete(LocatableItem key,LocatableItem prev,LocatableItem next){
try{
prev=prev==null?root:prev;
if(next!=null){
prev.setNextItem(next.getOffset());
next.setPreviousItem(prev.getOffset());
store.updateItem(next);
}else{
prev.setNextItem(Item.POSITION_NOT_SET);
}
store.updateItem(prev);
if(key.getReferenceItem()!=Item.POSITION_NOT_SET){
Item value=new Item();
value.setOffset(key.getReferenceItem());
store.removeItem(value);
}
store.removeItem(key);
}catch(IOException e){
log.error("Failed to delete "+key,e);
throw new RuntimeStoreException(e);
}
}
protected final void checkClosed(){
if(closed){
throw new RuntimeStoreException("The store is closed");
}
}
protected final void checkLoaded(){
if(!loaded){
throw new RuntimeStoreException("The container is not loaded");
}
}
}

View File

@ -0,0 +1,97 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.ObjectMarshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A container of roots for other Containers
*
* @version $Revision: 1.2 $
*/
class RootContainer extends MapContainerImpl{
private static final Log log=LogFactory.getLog(RootContainer.class);
protected static final Marshaller rootMarshaller = new ObjectMarshaller();
protected RootContainer(Object id,StoreImpl rfs,LocatableItem root) throws IOException{
super(id,rfs,root);
}
protected void addRoot(Object key,LocatableItem er) throws IOException{
if(map.containsKey(key)){
remove(key);
}
LocatableItem entry=writeRoot(key,er);
map.put(key,entry);
synchronized(list){
list.add(entry);
}
}
protected LocatableItem writeRoot(Object key,LocatableItem value){
long pos=Item.POSITION_NOT_SET;
LocatableItem item=null;
try{
if(value!=null){
pos=store.storeItem(rootMarshaller,value,value);
}
LocatableItem last=list.isEmpty()?null:(LocatableItem) list.getLast();
last=last==null?root:last;
long prev=last.getOffset();
long next=Item.POSITION_NOT_SET;
item=new LocatableItem(prev,next,pos);
if(log.isDebugEnabled())
log.debug("writing root ...");
if(log.isDebugEnabled())
log.debug("root = "+value);
next=store.storeItem(rootMarshaller,key,item);
if(last!=null){
last.setNextItem(next);
store.updateItem(last);
}
}catch(IOException e){
e.printStackTrace();
log.error("Failed to write root",e);
throw new RuntimeStoreException(e);
}
return item;
}
protected Object getValue(LocatableItem item){
LocatableItem result=null;
if(item!=null&&item.getReferenceItem()!=Item.POSITION_NOT_SET){
LocatableItem value=new LocatableItem();
value.setOffset(item.getReferenceItem());
try{
result=(LocatableItem) store.readItem(rootMarshaller,value);
//now read the item
result.setOffset(item.getReferenceItem());
store.readItem(rootMarshaller, result);
}catch(IOException e){
log.error("Could not read item "+item,e);
throw new RuntimeStoreException(e);
}
}
return result;
}
}

View File

@ -0,0 +1,149 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.ByteArrayInputStream;
/**
* Optimized ByteArrayInputStream that can be used more than once
*
* @version $Revision: 1.1.1.1 $
*/
public class StoreByteArrayInputStream extends ByteArrayInputStream {
/**
* Creates a <code>WireByteArrayInputStream</code>.
*
* @param buf the input buffer.
*/
public StoreByteArrayInputStream(byte buf[]) {
super(buf);
}
/**
* Creates <code>WireByteArrayInputStream</code> that uses <code>buf</code> as its buffer array.
*
* @param buf the input buffer.
* @param offset the offset in the buffer of the first byte to read.
* @param length the maximum number of bytes to read from the buffer.
*/
public StoreByteArrayInputStream(byte buf[], int offset, int length) {
super(buf, offset, length);
}
/**
* Creates <code>WireByteArrayInputStream</code> with a minmalist byte array
*/
public StoreByteArrayInputStream() {
super(new byte[0]);
}
/**
* @return the current position in the stream
*/
public int position(){
return pos;
}
/**
* @return the underlying data array
*/
public byte[] getRawData(){
return buf;
}
/**
* reset the <code>WireByteArrayInputStream</code> to use an new byte array
*
* @param newBuff buffer to use
* @param offset the offset in the buffer of the first byte to read.
* @param length the maximum number of bytes to read from the buffer.
*/
public void restart(byte[] newBuff, int offset, int length) {
buf = newBuff;
pos = offset;
count = Math.min(offset + length, newBuff.length);
mark = offset;
}
/**
* reset the <code>WireByteArrayInputStream</code> to use an new byte array
*
* @param newBuff
*/
public void restart(byte[] newBuff) {
restart(newBuff, 0, newBuff.length);
}
/**
* re-start the input stream - reusing the current buffer
* @param size
*/
public void restart(int size){
if (buf == null || buf.length < size){
buf = new byte[size];
}
restart(buf);
}
/**
* Reads the next byte of data from this input stream. The value byte is returned as an <code>int</code> in the
* range <code>0</code> to <code>255</code>. If no byte is available because the end of the stream has been
* reached, the value <code>-1</code> is returned.
* <p>
* This <code>read</code> method cannot block.
*
* @return the next byte of data, or <code>-1</code> if the end of the stream has been reached.
*/
public int read() {
return (pos < count) ? (buf[pos++] & 0xff) : -1;
}
/**
* Reads up to <code>len</code> bytes of data into an array of bytes from this input stream.
*
* @param b the buffer into which the data is read.
* @param off the start offset of the data.
* @param len the maximum number of bytes read.
* @return the total number of bytes read into the buffer, or <code>-1</code> if there is no more data because the
* end of the stream has been reached.
*/
public int read(byte b[], int off, int len) {
if (b == null) {
throw new NullPointerException();
}
else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
if (pos >= count) {
return -1;
}
if (pos + len > count) {
len = count - pos;
}
if (len <= 0) {
return 0;
}
System.arraycopy(buf, pos, b, off, len);
pos += len;
return len;
}
/**
* @return the number of bytes that can be read from the input stream without blocking.
*/
public int available() {
return count - pos;
}
}

View File

@ -0,0 +1,118 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.ByteArrayOutputStream;
/**
* Optimized ByteArrayOutputStream
*
* @version $Revision: 1.1.1.1 $
*/
public class StoreByteArrayOutputStream extends ByteArrayOutputStream {
/**
* Creates a new byte array output stream.
*/
public StoreByteArrayOutputStream() {
super(16 * 1024);
}
/**
* Creates a new byte array output stream, with a buffer capacity of the specified size, in bytes.
*
* @param size the initial size.
* @exception IllegalArgumentException if size is negative.
*/
public StoreByteArrayOutputStream(int size) {
super(size);
}
/**
* start using a fresh byte array
*
* @param size
*/
public void restart(int size) {
buf = new byte[size];
count = 0;
}
/**
* Writes the specified byte to this byte array output stream.
*
* @param b the byte to be written.
*/
public void write(int b) {
int newcount = count + 1;
if (newcount > buf.length) {
byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, count);
buf = newbuf;
}
buf[count] = (byte) b;
count = newcount;
}
/**
* Writes <code>len</code> bytes from the specified byte array starting at offset <code>off</code> to this byte
* array output stream.
*
* @param b the data.
* @param off the start offset in the data.
* @param len the number of bytes to write.
*/
public void write(byte b[], int off, int len) {
if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) {
throw new IndexOutOfBoundsException();
}
else if (len == 0) {
return;
}
int newcount = count + len;
if (newcount > buf.length) {
byte newbuf[] = new byte[Math.max(buf.length << 1, newcount)];
System.arraycopy(buf, 0, newbuf, 0, count);
buf = newbuf;
}
System.arraycopy(b, off, buf, count, len);
count = newcount;
}
/**
* @return the underlying byte[] buffer
*/
public byte[] getData() {
return buf;
}
/**
* reset the output stream
*/
public void reset(){
count = 0;
}
/**
* Set the current position for writing
* @param offset
*/
public void position(int offset){
if (offset > buf.length) {
byte newbuf[] = new byte[Math.max(buf.length << 1, offset)];
System.arraycopy(buf, 0, newbuf, 0, count);
buf = newbuf;
}
count = offset;
}
}

View File

@ -0,0 +1,380 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.RandomAccessFile;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Implementation of a Store
*
* @version $Revision: 1.2 $
*/
public class StoreImpl implements Store{
private static final Log log = LogFactory.getLog(StoreImpl.class);
private final Object mutex=new Object();
private RandomAccessFile dataFile;
private Map mapContainers=new ConcurrentHashMap();
private Map listContainers=new ConcurrentHashMap();
private RootContainer rootMapContainer;
private RootContainer rootListContainer;
private String name;
private StoreReader reader;
private StoreWriter writer;
private FreeSpaceManager freeSpaceManager;
protected boolean closed=false;
protected Thread shutdownHook;
public StoreImpl(String name,String mode) throws IOException{
this.name=name;
this.dataFile=new RandomAccessFile(name,mode);
this.reader = new StoreReader(this.dataFile);
this.writer = new StoreWriter(this.dataFile);
File file = new File(name);
log.info("Kaha Store opened " + file.getAbsolutePath());
freeSpaceManager=new FreeSpaceManager(this.writer,this.reader);
initialization();
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#close()
*/
public void close() throws IOException{
synchronized(mutex){
if(!closed){
for(Iterator i=mapContainers.values().iterator();i.hasNext();){
MapContainerImpl container=(MapContainerImpl) i.next();
container.close();
}
for(Iterator i=listContainers.values().iterator();i.hasNext();){
ListContainerImpl container=(ListContainerImpl) i.next();
container.close();
}
force();
dataFile.close();
closed=true;
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#force()
*/
public void force() throws IOException{
checkClosed();
synchronized(mutex){
dataFile.getFD().sync();
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#clear()
*/
public void clear(){
checkClosed();
for(Iterator i=mapContainers.values().iterator();i.hasNext();){
MapContainer container=(MapContainer) i.next();
container.clear();
}
for(Iterator i=listContainers.values().iterator();i.hasNext();){
ListContainer container=(ListContainer) i.next();
container.clear();
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#delete()
*/
public boolean delete() throws IOException{
checkClosed();
dataFile.close();
File file=new File(name);
return file.delete();
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#doesMapContainerExist(java.lang.Object)
*/
public boolean doesMapContainerExist(Object id){
return mapContainers.containsKey(id);
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#getContainer(java.lang.Object)
*/
public MapContainer getMapContainer(Object id) throws IOException{
checkClosed();
synchronized(mutex){
MapContainer result=(MapContainerImpl) mapContainers.get(id);
if(result==null){
LocatableItem root=new LocatableItem();
rootMapContainer.addRoot(id,root);
result=new MapContainerImpl(id,this,root);
mapContainers.put(id,result);
}
return result;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#deleteContainer(java.lang.Object)
*/
public void deleteMapContainer(Object id) throws IOException{
checkClosed();
synchronized(mutex){
if(doesMapContainerExist(id)){
MapContainer container=getMapContainer(id);
if(container!=null){
container.load();
container.clear();
rootMapContainer.remove(id);
mapContainers.remove(id);
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#getContainerKeys()
*/
public Set getMapContainerIds(){
checkClosed();
return java.util.Collections.unmodifiableSet(mapContainers.keySet());
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#doesListContainerExist(java.lang.Object)
*/
public boolean doesListContainerExist(Object id){
return listContainers.containsKey(id);
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#getListContainer(java.lang.Object)
*/
public ListContainer getListContainer(Object id) throws IOException{
checkClosed();
synchronized(mutex){
ListContainer result=(ListContainerImpl) listContainers.get(id);
if(result==null){
LocatableItem root=new LocatableItem();
rootListContainer.addRoot(id,root);
result=new ListContainerImpl(id,this,root);
listContainers.put(id,result);
}
return result;
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#deleteListContainer(java.lang.Object)
*/
public void deleteListContainer(Object id) throws IOException{
checkClosed();
synchronized(mutex){
if(doesListContainerExist(id)){
ListContainer container=getListContainer(id);
if(container!=null){
container.load();
container.clear();
rootListContainer.remove(id);
listContainers.remove(id);
}
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.activemq.kaha.Store#getListContainerIds()
*/
public Set getListContainerIds(){
checkClosed();
return java.util.Collections.unmodifiableSet(listContainers.keySet());
}
public void dumpFreeSpace(PrintWriter printer){
checkClosed();
synchronized(mutex){
freeSpaceManager.dump(printer);
}
}
protected long storeItem(Marshaller marshaller,Object payload,Item item) throws IOException{
synchronized(mutex){
int payloadSize = writer.loadPayload(marshaller, payload, item);
item.setSize(payloadSize);
// free space manager will set offset and write any headers required
// so the position should now be correct for writing
item=freeSpaceManager.getFreeSpace(item);
writer.storeItem(item,payloadSize);
}
return item.getOffset();
}
protected Object readItem(Marshaller marshaller,Item item) throws IOException{
synchronized(mutex){
return reader.readItem(marshaller, item);
}
}
protected void readHeader(Item item) throws IOException{
synchronized(mutex){
reader.readHeader(item);
}
}
protected void readLocation(Item item) throws IOException{
synchronized(mutex){
reader.readLocation(item);
}
}
protected void updateItem(Item item) throws IOException{
synchronized(mutex){
writer.updatePayload(item);
}
}
protected void removeItem(Item item) throws IOException{
synchronized(mutex){
freeSpaceManager.addFreeSpace(item);
}
}
private void initialization() throws IOException{
//add shutdown hook
addShutdownHook();
// check for new file
LocatableItem mapRoot=new LocatableItem();
LocatableItem listRoot=new LocatableItem();
if(dataFile.length()==0){
writer.allocateSpace(FreeSpaceManager.RESIZE_INCREMENT);
storeItem(RootContainer.rootMarshaller,"mapRoot",mapRoot);
storeItem(RootContainer.rootMarshaller,"listRoot",listRoot);
}else{
freeSpaceManager.scanStoredItems();
dataFile.seek(FreeSpaceManager.ROOT_SIZE);
mapRoot.setOffset(FreeSpaceManager.ROOT_SIZE);
readItem(RootContainer.rootMarshaller,mapRoot);
listRoot.setOffset(dataFile.getFilePointer());
readItem(RootContainer.rootMarshaller,listRoot);
}
rootMapContainer=new RootContainer("root",this,mapRoot);
rootMapContainer.load();
Set keys=rootMapContainer.keySet();
for(Iterator i=keys.iterator();i.hasNext();){
Object id=i.next();
if(id!=null){
LocatableItem item=(LocatableItem) rootMapContainer.get(id);
if(item!=null){
MapContainer container=new MapContainerImpl(id,this,item);
mapContainers.put(id,container);
}
}
}
rootListContainer=new RootContainer("root",this,listRoot);
rootListContainer.load();
keys=rootListContainer.keySet();
for(Iterator i=keys.iterator();i.hasNext();){
Object id=i.next();
if(id!=null){
LocatableItem item=(LocatableItem) rootListContainer.get(id);
if(item!=null){
ListContainer container=new ListContainerImpl(id,this,item);
listContainers.put(id,container);
}
}
}
}
protected void checkClosed(){
if(closed){
throw new RuntimeStoreException("The store is closed");
}
}
protected void addShutdownHook() {
shutdownHook = new Thread("Kaha Store implementation is shutting down") {
public void run() {
if (!closed){
try{
//this needs to be really quick so ...
closed = true;
dataFile.close();
}catch(Throwable e){
log.error("Failed to close data file",e);
}
}
}
};
Runtime.getRuntime().addShutdownHook(shutdownHook);
}
protected void removeShutdownHook() {
if (shutdownHook != null) {
try {
Runtime.getRuntime().removeShutdownHook(shutdownHook);
}
catch (Exception e) {
log.warn("Failed to run shutdown hook",e);
}
}
}
}

View File

@ -0,0 +1,75 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha.impl;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
/**
* Optimized Store reader
*
* @version $Revision: 1.1.1.1 $
*/
class StoreReader{
protected RandomAccessFile dataFile;
protected StoreByteArrayInputStream bytesIn;
protected DataInputStream dataIn;
/**
* Construct a Store reader
*
* @param file
*/
StoreReader(RandomAccessFile file){
this.dataFile=file;
this.bytesIn=new StoreByteArrayInputStream();
this.dataIn=new DataInputStream(bytesIn);
}
protected void readHeader(Item item) throws IOException{
dataFile.seek(item.getOffset());
bytesIn.restart(Item.HEAD_SIZE);
dataFile.readFully(bytesIn.getRawData(),0,Item.HEAD_SIZE);
item.readHeader(dataIn);
}
protected void readLocation(Item item) throws IOException{
readHeader(item);
bytesIn.restart(Item.LOCATION_SIZE);
dataFile.readFully(bytesIn.getRawData(),0,Item.LOCATION_SIZE);
item.readLocation(dataIn);
}
protected Object readItem(Marshaller marshaller,Item item) throws IOException{
readHeader(item);
byte[] data=new byte[item.getSize()];
dataFile.readFully(data);
bytesIn.restart(data);
return item.readPayload(marshaller,dataIn);
}
long length() throws IOException{
return dataFile.length();
}
long position() throws IOException{
return dataFile.getFilePointer();
}
void position(long newPosition) throws IOException{
dataFile.seek(newPosition);
}
}

View File

@ -0,0 +1,119 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
/**
* Optimized writes to a RandomAcessFile
*
* @version $Revision: 1.1.1.1 $
*/
package org.apache.activemq.kaha.impl;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.kaha.Marshaller;
/**
* Optimized Store writer
*
* @version $Revision: 1.1.1.1 $
*/
class StoreWriter{
protected RandomAccessFile dataFile;
protected StoreByteArrayOutputStream bytesOut;
protected DataOutputStream dataOut;
/**
* Construct a Store writer
* @param file
*/
StoreWriter(RandomAccessFile file){
this.dataFile = file;
this.bytesOut = new StoreByteArrayOutputStream();
this.dataOut = new DataOutputStream(bytesOut);
}
void updateHeader(Item item) throws IOException{
bytesOut.reset();
item.writeHeader(dataOut);
dataFile.seek(item.getOffset());
dataFile.write(bytesOut.getData(),0,bytesOut.size());
}
void updatePayload(Item item) throws IOException{
bytesOut.reset();
dataFile.seek(item.getOffset() + Item.HEAD_SIZE);
item.writeLocation(dataOut);
dataFile.write(bytesOut.getData(),0,bytesOut.size());
}
int loadPayload(Marshaller marshaller, Object payload,Item item) throws IOException{
bytesOut.reset();
bytesOut.position(Item.HEAD_SIZE);
item.writePayload(marshaller, payload, dataOut);
return bytesOut.size() - Item.HEAD_SIZE;
}
void storeItem(Item item,int payloadSize) throws IOException{
bytesOut.reset();
item.writeHeader(dataOut);
dataFile.seek(item.getOffset());
dataFile.write(bytesOut.getData(),0,payloadSize+Item.HEAD_SIZE);
}
void writeShort(long offset, int value) throws IOException{
bytesOut.reset();
dataFile.seek(offset);
dataOut.writeShort(value);
dataFile.write(bytesOut.getData(),0,bytesOut.size());
}
void writeInt(long offset,int value) throws IOException{
bytesOut.reset();
dataFile.seek(offset);
dataOut.writeInt(value);
dataFile.write(bytesOut.getData(),0,bytesOut.size());
}
void writeLong(long offset,long value) throws IOException{
bytesOut.reset();
dataFile.seek(offset);
dataOut.writeLong(value);
dataFile.write(bytesOut.getData(),0,bytesOut.size());
}
long length() throws IOException{
return dataFile.length();
}
long position() throws IOException{
return dataFile.getFilePointer();
}
void position(long newPosition) throws IOException{
dataFile.seek(newPosition);
}
void allocateSpace(long newLength) throws IOException{
dataFile.getFD().sync();
long currentOffset=dataFile.getFilePointer();
dataFile.seek(newLength);
dataFile.write(0);
dataFile.seek(currentOffset);
dataFile.getFD().sync();
}
}

View File

@ -0,0 +1,11 @@
<html>
<head>
</head>
<body>
<p>
fast message persistence implementation
</p>
</body>
</html>

View File

@ -0,0 +1,45 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* Marshall an AtomicInteger
* @version $Revision: 1.10 $
*/
public class AtomicIntegerMarshaller implements Marshaller{
public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
AtomicInteger ai = (AtomicInteger) object;
dataOut.writeInt(ai.get());
}
public Object readPayload(DataInputStream dataIn) throws IOException{
int value = dataIn.readInt();
return new AtomicInteger(value);
}
}

View File

@ -0,0 +1,54 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.activeio.Packet;
import org.activeio.command.WireFormat;
import org.activeio.packet.ByteArrayPacket;
import org.apache.activemq.kaha.Marshaller;
/**
* Marshall a Message or a MessageReference
* @version $Revision: 1.10 $
*/
public class CommandMarshaller implements Marshaller{
private WireFormat wireFormat;
public CommandMarshaller(WireFormat wireFormat){
this.wireFormat = wireFormat;
}
public void writePayload(Object object,DataOutputStream dataOut) throws IOException{
Packet packet = wireFormat.marshal(object);
byte[] data = packet.sliceAsBytes();
dataOut.writeInt(data.length);
dataOut.write(data);
}
public Object readPayload(DataInputStream dataIn) throws IOException{
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
return wireFormat.unmarshal(new ByteArrayPacket(data));
}
}

View File

@ -0,0 +1,92 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
*
* @version $Revision: 1.7 $
*/
public class KahaMessageStore implements MessageStore{
protected final ActiveMQDestination destination;
protected final MapContainer messageContainer;
public KahaMessageStore(MapContainer container,ActiveMQDestination destination) throws IOException{
this.messageContainer=container;
this.destination=destination;
}
public void addMessage(ConnectionContext context,Message message) throws IOException{
messageContainer.put(message.getMessageId().toString(),message);
}
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
messageContainer.put(messageId.toString(),messageRef);
}
public Message getMessage(MessageId identity) throws IOException{
return (Message) messageContainer.get(identity.toString());
}
public String getMessageReference(MessageId identity) throws IOException{
return (String) messageContainer.get(identity.toString());
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
messageContainer.remove(ack.getLastMessageId().toString());
}
public void removeMessage(MessageId msgId) throws IOException{
messageContainer.remove(msgId.toString());
}
public void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
Object msg=(Object) iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}
listener.finished();
}
public void start() throws IOException{}
public void stop(long timeout) throws IOException{}
public void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
}
public ActiveMQDestination getDestination(){
return destination;
}
public void delete(){
messageContainer.clear();
}
}

View File

@ -0,0 +1,151 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.activeio.command.WireFormat;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.activemq.store.memory.MemoryTransactionStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
*
* @version $Revision: 1.4 $
*/
public class KahaPersistentAdaptor implements PersistenceAdapter{
MemoryTransactionStore transactionStore;
ConcurrentHashMap topics=new ConcurrentHashMap();
ConcurrentHashMap queues=new ConcurrentHashMap();
private boolean useExternalMessageReferences;
private WireFormat wireFormat = new OpenWireFormat();
Store store;
public KahaPersistentAdaptor(File dir) throws IOException{
if (!dir.exists()){
dir.mkdirs();
}
String name = dir.getAbsolutePath() + File.separator + "kaha.db";
store=StoreFactory.open(name,"rw");
}
public Set getDestinations(){
Set rc=new HashSet();
for(Iterator i=store.getMapContainerIds().iterator();i.hasNext();){
Object obj=i.next();
if(obj instanceof ActiveMQDestination){
rc.add(obj);
}
}
return rc;
}
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
MessageStore rc=(MessageStore) queues.get(destination);
if(rc==null){
rc=new KahaMessageStore(getMapContainer(destination),destination);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
queues.put(destination,rc);
}
return rc;
}
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
if(rc==null){
MapContainer messageContainer=getMapContainer(destination);
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions");
MapContainer ackContainer=store.getMapContainer(destination.toString()+"-Acks");
ackContainer.setKeyMarshaller(new StringMarshaller());
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
ackContainer.load();
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
}
topics.put(destination,rc);
}
return rc;
}
public TransactionStore createTransactionStore() throws IOException{
if(transactionStore==null){
transactionStore=new MemoryTransactionStore();
}
return transactionStore;
}
public void beginTransaction(ConnectionContext context){}
public void commitTransaction(ConnectionContext context) throws IOException{
store.force();
}
public void rollbackTransaction(ConnectionContext context){}
public void start() throws Exception{}
public void stop() throws Exception{}
public long getLastMessageBrokerSequenceId() throws IOException{
return 0;
}
public void deleteAllMessages() throws IOException{
if(store!=null){
store.clear();
}
if(transactionStore!=null){
transactionStore.delete();
}
}
public boolean isUseExternalMessageReferences(){
return useExternalMessageReferences;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences){
this.useExternalMessageReferences=useExternalMessageReferences;
}
protected MapContainer getMapContainer(Object id) throws IOException{
MapContainer container=store.getMapContainer(id);
container.setKeyMarshaller(new StringMarshaller());
if(useExternalMessageReferences){
container.setValueMarshaller(new StringMarshaller());
}else{
container.setValueMarshaller(new CommandMarshaller(wireFormat));
}
container.load();
return container;
}
}

View File

@ -0,0 +1,162 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
/**
* @version $Revision: 1.5 $
*/
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
private Map ackContainer;
private Map subscriberContainer;
private Store store;
private Map subscriberAcks=new ConcurrentHashMap();
public KahaTopicMessageStore(Store store,MapContainer messageContainer,MapContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
super(messageContainer,destination);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
addSubscriberAckContainer(key);
}
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberAcks.size();
if(subscriberCount>0){
super.addMessage(context,message);
String id=message.getMessageId().toString();
ackContainer.put(id,new AtomicInteger(subscriberCount));
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
Object key=i.next();
ListContainer container=store.getListContainer(key);
container.add(id);
}
}
}
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
String id=messageId.toString();
ListContainer container=(ListContainer) subscriberAcks.get(subcriberId);
if(container!=null){
container.remove(id);
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so remove it
container.remove(id);
}
}
}
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo) subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.put(key,info);
addSubscriberAckContainer(key);
}
public synchronized void deleteSubscription(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
subscriberContainer.remove(key);
ListContainer list=(ListContainer) subscriberAcks.get(key);
for(Iterator i=list.iterator();i.hasNext();){
String id=i.next().toString();
AtomicInteger count=(AtomicInteger) ackContainer.remove(id);
if(count!=null){
if(count.decrementAndGet()>0){
ackContainer.put(id,count);
}else{
// no more references to message messageContainer so remove it
messageContainer.remove(id);
}
}
}
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
ListContainer list=(ListContainer) subscriberAcks.get(key);
for(Iterator i=list.iterator();i.hasNext();){
Object msg=messageContainer.get(i.next());
if(msg!=null){
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}
listener.finished();
}
}
public void delete(){
super.delete();
ackContainer.clear();
subscriberContainer.clear();
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[]) subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}
protected void addSubscriberAckContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key);
container.setMarshaller(new StringMarshaller());
container.load();
subscriberAcks.put(key,container);
}
}

View File

@ -0,0 +1,107 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a
*
* @version $Revision: 1.7 $
*/
public class MemoryMessageStore implements MessageStore {
protected final ActiveMQDestination destination;
protected final Map messageTable;
public MemoryMessageStore(ActiveMQDestination destination) {
this(destination, new LinkedHashMap());
}
public MemoryMessageStore(ActiveMQDestination destination, Map messageTable) {
this.destination = destination;
this.messageTable = Collections.synchronizedMap(messageTable);
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
messageTable.put(message.getMessageId(), message);
}
public void addMessageReference(ConnectionContext context, MessageId messageId, long expirationTime, String messageRef) throws IOException {
messageTable.put(messageId, messageRef);
}
public Message getMessage(MessageId identity) throws IOException {
return (Message) messageTable.get(identity);
}
public String getMessageReference(MessageId identity) throws IOException {
return (String) messageTable.get(identity);
}
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
messageTable.remove(ack.getLastMessageId());
}
public void removeMessage(MessageId msgId) throws IOException {
messageTable.remove(msgId);
}
public void recover(MessageRecoveryListener listener) throws Exception {
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.values().iterator();iter.hasNext();){
Object msg=(Object) iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}
listener.finished();
}
}
public void start() throws IOException {
}
public void stop(long timeout) throws IOException {
}
public void removeAllMessages(ConnectionContext context) throws IOException {
messageTable.clear();
}
public ActiveMQDestination getDestination() {
return destination;
}
public void delete() {
messageTable.clear();
}
}

View File

@ -0,0 +1,150 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionStore;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* @org.apache.xbean.XBean
*
* @version $Revision: 1.4 $
*/
public class MemoryPersistenceAdapter implements PersistenceAdapter {
private static final Log log = LogFactory.getLog(MemoryPersistenceAdapter.class);
MemoryTransactionStore transactionStore;
ConcurrentHashMap topics = new ConcurrentHashMap();
ConcurrentHashMap queues = new ConcurrentHashMap();
private boolean useExternalMessageReferences;
public Set getDestinations() {
Set rc = new HashSet(queues.size()+topics.size());
for (Iterator iter = queues.keySet().iterator(); iter.hasNext();) {
rc.add( iter.next() );
}
for (Iterator iter = topics.keySet().iterator(); iter.hasNext();) {
rc.add( iter.next() );
}
return rc;
}
public static MemoryPersistenceAdapter newInstance(File file) {
return new MemoryPersistenceAdapter();
}
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
MessageStore rc = (MessageStore)queues.get(destination);
if(rc==null) {
rc = new MemoryMessageStore(destination);
if( transactionStore !=null ) {
rc = transactionStore.proxy(rc);
}
queues.put(destination, rc);
}
return rc;
}
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
TopicMessageStore rc = (TopicMessageStore)topics.get(destination);
if(rc==null) {
rc = new MemoryTopicMessageStore(destination);
if( transactionStore !=null ) {
rc = transactionStore.proxy(rc);
}
topics.put(destination, rc);
}
return rc;
}
public TransactionStore createTransactionStore() throws IOException {
if( transactionStore==null ) {
transactionStore = new MemoryTransactionStore();
}
return transactionStore;
}
public void beginTransaction(ConnectionContext context) {
}
public void commitTransaction(ConnectionContext context) {
}
public void rollbackTransaction(ConnectionContext context) {
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
public long getLastMessageBrokerSequenceId() throws IOException {
return 0;
}
public void deleteAllMessages() throws IOException {
for (Iterator iter = topics.values().iterator(); iter.hasNext();) {
MemoryMessageStore store = asMemoryMessageStore(iter.next());
if (store != null) {
store.delete();
}
}
for (Iterator iter = queues.values().iterator(); iter.hasNext();) {
MemoryMessageStore store = asMemoryMessageStore(iter.next());
if (store != null) {
store.delete();
}
}
if (transactionStore != null) {
transactionStore.delete();
}
}
public boolean isUseExternalMessageReferences() {
return useExternalMessageReferences;
}
public void setUseExternalMessageReferences(boolean useExternalMessageReferences) {
this.useExternalMessageReferences = useExternalMessageReferences;
}
protected MemoryMessageStore asMemoryMessageStore(Object value) {
if (value instanceof MemoryMessageStore) {
return (MemoryMessageStore) value;
}
log.warn("Expected an instance of MemoryMessageStore but was: " + value);
return null;
}
}

View File

@ -0,0 +1,124 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.SubscriptionKey;
/**
* @version $Revision: 1.5 $
*/
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
private Map ackDatabase;
private Map subscriberDatabase;
MessageId lastMessageId;
public MemoryTopicMessageStore(ActiveMQDestination destination) {
this(destination, new LinkedHashMap(), makeMap(), makeMap());
}
protected static Map makeMap() {
return Collections.synchronizedMap(new HashMap());
}
public MemoryTopicMessageStore(ActiveMQDestination destination, Map messageTable, Map subscriberDatabase, Map ackDatabase) {
super(destination, messageTable);
this.subscriberDatabase = subscriberDatabase;
this.ackDatabase = ackDatabase;
}
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
super.addMessage(context, message);
lastMessageId = message.getMessageId();
}
public void acknowledge(ConnectionContext context, String clientId, String subscriptionName, MessageId messageId) throws IOException {
ackDatabase.put(new SubscriptionKey(clientId, subscriptionName), messageId);
}
public SubscriptionInfo lookupSubscription(String clientId, String subscriptionName) throws IOException {
return (SubscriptionInfo) subscriberDatabase.get(new SubscriptionKey(clientId, subscriptionName));
}
public void addSubsciption(String clientId, String subscriptionName, String selector, boolean retroactive) throws IOException {
SubscriptionInfo info = new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
subscriberDatabase.put(key, info);
MessageId l=retroactive ? null : lastMessageId;
if( l!=null ) {
ackDatabase.put(key, l);
}
}
public void deleteSubscription(String clientId, String subscriptionName) {
org.apache.activemq.util.SubscriptionKey key = new SubscriptionKey(clientId, subscriptionName);
ackDatabase.remove(key);
subscriberDatabase.remove(key);
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
MessageId lastAck=(MessageId) ackDatabase.get(new SubscriptionKey(clientId,subscriptionName));
boolean pastLastAck=lastAck==null;
// the message table is a synchronizedMap - so just have to synchronize here
synchronized(messageTable){
for(Iterator iter=messageTable.entrySet().iterator();iter.hasNext();){
Map.Entry entry=(Entry) iter.next();
if(pastLastAck){
Object msg=entry.getValue();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
}
}else{
pastLastAck=entry.getKey().equals(lastAck);
}
}
listener.finished();
}
}
public void delete() {
super.delete();
ackDatabase.clear();
subscriberDatabase.clear();
lastMessageId=null;
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return (SubscriptionInfo[]) subscriberDatabase.values().toArray(new SubscriptionInfo[subscriberDatabase.size()]);
}
}

View File

@ -0,0 +1,256 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.memory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import javax.transaction.xa.XAException;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.XATransactionId;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ProxyMessageStore;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TransactionRecoveryListener;
import org.apache.activemq.store.TransactionStore;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
/**
* Provides a TransactionStore implementation that can create transaction aware
* MessageStore objects from non transaction aware MessageStore objects.
*
* @version $Revision: 1.4 $
*/
public class MemoryTransactionStore implements TransactionStore {
ConcurrentHashMap inflightTransactions = new ConcurrentHashMap();
ConcurrentHashMap preparedTransactions = new ConcurrentHashMap();
private boolean doingRecover;
public static class Tx {
private ArrayList messages = new ArrayList();
private ArrayList acks = new ArrayList();
public void add(AddMessageCommand msg) {
messages.add(msg);
}
public void add(RemoveMessageCommand ack) {
acks.add(ack);
}
public Message[] getMessages() {
Message rc[] = new Message[messages.size()];
int count=0;
for (Iterator iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = (AddMessageCommand) iter.next();
rc[count++] = cmd.getMessage();
}
return rc;
}
public MessageAck[] getAcks() {
MessageAck rc[] = new MessageAck[acks.size()];
int count=0;
for (Iterator iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
rc[count++] = cmd.getMessageAck();
}
return rc;
}
/**
* @throws IOException
*/
public void commit() throws IOException {
// Do all the message adds.
for (Iterator iter = messages.iterator(); iter.hasNext();) {
AddMessageCommand cmd = (AddMessageCommand) iter.next();
cmd.run();
}
// And removes..
for (Iterator iter = acks.iterator(); iter.hasNext();) {
RemoveMessageCommand cmd = (RemoveMessageCommand) iter.next();
cmd.run();
}
}
}
public interface AddMessageCommand {
Message getMessage();
void run() throws IOException;
}
public interface RemoveMessageCommand {
MessageAck getMessageAck();
void run() throws IOException;
}
public MessageStore proxy(MessageStore messageStore) {
return new ProxyMessageStore(messageStore) {
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
public TopicMessageStore proxy(TopicMessageStore messageStore) {
return new ProxyTopicMessageStore(messageStore) {
public void addMessage(ConnectionContext context, final Message send) throws IOException {
MemoryTransactionStore.this.addMessage(getDelegate(), send);
}
public void removeMessage(ConnectionContext context, final MessageAck ack) throws IOException {
MemoryTransactionStore.this.removeMessage(getDelegate(), ack);
}
};
}
/**
* @see org.apache.activemq.store.TransactionStore#prepare(TransactionId)
*/
public void prepare(TransactionId txid) {
Tx tx = (Tx) inflightTransactions.remove(txid);
if (tx == null)
return;
preparedTransactions.put(txid, tx);
}
public Tx getTx(Object txid) {
Tx tx = (Tx) inflightTransactions.get(txid);
if (tx == null) {
tx = new Tx();
inflightTransactions.put(txid, tx);
}
return tx;
}
/**
* @throws XAException
* @see org.apache.activemq.store.TransactionStore#commit(org.apache.activemq.service.Transaction)
*/
public void commit(TransactionId txid, boolean wasPrepared) throws IOException {
Tx tx;
if( wasPrepared ) {
tx = (Tx) preparedTransactions.remove(txid);
} else {
tx = (Tx) inflightTransactions.remove(txid);
}
if( tx == null )
return;
tx.commit();
}
/**
* @see org.apache.activemq.store.TransactionStore#rollback(TransactionId)
*/
public void rollback(TransactionId txid) {
preparedTransactions.remove(txid);
inflightTransactions.remove(txid);
}
public void start() throws Exception {
}
public void stop() throws Exception {
}
synchronized public void recover(TransactionRecoveryListener listener) throws IOException {
// All the inflight transactions get rolled back..
inflightTransactions.clear();
this.doingRecover = true;
try {
for (Iterator iter = preparedTransactions.keySet().iterator(); iter.hasNext();) {
Object txid = (Object) iter.next();
Tx tx = (Tx) preparedTransactions.get(txid);
listener.recover((XATransactionId) txid, tx.getMessages(), tx.getAcks());
}
} finally {
this.doingRecover = false;
}
}
/**
* @param message
* @throws IOException
*/
void addMessage(final MessageStore destination, final Message message) throws IOException {
if( doingRecover )
return;
if (message.getTransactionId()!=null) {
Tx tx = getTx(message.getTransactionId());
tx.add(new AddMessageCommand() {
public Message getMessage() {
return message;
}
public void run() throws IOException {
destination.addMessage(null, message);
}
});
} else {
destination.addMessage(null, message);
}
}
/**
* @param ack
* @throws IOException
*/
private void removeMessage(final MessageStore destination,final MessageAck ack) throws IOException {
if( doingRecover )
return;
if (ack.isInTransaction()) {
Tx tx = getTx(ack.getTransactionId());
tx.add(new RemoveMessageCommand() {
public MessageAck getMessageAck() {
return ack;
}
public void run() throws IOException {
destination.removeMessage(null, ack);
}
});
} else {
destination.removeMessage(null, ack);
}
}
public void delete() {
inflightTransactions.clear();
preparedTransactions.clear();
doingRecover=false;
}
}

View File

@ -0,0 +1,11 @@
<html>
<head>
</head>
<body>
<p>
kaha implementation of message persistence for the broker
</p>
</body>
</html>

View File

@ -0,0 +1,315 @@
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import junit.framework.TestCase;
public class ListContainerTest extends TestCase{
protected String name = "test";
protected Store store;
protected ListContainer container;
protected LinkedList testList;
protected static final int COUNT = 10;
/*
* Test method for 'org.apache.activemq.kaha.ListContainer.size()'
*/
public void testSize()throws Exception {
container.addAll(testList);
assertEquals(container.size(),testList.size());
}
/*
* Test method for 'org.apache.activemq.kaha.ListContainer.addFirst(Object)'
*/
public void testAddFirst()throws Exception {
container.addAll(testList);
assertEquals(container.size(),testList.size());
String first = "first";
container.addFirst(first);
assertEquals(first,container.get(0));
assertEquals(container.size(),testList.size()+1);
}
/*
* Test method for 'org.apache.activemq.kaha.ListContainer.addLast(Object)'
*/
public void testAddLast()throws Exception {
container.addAll(testList);
assertEquals(container.size(),testList.size());
String last = "last";
container.addLast(last);
assertEquals(last,container.get(testList.size()));
assertEquals(container.size(),testList.size()+1);
}
/*
* Test method for 'org.apache.activemq.kaha.ListContainer.removeFirst()'
*/
public void testRemoveFirst()throws Exception {
container.addAll(testList);
assertEquals(container.size(),testList.size());
assertEquals(testList.get(0),container.removeFirst());
assertEquals(container.size(),testList.size()-1);
for (int i =1; i < testList.size(); i++){
assertEquals(testList.get(i),container.get(i-1));
}
}
/*
* Test method for 'org.apache.activemq.kaha.ListContainer.removeLast()'
*/
public void testRemoveLast()throws Exception {
container.addAll(testList);
assertEquals(container.size(),testList.size());
assertEquals(testList.get(testList.size()-1),container.removeLast());
assertEquals(container.size(),testList.size()-1);
for (int i =0; i < testList.size()-1; i++){
assertEquals(testList.get(i),container.get(i));
}
}
/*
* Test method for 'java.util.List.iterator()'
*/
public void testIterator()throws Exception {
container.addAll(testList);
for (Iterator i = testList.iterator(), j = container.iterator(); i.hasNext();){
assertEquals(i.next(),j.next());
}
for (Iterator i = container.iterator(); i.hasNext();){
i.next();
i.remove();
}
assert(container.isEmpty());
}
/*
* Test method for 'java.util.List.isEmpty()'
*/
public void testIsEmpty()throws Exception {
assertTrue(container.isEmpty());
}
/*
* Test method for 'java.util.List.contains(Object)'
*/
public void testContains()throws Exception {
container.addAll(testList);
for (Iterator i = testList.iterator(), j = container.iterator(); i.hasNext();){
assertTrue(container.contains(i.next()));
}
}
/*
* Test method for 'java.util.List.toArray()'
*/
public void testToArray()throws Exception {
container.addAll(testList);
Object[] a = testList.toArray();
Object[] b = container.toArray();
assertEquals(a.length,b.length);
for (int i = 0 ; i < a.length; i++){
assertEquals(a[i],b[i]);
}
}
/*
* Test method for 'java.util.List.remove(Object)'
*/
public void testRemoveObject()throws Exception {
container.addAll(testList);
assertEquals(container.size(),testList.size());
for (int i =0; i < testList.size(); i++){
container.remove(testList.get(i));
}
assertTrue(container.isEmpty());
}
/*
* Test method for 'java.util.List.containsAll(Collection<?>)'
*/
public void testContainsAll()throws Exception {
container.addAll(testList);
assertTrue(container.containsAll(testList));
}
/*
* Test method for 'java.util.List.removeAll(Collection<?>)'
*/
public void testRemoveAll()throws Exception {
container.addAll(testList);
assertEquals(testList.size(),container.size());
container.removeAll(testList);
assertTrue(container.isEmpty());
}
/*
* Test method for 'java.util.List.retainAll(Collection<?>)'
*/
public void testRetainAll()throws Exception {
container.addAll(testList);
assertEquals(testList.size(),container.size());
testList.remove(0);
container.retainAll(testList);
assertEquals(testList.size(),container.size());
}
/*
* Test method for 'java.util.List.clear()'
*/
public void testClear()throws Exception {
container.addAll(testList);
assertEquals(testList.size(),container.size());
container.clear();
assertTrue(container.isEmpty());
}
/*
* Test method for 'java.util.List.get(int)'
*/
public void testGet()throws Exception {
container.addAll(testList);
for (int i =0; i < testList.size();i++){
assertEquals(container.get(i),testList.get(i));
}
}
/*
* Test method for 'java.util.List.set(int, E)'
*/
public void testSet()throws Exception {
container.addAll(testList);
}
/*
* Test method for 'java.util.List.add(int, E)'
*/
public void testAddIntE()throws Exception {
container.addAll(testList);
assertTrue(container.equals(testList));
Object testObj = "testObj";
int index = 0;
testList.set(index, testObj);
container.set(index, testObj);
assertTrue(container.equals(testList));
index = testList.size()-1;
testList.set(index, testObj);
container.set(index, testObj);
assertTrue(container.equals(testList));
}
/*
* Test method for 'java.util.List.remove(int)'
*/
public void testRemoveInt()throws Exception {
container.addAll(testList);
assertTrue(container.equals(testList));
testList.remove(0);
container.remove(0);
assertTrue(container.equals(testList));
int pos = testList.size()-1;
testList.remove(pos);
container.remove(pos);
assertTrue(container.equals(testList));
}
/*
* Test method for 'java.util.List.indexOf(Object)'
*/
public void testIndexOf()throws Exception {
container.addAll(testList);
assertTrue(container.equals(testList));
for (int i =0; i < testList.size(); i++){
Object o = testList.get(i);
assertEquals(i,container.indexOf(o));
}
}
/*
* Test method for 'java.util.List.listIterator()'
*/
public void testListIterator()throws Exception {
container.addAll(testList);
ListIterator containerIter = container.listIterator();
ListIterator testIter = testList.listIterator();
assertTrue(testIter.hasNext());
assertTrue(containerIter.hasNext());
while (testIter.hasNext()){
Object o1 = testIter.next();
Object o2 = containerIter.next();
assertEquals(o1,o2);
testIter.remove();
containerIter.remove();
}
assertTrue(testList.isEmpty());
assertTrue(container.isEmpty());
}
/*
* Test method for 'java.util.List.listIterator(int)'
*/
public void testListIteratorInt()throws Exception {
container.addAll(testList);
int start = testList.size()/2;
ListIterator containerIter = container.listIterator(start);
ListIterator testIter = testList.listIterator(start);
assertTrue(testIter.hasNext());
assertTrue(containerIter.hasNext());
while (testIter.hasNext()){
Object o1 = testIter.next();
Object o2 = containerIter.next();
assertEquals(o1,o2);
}
}
/*
* Test method for 'java.util.List.subList(int, int)'
*/
public void testSubList()throws Exception {
container.addAll(testList);
int start = testList.size()/2;
List l1 = testList.subList(start, testList.size());
List l2 = container.subList(start, testList.size());
assertEquals(l1.size(),l2.size());
assertEquals(l1,l2);
}
protected Store getStore() throws IOException{
return StoreFactory.open(name, "rw");
}
protected void setUp() throws Exception{
super.setUp();
StoreFactory.delete(name);
store = getStore();
store.deleteListContainer(name);
container = store.getListContainer(name);
container.load();
testList = new LinkedList();
for (int i =0; i < COUNT; i++){
String value = "value:"+i;
testList.add(value);
}
}
protected void tearDown() throws Exception{
super.tearDown();
assertTrue(StoreFactory.delete(name));
}
}

View File

@ -0,0 +1,62 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.IOException;
import java.io.PrintWriter;
import junit.framework.TestCase;
import org.apache.activemq.kaha.impl.StoreImpl;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
/**
* Store test
*
* @version $Revision: 1.2 $
*/
public class LoadTest extends TestCase{
static final int COUNT=10000;
static final int NUM_LOADERS=2;
protected String name="load.db";
protected StoreImpl store;
/*
* Test method for 'org.apache.activemq.kaha.Store.close()'
*/
public void testLoad() throws Exception{
CountDownLatch start=new CountDownLatch(NUM_LOADERS);
CountDownLatch stop=new CountDownLatch(NUM_LOADERS);
for(int i=0;i<NUM_LOADERS;i++){
Loader loader=new Loader("loader:"+i,store,COUNT,start,stop);
loader.start();
}
stop.await();
store.dumpFreeSpace(new PrintWriter(System.out));
}
protected StoreImpl getStore() throws IOException{
return (StoreImpl) StoreFactory.open(name,"rw");
}
protected void setUp() throws Exception{
super.setUp();
StoreFactory.delete(name);
store=getStore();
}
protected void tearDown() throws Exception{
super.tearDown();
store.clear();
store.close();
assertTrue(StoreFactory.delete(name));
}
}

View File

@ -0,0 +1,113 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.Iterator;
import java.util.Set;
import org.apache.activemq.kaha.BytesMarshaller;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StringMarshaller;
import org.apache.activemq.kaha.impl.StoreImpl;
import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import junit.framework.TestCase;
/**
* Store test
*
* @version $Revision: 1.2 $
*/
class Loader extends Thread{
private String name;
private Store store;
private int count;
private CountDownLatch start;
private CountDownLatch stop;
public Loader(String name,Store store,int count,CountDownLatch start,CountDownLatch stop){
this.name=name;
this.store=store;
this.count=count;
this.start = start;
this.stop = stop;
}
public void run(){
try{
start.countDown();
start.await();
Marshaller keyMarshaller=new StringMarshaller();
Marshaller valueMarshaller=new BytesMarshaller();
MapContainer container=store.getMapContainer(name);
container.setKeyMarshaller(keyMarshaller);
container.setValueMarshaller(valueMarshaller);
container.load();
// set data
Object value=getData(1024);
long startTime=System.currentTimeMillis();
long startLoad=System.currentTimeMillis();
for(int i=0;i<count;i++){
String key="key:"+i;
container.put(key,value);
}
long finishLoad=System.currentTimeMillis();
long totalLoadTime=finishLoad-startLoad;
System.out.println("name "+name+" load time = "+totalLoadTime+"(ms)");
Set keys=container.keySet();
long startExtract=System.currentTimeMillis();
for(Iterator i=keys.iterator();i.hasNext();){
byte[] data=(byte[]) container.get(i.next());
}
long finishExtract=System.currentTimeMillis();
long totalExtractTime=finishExtract-startExtract;
System.out.println("name "+name+" extract time = "+totalExtractTime+"(ms)");
long startRemove=System.currentTimeMillis();
for(Iterator i=keys.iterator();i.hasNext();){
container.remove(i.next());
}
long finishRemove = System.currentTimeMillis();
long totalRemoveTime = finishRemove-startRemove;
System.out.println("name "+name+" remove time = "+totalRemoveTime+"(ms)");
//re-insert data of longer length
startLoad=System.currentTimeMillis();
value = getData(2048);
for(int i=0;i<count;i++){
String key="key:"+i;
container.put(key,value);
}
finishLoad=System.currentTimeMillis();
totalLoadTime=finishLoad-startLoad;
System.out.println("name "+name+" 2nd load time = "+totalLoadTime+"(ms)");
}catch(Exception e){
e.printStackTrace();
}finally{
stop.countDown();
}
}
byte[] getData(int size){
byte[] result=new byte[size];
for(int i=0;i<size;i++){
result[i]='a';
}
return result;
}
}

View File

@ -0,0 +1,182 @@
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import junit.framework.TestCase;
public class MapContainerTest extends TestCase{
protected String name = "test";
protected Store store;
protected MapContainer container;
protected Map testMap;
protected static final int COUNT = 10;
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.size()'
*/
public void testSize() throws Exception {
container.putAll(testMap);
assertTrue(container.size()==testMap.size());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.isEmpty()'
*/
public void testIsEmpty() throws Exception {
assertTrue(container.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.clear()'
*/
public void testClear() throws Exception {
container.putAll(testMap);
assertTrue(container.size()==testMap.size());
container.clear();
assertTrue(container.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.containsKey(Object)'
*/
public void testContainsKeyObject() throws Exception {
container.putAll(testMap);
for (Iterator i = testMap.entrySet().iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next();
assertTrue(container.containsKey(entry.getKey()));
}
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.get(Object)'
*/
public void testGetObject() throws Exception {
container.putAll(testMap);
for (Iterator i = testMap.entrySet().iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next();
Object value = container.get(entry.getKey());
assertNotNull(value);
assertTrue(value.equals(entry.getValue()));
}
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.containsValue(Object)'
*/
public void testContainsValueObject() throws Exception {
container.putAll(testMap);
for (Iterator i = testMap.entrySet().iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next();
assertTrue(container.containsValue(entry.getValue()));
}
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.putAll(Map)'
*/
public void testPutAllMap() throws Exception {
container.putAll(testMap);
for (Iterator i = testMap.entrySet().iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next();
assertTrue(container.containsValue(entry.getValue()));
assertTrue(container.containsKey(entry.getKey()));
}
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.keySet()'
*/
public void testKeySet() throws Exception {
container.putAll(testMap);
Set keys = container.keySet();
assertTrue(keys.size()==testMap.size());
for (Iterator i = testMap.keySet().iterator();i.hasNext();){
Object key = i.next();
assertTrue(keys.contains(key));
keys.remove(key);
}
assertTrue(container.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.values()'
*/
public void testValues() throws Exception {
container.putAll(testMap);
Collection values = container.values();
assertTrue(values.size()==testMap.size());
for (Iterator i = testMap.values().iterator();i.hasNext();){
Object value = i.next();
assertTrue(values.contains(value));
assertTrue(values.remove(value));
}
assertTrue(container.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.entrySet()'
*/
public void testEntrySet() throws Exception {
container.putAll(testMap);
Set entries = container.entrySet();
assertTrue(entries.size()==testMap.size());
for (Iterator i = entries.iterator();i.hasNext();){
Map.Entry entry = (Entry) i.next();
assertTrue(testMap.containsKey(entry.getKey()));
assertTrue(testMap.containsValue(entry.getValue()));
}
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.remove(Object)'
*/
public void testRemoveObject() throws Exception {
container.putAll(testMap);
for (Iterator i = testMap.keySet().iterator();i.hasNext();){
container.remove(i.next());
}
assertTrue(container.isEmpty());
}
protected Store getStore() throws IOException{
return StoreFactory.open(name, "rw");
}
protected void setUp() throws Exception{
super.setUp();
store = getStore();
store.deleteListContainer(name);
container = store.getMapContainer(name);
container.load();
testMap = new HashMap();
for (int i =0; i < COUNT; i++){
String key = "key:" + i;
String value = "value:"+i;
testMap.put(key, value);
}
}
protected void tearDown() throws Exception{
super.tearDown();
assertTrue(StoreFactory.delete(name));
}
}

View File

@ -0,0 +1,208 @@
/**
*
* Copyright 2005-2006 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import junit.framework.TestCase;
/**
*Store test
*
* @version $Revision: 1.2 $
*/
public class StoreTest extends TestCase{
protected String name = "sdbStoreTest.db";
protected Store store;
/*
* Test method for 'org.apache.activemq.kaha.Store.close()'
*/
public void testClose() throws Exception{
store.close();
try {
//access should throw an exception
store.getListContainer("fred");
assertTrue("Should have got a enception",false);
}catch(Exception e){
}
}
/*
* Test method for 'org.apache.activemq.kaha.Store.clear()'
*/
public void testClear() throws Exception{
int count = 100;
ListContainer list = store.getListContainer("testClear");
list.load();
for (int i =0; i < count; i++){
list.add("test " + i);
}
assertEquals(count,list.size());
store.clear();
assertTrue(list.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.Store.getMapContainer(Object)'
*/
public void testGetMapContainer() throws Exception{
String containerId = "test";
MapContainer container = store.getMapContainer(containerId);
container.load();
assertNotNull(container);
store.close();
store = getStore();
container = store.getMapContainer(containerId);
assertNotNull(container);
}
/*
* Test method for 'org.apache.activemq.kaha.Store.deleteMapContainer(Object)'
*/
public void testDeleteMapContainer() throws Exception{
String containerId = "test";
MapContainer container = store.getMapContainer(containerId);
assertNotNull(container);
store.deleteMapContainer(containerId);
assertFalse(store.doesMapContainerExist(containerId));
store.close();
store = getStore();
assertFalse(store.doesMapContainerExist(containerId));
}
/*
* Test method for 'org.apache.activemq.kaha.Store.getMapContainerIds()'
*/
public void testGetMapContainerIds()throws Exception {
String containerId = "test";
MapContainer container = store.getMapContainer(containerId);
Set set = store.getMapContainerIds();
assertTrue(set.contains(containerId));
}
/*
* Test method for 'org.apache.activemq.kaha.Store.getListContainer(Object)'
*/
public void testGetListContainer() throws Exception{
String containerId = "test";
ListContainer container = store.getListContainer(containerId);
assertNotNull(container);
store.close();
store = getStore();
container = store.getListContainer(containerId);
assertNotNull(container);
}
/*
* Test method for 'org.apache.activemq.kaha.Store.deleteListContainer(Object)'
*/
public void testDeleteListContainer()throws Exception{
String containerId = "test";
ListContainer container = store.getListContainer(containerId);
assertNotNull(container);
store.deleteListContainer(containerId);
assertFalse(store.doesListContainerExist(containerId));
store.close();
store = getStore();
assertFalse(store.doesListContainerExist(containerId));
}
/*
* Test method for 'org.apache.activemq.kaha.Store.getListContainerIds()'
*/
public void testGetListContainerIds()throws Exception {
String containerId = "test";
ListContainer container = store.getListContainer(containerId);
Set set = store.getListContainerIds();
assertTrue(set.contains(containerId));
}
public void testBasicAllocations() throws Exception{
Map testMap = new HashMap();
for (int i =0; i<10; i++){
String key = "key:"+i;
String value = "value:"+i;
testMap.put(key, value);
}
List testList = new ArrayList();
for (int i = 0; i < 10; i++){
testList.add("value:"+i);
}
String listId = "testList";
String mapId = "testMap";
MapContainer mapContainer = store.getMapContainer(mapId);
mapContainer.load();
ListContainer listContainer = store.getListContainer(listId);
listContainer.load();
mapContainer.putAll(testMap);
listContainer.addAll(testList);
store.close();
store = getStore();
mapContainer = store.getMapContainer(mapId);
mapContainer.load();
listContainer = store.getListContainer(listId);
listContainer.load();
for (Iterator i = testMap.keySet().iterator(); i.hasNext();){
Object key = i.next();
Object value = testMap.get(key);
assertTrue(mapContainer.containsKey(key));
assertEquals(value,mapContainer.get(key));
}
assertEquals(testList.size(),listContainer.size());
for (Iterator i = testList.iterator(), j = listContainer.iterator(); i.hasNext();){
assertEquals(i.next(),j.next());
}
}
protected Store getStore() throws IOException{
return StoreFactory.open(name, "rw");
}
protected void setUp() throws Exception{
super.setUp();
store = getStore();
}
protected void tearDown() throws Exception{
super.tearDown();
assertTrue(StoreFactory.delete(name));
}
}