diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java index 243fd974b5..b355b88e11 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java @@ -92,7 +92,7 @@ class HashBin { return size; } - HashPageInfo addHashPageInfo(long id, int size) { + HashPageInfo addHashPageInfo(long id, int size) throws IOException { HashPageInfo info = new HashPageInfo(hashIndex); info.setId(id); info.setSize(size); @@ -105,7 +105,7 @@ class HashBin { HashEntry result = null; try { int low = 0; - int high = size() - 1; + int high = size()-1; while (low <= high) { int mid = (low + high) >> 1; HashEntry te = getHashEntry(mid); @@ -129,7 +129,7 @@ class HashBin { boolean replace = false; try { int low = 0; - int high = size() - 1; + int high = size()-1; while (low <= high) { int mid = (low + high) >> 1; HashEntry midVal = getHashEntry(mid); @@ -223,7 +223,7 @@ class HashBin { HashPageInfo page = getRetrievePage(index); int offset = getRetrieveOffset(index); HashEntry result = page.removeHashEntry(offset); - + if (page.isEmpty()) { hashPages.remove(page); hashIndex.releasePage(page.getPage()); @@ -280,7 +280,7 @@ class HashBin { // overflowed info.begin(); HashEntry entry = info.removeHashEntry(info.size() - 1); - doOverFlow(hashPages.indexOf(info) + 1, entry); + doOverFlow(hashPages.indexOf(info)+1, entry); } } @@ -298,13 +298,22 @@ class HashBin { if (info.size() > maximumEntries) { // overflowed HashEntry overflowed = info.removeHashEntry(info.size() - 1); - doOverFlow(pageNo + 1, overflowed); + doOverFlow(pageNo+1, overflowed); } } private void doUnderFlow(int index) { } + String dump() throws IOException { + String str = "[" + hashPages.size()+"]"; + for (HashPageInfo page : hashPages) { + page.begin(); + str +=page.dump(); + page.end(); + } + return str; + } private void end() throws IOException { for (HashPageInfo info : hashPages) { info.end(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java index 9d38d2d95c..89f4f998ca 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashIndex.java @@ -17,8 +17,10 @@ package org.apache.activemq.kaha.impl.index.hash; import java.io.File; +import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; +import java.util.Arrays; import java.util.LinkedList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.kaha.Marshaller; @@ -65,7 +67,7 @@ public class HashIndex implements Index, HashIndexMBean { private int pageCacheSize = 10; private int size; private int activeBins; - + /** * Constructor @@ -198,29 +200,17 @@ public class HashIndex implements Index, HashIndexMBean { readBuffer = new byte[pageSize]; try { openIndexFile(); - long offset = 0; - while ((offset + pageSize) <= indexFile.length()) { - indexFile.seek(offset); - indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE); - dataIn.restart(readBuffer); - HashPage page = new HashPage(keysPerPage); - page.setId(offset); - page.readHeader(dataIn); - if (!page.isActive()) { - freeList.add(page); - } else { - addToBin(page); - size+=page.size(); - } - offset += pageSize; + if (indexFile.length() > 0) { + doCompress(); } - length = offset; } catch (IOException e) { LOG.error("Failed to load index ", e); throw new RuntimeException(e); } } } + + public synchronized void unload() throws IOException { if (loaded.compareAndSet(true, false)) { @@ -228,6 +218,7 @@ public class HashIndex implements Index, HashIndexMBean { indexFile.close(); indexFile = null; freeList.clear(); + pageCache.clear(); bins = new HashBin[bins.length]; } } @@ -330,6 +321,7 @@ public class HashIndex implements Index, HashIndexMBean { result = freeList.removeFirst(); result.setActive(true); result.reset(); + writePageHeader(result); } return result; } @@ -371,7 +363,7 @@ public class HashIndex implements Index, HashIndexMBean { return page; } - void addToBin(HashPage page) { + void addToBin(HashPage page) throws IOException { HashBin bin = getBin(page.getBinId()); bin.addHashPageInfo(page.getId(), page.getPersistedSize()); } @@ -393,7 +385,7 @@ public class HashIndex implements Index, HashIndexMBean { indexFile = new RandomAccessFile(file, "rw"); } } - + private HashBin getBin(Object key) { int hash = hash(key); int i = indexFor(hash, bins.length); @@ -419,6 +411,61 @@ public class HashIndex implements Index, HashIndexMBean { pageCache.remove(page.getId()); } } + + private void doLoad() throws IOException { + long offset = 0; + if (loaded.compareAndSet(false, true)) { + while ((offset + pageSize) <= indexFile.length()) { + indexFile.seek(offset); + indexFile.readFully(readBuffer, 0, HashPage.PAGE_HEADER_SIZE); + dataIn.restart(readBuffer); + HashPage page = new HashPage(keysPerPage); + page.setId(offset); + page.readHeader(dataIn); + if (!page.isActive()) { + page.reset(); + freeList.add(page); + } else { + addToBin(page); + size+=page.size(); + } + offset += pageSize; + } + length=offset; + } + } + + private void doCompress() throws IOException { + String backFileName = name + "-COMPRESS"; + HashIndex backIndex = new HashIndex(directory,backFileName,indexManager); + backIndex.setKeyMarshaller(keyMarshaller); + backIndex.setKeySize(getKeySize()); + backIndex.setNumberOfBins(getNumberOfBins()); + backIndex.setPageSize(getPageSize()); + backIndex.load(); + File backFile = backIndex.file; + long offset = 0; + while ((offset + pageSize) <= indexFile.length()) { + indexFile.seek(offset); + HashPage page = getFullPage(offset); + if (page.isActive()) { + for (HashEntry entry : page.getEntries()) { + backIndex.getBin(entry.getKey()).put(entry); + backIndex.size++; + } + } + offset += pageSize; + } + backIndex.unload(); + + unload(); + IOHelper.deleteFile(file); + IOHelper.copyFile(backFile, file); + IOHelper.deleteFile(backFile); + openIndexFile(); + doLoad(); + } + static int hash(Object x) { int h = x.hashCode(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java index f4567aa2e4..bbaf277b9f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPage.java @@ -38,8 +38,8 @@ class HashPage { private int maximumEntries; private long id; private int binId; - private int persistedSize; private List hashIndexEntries; + private int persistedSize; /* * for persistence only */ @@ -71,7 +71,7 @@ class HashPage { } public String toString() { - return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + hashIndexEntries.size(); + return "HashPage[" + getId() + ":" + binId + ":" + id+"] size = " + persistedSize; } public boolean equals(Object o) { @@ -95,14 +95,7 @@ class HashPage { this.active = active; } - long getNextFreePageId() { - return this.nextFreePageId; - } - - void setNextFreePageId(long nextPageId) { - this.nextFreePageId = nextPageId; - } - + long getId() { return id; } @@ -116,8 +109,9 @@ class HashPage { } void write(Marshaller keyMarshaller, DataOutput dataOut) throws IOException { + persistedSize=hashIndexEntries.size(); writeHeader(dataOut); - dataOut.writeInt(hashIndexEntries.size()); + dataOut.writeInt(persistedSize); for (HashEntry entry : hashIndexEntries) { entry.write(keyMarshaller, dataOut); } @@ -125,7 +119,8 @@ class HashPage { void read(Marshaller keyMarshaller, DataInput dataIn) throws IOException { readHeader(dataIn); - int size = dataIn.readInt(); + dataIn.readInt(); + int size = persistedSize; hashIndexEntries.clear(); for (int i = 0; i < size; i++) { HashEntry entry = new HashEntry(); @@ -145,8 +140,10 @@ class HashPage { dataOut.writeBoolean(isActive()); dataOut.writeLong(nextFreePageId); dataOut.writeInt(binId); - dataOut.writeInt(size()); + persistedSize=hashIndexEntries.size(); + dataOut.writeInt(persistedSize); } + boolean isEmpty() { return hashIndexEntries.isEmpty(); @@ -186,12 +183,10 @@ class HashPage { void reset() throws IOException { hashIndexEntries.clear(); - setNextFreePageId(HashEntry.NOT_SET); + persistedSize=0; } void addHashEntry(int index, HashEntry entry) throws IOException { - // index = index >= 0 ? index : 0; - // index = (index == 0 || index< size()) ? index : size()-1; hashIndexEntries.add(index, entry); } @@ -227,7 +222,7 @@ class HashPage { this.binId = binId; } - void dump() { + String dump() { StringBuffer str = new StringBuffer(32); str.append(toString()); @@ -236,6 +231,6 @@ class HashPage { str.append(entry); str.append(","); } - LOG.info(str); + return str.toString(); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java index a6aa1f6391..7fc4c89e2e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java @@ -86,8 +86,8 @@ class HashPageInfo { return result; } - void dump() { - page.dump(); + String dump() { + return page.dump(); } void begin() throws IOException {