Compress HashIndex on startup - only way to ensure the index

pages are loaded in correct order without changing the wire format

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@632964 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-03-03 07:18:00 +00:00
parent b67f61df98
commit e376456111
4 changed files with 96 additions and 45 deletions

View File

@ -92,7 +92,7 @@ class HashBin {
return size; return size;
} }
HashPageInfo addHashPageInfo(long id, int size) { HashPageInfo addHashPageInfo(long id, int size) throws IOException {
HashPageInfo info = new HashPageInfo(hashIndex); HashPageInfo info = new HashPageInfo(hashIndex);
info.setId(id); info.setId(id);
info.setSize(size); info.setSize(size);
@ -105,7 +105,7 @@ class HashBin {
HashEntry result = null; HashEntry result = null;
try { try {
int low = 0; int low = 0;
int high = size() - 1; int high = size()-1;
while (low <= high) { while (low <= high) {
int mid = (low + high) >> 1; int mid = (low + high) >> 1;
HashEntry te = getHashEntry(mid); HashEntry te = getHashEntry(mid);
@ -129,7 +129,7 @@ class HashBin {
boolean replace = false; boolean replace = false;
try { try {
int low = 0; int low = 0;
int high = size() - 1; int high = size()-1;
while (low <= high) { while (low <= high) {
int mid = (low + high) >> 1; int mid = (low + high) >> 1;
HashEntry midVal = getHashEntry(mid); HashEntry midVal = getHashEntry(mid);
@ -223,7 +223,7 @@ class HashBin {
HashPageInfo page = getRetrievePage(index); HashPageInfo page = getRetrievePage(index);
int offset = getRetrieveOffset(index); int offset = getRetrieveOffset(index);
HashEntry result = page.removeHashEntry(offset); HashEntry result = page.removeHashEntry(offset);
if (page.isEmpty()) { if (page.isEmpty()) {
hashPages.remove(page); hashPages.remove(page);
hashIndex.releasePage(page.getPage()); hashIndex.releasePage(page.getPage());
@ -280,7 +280,7 @@ class HashBin {
// overflowed // overflowed
info.begin(); info.begin();
HashEntry entry = info.removeHashEntry(info.size() - 1); HashEntry entry = info.removeHashEntry(info.size() - 1);
doOverFlow(hashPages.indexOf(info) + 1, entry); doOverFlow(hashPages.indexOf(info)+1, entry);
} }
} }
@ -298,13 +298,22 @@ class HashBin {
if (info.size() > maximumEntries) { if (info.size() > maximumEntries) {
// overflowed // overflowed
HashEntry overflowed = info.removeHashEntry(info.size() - 1); HashEntry overflowed = info.removeHashEntry(info.size() - 1);
doOverFlow(pageNo + 1, overflowed); doOverFlow(pageNo+1, overflowed);
} }
} }
private void doUnderFlow(int index) { private void doUnderFlow(int index) {
} }
String dump() throws IOException {
String str = "[" + hashPages.size()+"]";
for (HashPageInfo page : hashPages) {
page.begin();
str +=page.dump();
page.end();
}
return str;
}
private void end() throws IOException { private void end() throws IOException {
for (HashPageInfo info : hashPages) { for (HashPageInfo info : hashPages) {
info.end(); info.end();

View File

@ -17,8 +17,10 @@
package org.apache.activemq.kaha.impl.index.hash; package org.apache.activemq.kaha.impl.index.hash;
import java.io.File; import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.util.Arrays;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Marshaller;
@ -65,7 +67,7 @@ public class HashIndex implements Index, HashIndexMBean {
private int pageCacheSize = 10; private int pageCacheSize = 10;
private int size; private int size;
private int activeBins; private int activeBins;
/** /**
* Constructor * Constructor
@ -198,29 +200,17 @@ public class HashIndex implements Index, HashIndexMBean {
readBuffer = new byte[pageSize]; readBuffer = new byte[pageSize];
try { try {
openIndexFile(); openIndexFile();
long offset = 0; if (indexFile.length() > 0) {
while ((offset + pageSize) <= indexFile.length()) { doCompress();
indexFile.seek(offset);
indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
dataIn.restart(readBuffer);
HashPage page = new HashPage(keysPerPage);
page.setId(offset);
page.readHeader(dataIn);
if (!page.isActive()) {
freeList.add(page);
} else {
addToBin(page);
size+=page.size();
}
offset += pageSize;
} }
length = offset;
} catch (IOException e) { } catch (IOException e) {
LOG.error("Failed to load index ", e); LOG.error("Failed to load index ", e);
throw new RuntimeException(e); throw new RuntimeException(e);
} }
} }
} }
public synchronized void unload() throws IOException { public synchronized void unload() throws IOException {
if (loaded.compareAndSet(true, false)) { if (loaded.compareAndSet(true, false)) {
@ -228,6 +218,7 @@ public class HashIndex implements Index, HashIndexMBean {
indexFile.close(); indexFile.close();
indexFile = null; indexFile = null;
freeList.clear(); freeList.clear();
pageCache.clear();
bins = new HashBin[bins.length]; bins = new HashBin[bins.length];
} }
} }
@ -330,6 +321,7 @@ public class HashIndex implements Index, HashIndexMBean {
result = freeList.removeFirst(); result = freeList.removeFirst();
result.setActive(true); result.setActive(true);
result.reset(); result.reset();
writePageHeader(result);
} }
return result; return result;
} }
@ -371,7 +363,7 @@ public class HashIndex implements Index, HashIndexMBean {
return page; return page;
} }
void addToBin(HashPage page) { void addToBin(HashPage page) throws IOException {
HashBin bin = getBin(page.getBinId()); HashBin bin = getBin(page.getBinId());
bin.addHashPageInfo(page.getId(), page.getPersistedSize()); bin.addHashPageInfo(page.getId(), page.getPersistedSize());
} }
@ -393,7 +385,7 @@ public class HashIndex implements Index, HashIndexMBean {
indexFile = new RandomAccessFile(file, "rw"); indexFile = new RandomAccessFile(file, "rw");
} }
} }
private HashBin getBin(Object key) { private HashBin getBin(Object key) {
int hash = hash(key); int hash = hash(key);
int i = indexFor(hash, bins.length); int i = indexFor(hash, bins.length);
@ -419,6 +411,61 @@ public class HashIndex implements Index, HashIndexMBean {
pageCache.remove(page.getId()); pageCache.remove(page.getId());
} }
} }
private void doLoad() throws IOException {
long offset = 0;
if (loaded.compareAndSet(false, true)) {
while ((offset + pageSize) <= indexFile.length()) {
indexFile.seek(offset);
indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE);
dataIn.restart(readBuffer);
HashPage page = new HashPage(keysPerPage);
page.setId(offset);
page.readHeader(dataIn);
if (!page.isActive()) {
page.reset();
freeList.add(page);
} else {
addToBin(page);
size+=page.size();
}
offset += pageSize;
}
length=offset;
}
}
private void doCompress() throws IOException {
String backFileName = name + "-COMPRESS";
HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
backIndex.setKeyMarshaller(keyMarshaller);
backIndex.setKeySize(getKeySize());
backIndex.setNumberOfBins(getNumberOfBins());
backIndex.setPageSize(getPageSize());
backIndex.load();
File backFile = backIndex.file;
long offset = 0;
while ((offset + pageSize) <= indexFile.length()) {
indexFile.seek(offset);
HashPage page = getFullPage(offset);
if (page.isActive()) {
for (HashEntry entry : page.getEntries()) {
backIndex.getBin(entry.getKey()).put(entry);
backIndex.size++;
}
}
offset += pageSize;
}
backIndex.unload();
unload();
IOHelper.deleteFile(file);
IOHelper.copyFile(backFile, file);
IOHelper.deleteFile(backFile);
openIndexFile();
doLoad();
}
static int hash(Object x) { static int hash(Object x) {
int h = x.hashCode(); int h = x.hashCode();

View File

@ -38,8 +38,8 @@ class HashPage {
private int maximumEntries; private int maximumEntries;
private long id; private long id;
private int binId; private int binId;
private int persistedSize;
private List<HashEntry> hashIndexEntries; private List<HashEntry> hashIndexEntries;
private int persistedSize;
/* /*
* for persistence only * for persistence only
*/ */
@ -71,7 +71,7 @@ class HashPage {
} }
public String toString() { public String toString() {
return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + hashIndexEntries.size(); return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + persistedSize;
} }
public boolean equals(Object o) { public boolean equals(Object o) {
@ -95,14 +95,7 @@ class HashPage {
this.active = active; this.active = active;
} }
long getNextFreePageId() {
return this.nextFreePageId;
}
void setNextFreePageId(long nextPageId) {
this.nextFreePageId = nextPageId;
}
long getId() { long getId() {
return id; return id;
} }
@ -116,8 +109,9 @@ class HashPage {
} }
void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException { void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException {
persistedSize=hashIndexEntries.size();
writeHeader(dataOut); writeHeader(dataOut);
dataOut.writeInt(hashIndexEntries.size()); dataOut.writeInt(persistedSize);
for (HashEntry entry : hashIndexEntries) { for (HashEntry entry : hashIndexEntries) {
entry.write(keyMarshaller, dataOut); entry.write(keyMarshaller, dataOut);
} }
@ -125,7 +119,8 @@ class HashPage {
void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException { void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException {
readHeader(dataIn); readHeader(dataIn);
int size = dataIn.readInt(); dataIn.readInt();
int size = persistedSize;
hashIndexEntries.clear(); hashIndexEntries.clear();
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
HashEntry entry = new HashEntry(); HashEntry entry = new HashEntry();
@ -145,8 +140,10 @@ class HashPage {
dataOut.writeBoolean(isActive()); dataOut.writeBoolean(isActive());
dataOut.writeLong(nextFreePageId); dataOut.writeLong(nextFreePageId);
dataOut.writeInt(binId); dataOut.writeInt(binId);
dataOut.writeInt(size()); persistedSize=hashIndexEntries.size();
dataOut.writeInt(persistedSize);
} }
boolean isEmpty() { boolean isEmpty() {
return hashIndexEntries.isEmpty(); return hashIndexEntries.isEmpty();
@ -186,12 +183,10 @@ class HashPage {
void reset() throws IOException { void reset() throws IOException {
hashIndexEntries.clear(); hashIndexEntries.clear();
setNextFreePageId(HashEntry.NOT_SET); persistedSize=0;
} }
void addHashEntry(int index, HashEntry entry) throws IOException { void addHashEntry(int index, HashEntry entry) throws IOException {
// index = index >= 0 ? index : 0;
// index = (index == 0 || index< size()) ? index : size()-1;
hashIndexEntries.add(index, entry); hashIndexEntries.add(index, entry);
} }
@ -227,7 +222,7 @@ class HashPage {
this.binId = binId; this.binId = binId;
} }
void dump() { String dump() {
StringBuffer str = new StringBuffer(32); StringBuffer str = new StringBuffer(32);
str.append(toString()); str.append(toString());
@ -236,6 +231,6 @@ class HashPage {
str.append(entry); str.append(entry);
str.append(","); str.append(",");
} }
LOG.info(str); return str.toString();
} }
} }

View File

@ -86,8 +86,8 @@ class HashPageInfo {
return result; return result;
} }
void dump() { String dump() {
page.dump(); return page.dump();
} }
void begin() throws IOException { void begin() throws IOException {