Kaha can now journal the changes it makes to the indexes so that they can be used to redo the changes on

failure.

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@409322 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hiram R. Chirino 2006-05-25 07:44:50 +00:00
parent cad2107dd7
commit aa049d2ce2
6 changed files with 221 additions and 70 deletions

View File

@ -25,20 +25,20 @@ import java.io.ObjectOutput;
public class ContainerId implements Externalizable{ public class ContainerId implements Externalizable{
private static final long serialVersionUID=-8883779541021821943L; private static final long serialVersionUID=-8883779541021821943L;
private Object key; private Object key;
private String dataContainerPrefix; private String dataContainerName;
/** /**
* @return Returns the dataContainerPrefix. * @return Returns the dataContainerPrefix.
*/ */
public String getDataContainerPrefix(){ public String getDataContainerName(){
return dataContainerPrefix; return dataContainerName;
} }
/** /**
* @param dataContainerPrefix The dataContainerPrefix to set. * @param dataContainerPrefix The dataContainerPrefix to set.
*/ */
public void setDataContainerPrefix(String dataContainerPrefix){ public void setDataContainerName(String dataContainerPrefix){
this.dataContainerPrefix=dataContainerPrefix; this.dataContainerName=dataContainerPrefix;
} }
/** /**
@ -69,12 +69,12 @@ public class ContainerId implements Externalizable{
} }
public void writeExternal(ObjectOutput out) throws IOException{ public void writeExternal(ObjectOutput out) throws IOException{
out.writeUTF(getDataContainerPrefix()); out.writeUTF(getDataContainerName());
out.writeObject(key); out.writeObject(key);
} }
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{ public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
dataContainerPrefix=in.readUTF(); dataContainerName=in.readUTF();
key=in.readObject(); key=in.readObject();
} }
} }

View File

@ -33,10 +33,11 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
final class DataManager{ final class DataManager{
private static final Log log=LogFactory.getLog(DataManager.class); private static final Log log=LogFactory.getLog(DataManager.class);
protected static long MAX_FILE_LENGTH=1024*1024*32; protected static long MAX_FILE_LENGTH=1024*1024*32;
private final File dir; private final File dir;
private final String prefix; private final String name;
private StoreDataReader reader; private StoreDataReader reader;
private StoreDataWriter writer; private StoreDataWriter writer;
private DataFile currentWriteFile; private DataFile currentWriteFile;
@ -46,22 +47,27 @@ final class DataManager{
public static final byte DATA_ITEM_TYPE=1; public static final byte DATA_ITEM_TYPE=1;
public static final byte REDO_ITEM_TYPE=2; public static final byte REDO_ITEM_TYPE=2;
DataManager(File dir,String pf){ Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER;
private String dataFilePrefix;
DataManager(File dir, final String name){
this.dir=dir; this.dir=dir;
this.prefix=pf; this.name=name;
this.reader=new StoreDataReader(this); this.reader=new StoreDataReader(this);
this.writer=new StoreDataWriter(this); this.writer=new StoreDataWriter(this);
dataFilePrefix = "data-"+name+"-";
// build up list of current dataFiles // build up list of current dataFiles
File[] files=dir.listFiles(new FilenameFilter(){ File[] files=dir.listFiles(new FilenameFilter(){
public boolean accept(File dir,String name){ public boolean accept(File dir,String n){
return dir.equals(dir)&&name.startsWith(prefix); return dir.equals(dir)&&n.startsWith(dataFilePrefix);
} }
}); });
if(files!=null){ if(files!=null){
for(int i=0;i<files.length;i++){ for(int i=0;i<files.length;i++){
File file=files[i]; File file=files[i];
String name=file.getName(); String n=file.getName();
String numStr=name.substring(prefix.length(),name.length()); String numStr=n.substring(dataFilePrefix.length(),n.length());
int num=Integer.parseInt(numStr); int num=Integer.parseInt(numStr);
DataFile dataFile=new DataFile(file,num); DataFile dataFile=new DataFile(file,num);
fileMap.put(dataFile.getNumber(),dataFile); fileMap.put(dataFile.getNumber(),dataFile);
@ -72,8 +78,16 @@ final class DataManager{
} }
} }
public String getPrefix(){ private DataFile createAndAddDataFile(int num){
return prefix; String fileName=dataFilePrefix+num;
File file=new File(dir,fileName);
DataFile result=new DataFile(file,num);
fileMap.put(result.getNumber(),result);
return result;
}
public String getName(){
return name;
} }
DataFile findSpaceForData(DataItem item) throws IOException{ DataFile findSpaceForData(DataItem item) throws IOException{
@ -95,7 +109,7 @@ final class DataManager{
if(dataFile!=null){ if(dataFile!=null){
return dataFile.getRandomAccessFile(); return dataFile.getRandomAccessFile();
} }
throw new IOException("Could not locate data file "+prefix+item.getFile()); throw new IOException("Could not locate data file "+name+item.getFile());
} }
synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{ synchronized Object readItem(Marshaller marshaller, DataItem item) throws IOException{
@ -106,11 +120,16 @@ final class DataManager{
return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE); return writer.storeItem(marshaller,payload, DATA_ITEM_TYPE);
} }
synchronized DataItem storeRedoItem(Marshaller marshaller, Object payload) throws IOException{ synchronized DataItem storeRedoItem(Object payload) throws IOException{
return writer.storeItem(marshaller,payload, REDO_ITEM_TYPE); return writer.storeItem(redoMarshaller, payload, REDO_ITEM_TYPE);
} }
synchronized void recoverRedoItems(Marshaller marshaller, RedoListener listener) throws IOException{ synchronized void recoverRedoItems(RedoListener listener) throws IOException{
// Nothing to recover if there is no current file.
if( currentWriteFile == null )
return;
DataItem item = new DataItem(); DataItem item = new DataItem();
item.setFile(currentWriteFile.getNumber().intValue()); item.setFile(currentWriteFile.getNumber().intValue());
item.setOffset(0); item.setOffset(0);
@ -126,7 +145,7 @@ final class DataManager{
// Un-marshal the redo item // Un-marshal the redo item
Object object; Object object;
try { try {
object = readItem(marshaller, item); object = readItem(redoMarshaller, item);
} catch (IOException e1) { } catch (IOException e1) {
log.trace("End of data file reached at (payload was invalid): "+item); log.trace("End of data file reached at (payload was invalid): "+item);
return; return;
@ -224,17 +243,17 @@ final class DataManager{
} }
} }
private DataFile createAndAddDataFile(int num){
String fileName=prefix+num;
File file=new File(dir,fileName);
DataFile result=new DataFile(file,num);
fileMap.put(result.getNumber(),result);
return result;
}
private void removeDataFile(DataFile dataFile) throws IOException{ private void removeDataFile(DataFile dataFile) throws IOException{
fileMap.remove(dataFile.getNumber()); fileMap.remove(dataFile.getNumber());
boolean result=dataFile.delete(); boolean result=dataFile.delete();
log.debug("discarding data file "+dataFile+(result?"successful ":"failed")); log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
} }
public Marshaller getRedoMarshaller() {
return redoMarshaller;
}
public void setRedoMarshaller(Marshaller redoMarshaller) {
this.redoMarshaller = redoMarshaller;
}
} }

View File

@ -17,6 +17,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.LinkedList; import java.util.LinkedList;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
/** /**
@ -24,8 +25,9 @@ import org.apache.commons.logging.LogFactory;
* *
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
final class IndexManager{ final class IndexManager {
private static final Log log=LogFactory.getLog(IndexManager.class); private static final Log log=LogFactory.getLog(IndexManager.class);
private final String name;
private File file; private File file;
private RandomAccessFile indexFile; private RandomAccessFile indexFile;
private StoreIndexReader reader; private StoreIndexReader reader;
@ -33,11 +35,12 @@ final class IndexManager{
private LinkedList freeList=new LinkedList(); private LinkedList freeList=new LinkedList();
private long length=0; private long length=0;
IndexManager(File ifile,String mode) throws IOException{ IndexManager(File directory, String name, String mode, DataManager redoLog ) throws IOException{
file=ifile; this.name = name;
indexFile=new RandomAccessFile(ifile,mode); file=new File(directory,"index-"+name);
; indexFile=new RandomAccessFile(file,mode);
reader=new StoreIndexReader(indexFile); reader=new StoreIndexReader(indexFile);
writer=new StoreIndexWriter(indexFile); writer=new StoreIndexWriter(indexFile, name, redoLog);
long offset=0; long offset=0;
while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){ while((offset+IndexItem.INDEX_SIZE)<=indexFile.length()){
IndexItem index=reader.readItem(offset); IndexItem index=reader.readItem(offset);
@ -69,6 +72,10 @@ final class IndexManager{
writer.storeItem(index); writer.storeItem(index);
} }
public void redo(RedoStoreIndexItem redo) throws IOException {
writer.redoStoreItem(redo);
}
synchronized IndexItem createNewIndex() throws IOException{ synchronized IndexItem createNewIndex() throws IOException{
IndexItem result=getNextFreeIndex(); IndexItem result=getNextFreeIndex();
if(result==null){ if(result==null){
@ -118,4 +125,9 @@ final class IndexManager{
void setLength(long value){ void setLength(long value){
this.length=value; this.length=value;
} }
public String getName() {
return name;
}
} }

View File

@ -37,8 +37,7 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
*/ */
public class KahaStore implements Store{ public class KahaStore implements Store{
private static final String DEFAULT_DATA_CONTAINER_NAME = "kaha-data."; private static final String DEFAULT_CONTAINER_NAME = "kaha";
private static final String DEFAULT_INDEX_CONTAINER_NAME = "kaha-index.";
private File directory; private File directory;
@ -54,6 +53,7 @@ public class KahaStore implements Store{
private String name; private String name;
private String mode; private String mode;
private boolean initialized; private boolean initialized;
private boolean logIndexChanges=false;
public KahaStore(String name,String mode) throws IOException{ public KahaStore(String name,String mode) throws IOException{
this.name=name; this.name=name;
@ -138,7 +138,7 @@ public class KahaStore implements Store{
} }
public MapContainer getMapContainer(Object id) throws IOException{ public MapContainer getMapContainer(Object id) throws IOException{
return getMapContainer(id, DEFAULT_DATA_CONTAINER_NAME); return getMapContainer(id, DEFAULT_CONTAINER_NAME);
} }
public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{ public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{
@ -148,11 +148,11 @@ public class KahaStore implements Store{
if(result==null){ if(result==null){
DataManager dm = getDataManager(dataContainerName); DataManager dm = getDataManager(dataContainerName);
IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME); IndexManager im = getIndexManager(dm, dataContainerName);
ContainerId containerId = new ContainerId(); ContainerId containerId = new ContainerId();
containerId.setKey(id); containerId.setKey(id);
containerId.setDataContainerPrefix(dataContainerName); containerId.setDataContainerName(dataContainerName);
IndexItem root=mapsContainer.getRoot(containerId); IndexItem root=mapsContainer.getRoot(containerId);
if( root == null ) { if( root == null ) {
@ -186,19 +186,19 @@ public class KahaStore implements Store{
} }
public ListContainer getListContainer(Object id) throws IOException{ public ListContainer getListContainer(Object id) throws IOException{
return getListContainer(id,DEFAULT_DATA_CONTAINER_NAME); return getListContainer(id,DEFAULT_CONTAINER_NAME);
} }
public synchronized ListContainer getListContainer(Object id, String dataContainerName) throws IOException{ public synchronized ListContainer getListContainer(Object id, String containerName) throws IOException{
initialize(); initialize();
ListContainerImpl result=(ListContainerImpl) lists.get(id); ListContainerImpl result=(ListContainerImpl) lists.get(id);
if(result==null){ if(result==null){
DataManager dm = getDataManager(dataContainerName); DataManager dm = getDataManager(containerName);
IndexManager im = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME); IndexManager im = getIndexManager(dm, containerName);
ContainerId containerId = new ContainerId(); ContainerId containerId = new ContainerId();
containerId.setKey(id); containerId.setKey(id);
containerId.setDataContainerPrefix(dataContainerName); containerId.setDataContainerName(containerName);
IndexItem root=listsContainer.getRoot(containerId); IndexItem root=listsContainer.getRoot(containerId);
if( root == null ) { if( root == null ) {
root=listsContainer.addRoot(containerId); root=listsContainer.addRoot(containerId);
@ -238,23 +238,23 @@ public class KahaStore implements Store{
directory=new File(name); directory=new File(name);
directory.mkdirs(); directory.mkdirs();
DataManager rootData = getDataManager(DEFAULT_DATA_CONTAINER_NAME); DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
IndexManager rootIndex = getIndexManager(DEFAULT_INDEX_CONTAINER_NAME); IndexManager defaultIM = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
IndexItem mapRoot=new IndexItem(); IndexItem mapRoot=new IndexItem();
IndexItem listRoot=new IndexItem(); IndexItem listRoot=new IndexItem();
if(rootIndex.isEmpty()){ if(defaultIM.isEmpty()){
mapRoot.setOffset(0); mapRoot.setOffset(0);
rootIndex.updateIndex(mapRoot); defaultIM.updateIndex(mapRoot);
listRoot.setOffset(IndexItem.INDEX_SIZE); listRoot.setOffset(IndexItem.INDEX_SIZE);
rootIndex.updateIndex(listRoot); defaultIM.updateIndex(listRoot);
rootIndex.setLength(IndexItem.INDEX_SIZE*2); defaultIM.setLength(IndexItem.INDEX_SIZE*2);
}else{ }else{
mapRoot=rootIndex.getIndex(0); mapRoot=defaultIM.getIndex(0);
listRoot=rootIndex.getIndex(IndexItem.INDEX_SIZE); listRoot=defaultIM.getIndex(IndexItem.INDEX_SIZE);
} }
mapsContainer=new IndexRootContainer(mapRoot,rootIndex,rootData); mapsContainer=new IndexRootContainer(mapRoot,defaultIM,defaultDM);
listsContainer=new IndexRootContainer(listRoot,rootIndex,rootData); listsContainer=new IndexRootContainer(listRoot,defaultIM,defaultDM);
for (Iterator i = dataManagers.values().iterator(); i.hasNext();){ for (Iterator i = dataManagers.values().iterator(); i.hasNext();){
DataManager dm = (DataManager) i.next(); DataManager dm = (DataManager) i.next();
@ -263,23 +263,42 @@ public class KahaStore implements Store{
} }
} }
protected DataManager getDataManager(String prefix) throws IOException { protected DataManager getDataManager(String name) throws IOException {
DataManager dm = (DataManager) dataManagers.get(prefix); DataManager dm = (DataManager) dataManagers.get(name);
if (dm == null){ if (dm == null){
dm = new DataManager(directory,prefix); dm = new DataManager(directory,name);
dataManagers.put(prefix,dm); recover(dm);
dataManagers.put(name,dm);
} }
return dm; return dm;
} }
protected IndexManager getIndexManager(String index_name) throws IOException { protected IndexManager getIndexManager(DataManager dm, String name) throws IOException {
IndexManager im = (IndexManager) indexManagers.get(index_name); IndexManager im = (IndexManager) indexManagers.get(name);
if( im == null ) { if( im == null ) {
File ifile=new File(directory,index_name+".idx"); im = new IndexManager(directory,name,mode, logIndexChanges?dm:null);
im = new IndexManager(ifile,mode); indexManagers.put(name,im);
indexManagers.put(index_name,im);
} }
return im; return im;
} }
private void recover(final DataManager dm) throws IOException {
dm.recoverRedoItems( new RedoListener() {
public void onRedoItem(DataItem item, Object o) throws Exception {
RedoStoreIndexItem redo = (RedoStoreIndexItem) o;
//IndexManager im = getIndexManager(dm, redo.getIndexName());
IndexManager im = getIndexManager(dm, dm.getName());
im.redo(redo);
}
});
}
public boolean isLogIndexChanges() {
return logIndexChanges;
}
public void setLogIndexChanges(boolean logIndexChanges) {
this.logIndexChanges = logIndexChanges;
}
} }

View File

@ -0,0 +1,79 @@
package org.apache.activemq.kaha.impl;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import org.apache.activemq.kaha.Marshaller;
public class RedoStoreIndexItem implements Externalizable {
public static final Marshaller MARSHALLER = new Marshaller() {
public Object readPayload(DataInput in) throws IOException {
RedoStoreIndexItem item = new RedoStoreIndexItem();
item.readExternal(in);
return item;
}
public void writePayload(Object object, DataOutput out) throws IOException {
RedoStoreIndexItem item = (RedoStoreIndexItem) object;
item.writeExternal(out);
}
};
private static final long serialVersionUID = -4865508871719676655L;
private String indexName;
private IndexItem indexItem;
private long offset;
public RedoStoreIndexItem() {
}
public RedoStoreIndexItem(String indexName, long offset, IndexItem item) {
this.indexName = indexName;
this.offset=offset;
this.indexItem = item;
}
public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
readExternal((DataInput)in);
}
public void readExternal(DataInput in) throws IOException {
// indexName = in.readUTF();
offset = in.readLong();
indexItem = new IndexItem();
indexItem.read(in);
}
public void writeExternal(ObjectOutput out) throws IOException {
writeExternal((DataOutput)out);
}
public void writeExternal(DataOutput out) throws IOException {
// out.writeUTF(indexName);
out.writeLong(offset);
indexItem.write(out);
}
public String getIndexName() {
return indexName;
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public IndexItem getIndexItem() {
return indexItem;
}
public void setIndexItem(IndexItem item) {
this.indexItem = item;
}
public long getOffset() {
return offset;
}
public void setOffset(long offset) {
this.offset = offset;
}
}

View File

@ -26,8 +26,11 @@ import java.io.RandomAccessFile;
* @version $Revision: 1.1.1.1 $ * @version $Revision: 1.1.1.1 $
*/ */
class StoreIndexWriter{ class StoreIndexWriter{
protected StoreByteArrayOutputStream dataOut;
protected RandomAccessFile file; protected final StoreByteArrayOutputStream dataOut = new StoreByteArrayOutputStream();
protected final RandomAccessFile file;
protected final String name;
protected final DataManager redoLog;
/** /**
* Construct a Store index writer * Construct a Store index writer
@ -35,14 +38,33 @@ class StoreIndexWriter{
* @param file * @param file
*/ */
StoreIndexWriter(RandomAccessFile file){ StoreIndexWriter(RandomAccessFile file){
this.file=file; this(file, null, null);
this.dataOut=new StoreByteArrayOutputStream(); }
public StoreIndexWriter(RandomAccessFile file, String indexName, DataManager redoLog) {
this.file=file;
this.name = indexName;
this.redoLog = redoLog;
}
void storeItem(IndexItem indexItem) throws IOException{
if( redoLog!=null ) {
RedoStoreIndexItem redo = new RedoStoreIndexItem(name, indexItem.getOffset(), indexItem);
redoLog.storeRedoItem(redo);
} }
void storeItem(IndexItem index) throws IOException{
dataOut.reset(); dataOut.reset();
index.write(dataOut); indexItem.write(dataOut);
file.seek(index.getOffset()); file.seek(indexItem.getOffset());
file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE); file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
} }
public void redoStoreItem(RedoStoreIndexItem redo) throws IOException {
dataOut.reset();
redo.getIndexItem().write(dataOut);
file.seek(redo.getOffset());
file.write(dataOut.getData(),0,IndexItem.INDEX_SIZE);
}
} }