mirror of https://github.com/apache/activemq.git
Further enhancement to https://issues.apache.org/activemq/browse/AMQ-1246
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@591442 13f79535-47bb-0310-9956-ffa450edef68
parent 229ca6afd7
commit 770642a8a1
HashBin.java

@@ -181,9 +181,36 @@ class HashBin {
     }
 
     private void addHashEntry(int index, HashEntry entry) throws IOException {
-        HashPageInfo page = getInsertPage(index);
-        int offset = index % maximumEntries;
-        page.addHashEntry(offset, entry);
+        HashPageInfo pageToUse = null;
+        int offset = 0;
+        if (index >= maximumBinSize()) {
+            HashPage hp = hashIndex.createPage(id);
+            pageToUse = addHashPageInfo(hp.getId(), 0);
+            pageToUse.setPage(hp);
+            offset = 0;
+        } else {
+
+            int count = 0;
+            int countSoFar=0;
+            int pageNo = 0;
+            for (HashPageInfo page : hashPages) {
+                count += page.size();
+                if (index < count ) {
+                    offset = index - countSoFar;
+                    break;
+                }
+                if (index == count && page.size()+1 <= maximumEntries) {
+                    offset = page.size();
+                    break;
+                }
+                countSoFar += page.size();
+                pageNo++;
+            }
+            pageToUse = hashPages.get(pageNo);
+        }
+        pageToUse.begin();
+
+        pageToUse.addHashEntry(offset, entry);
         doOverFlow(index);
     }
 
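The old path assumed every page in a bin was full, so index % maximumEntries was enough to locate the slot. The new path walks the bin's pages and derives the offset from how many entries each page actually holds, appending to a partially filled page where possible; only an index past the bin's current capacity causes a new page to be created. Below is a minimal, self-contained sketch of that selection logic, with a plain integer per page standing in for HashPageInfo (the class name and constant here are illustrative, not part of ActiveMQ):

import java.util.List;

// Illustrative sketch only: an int per page stands in for HashPageInfo.size().
public class PageSelectionSketch {

    static final int MAX_ENTRIES_PER_PAGE = 3; // tiny value chosen for the example

    /** Returns {pageNo, offset} for a logical bin index, given current page fills. */
    static int[] selectPage(List<Integer> pageSizes, int index) {
        int count = 0;      // entries seen up to and including the current page
        int countSoFar = 0; // entries seen in the pages before the current one
        int pageNo = 0;
        int offset = 0;
        for (int size : pageSizes) {
            count += size;
            if (index < count) {                 // index falls inside this page
                offset = index - countSoFar;
                break;
            }
            if (index == count && size + 1 <= MAX_ENTRIES_PER_PAGE) {
                offset = size;                   // append to a partially filled page
                break;
            }
            countSoFar += size;
            pageNo++;
        }
        // If the loop falls through, index is beyond the last page; the real code
        // handles that case up front by creating a new page.
        return new int[] {pageNo, offset};
    }

    public static void main(String[] args) {
        List<Integer> pages = List.of(3, 2);     // page 0 full, page 1 holds 2 of 3
        System.out.println(java.util.Arrays.toString(selectPage(pages, 4))); // [1, 1]
        System.out.println(java.util.Arrays.toString(selectPage(pages, 5))); // [1, 2]
    }
}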
@@ -202,25 +229,12 @@ class HashBin {
         HashEntry result = page.getHashEntry(offset);
         return result;
     }
 
+
     private int maximumBinSize() {
         return maximumEntries * hashPages.size();
     }
 
-    private HashPageInfo getInsertPage(int index) throws IOException {
-        HashPageInfo result = null;
-        if (index >= maximumBinSize()) {
-            HashPage page = hashIndex.createPage(id);
-            result = addHashPageInfo(page.getId(), 0);
-            result.setPage(page);
-        } else {
-            int offset = index / maximumEntries;
-            result = hashPages.get(offset);
-        }
-        result.begin();
-        return result;
-    }
-
     private HashPageInfo getRetrievePage(int index) throws IOException {
         HashPageInfo result = null;
         int count = 0;
@@ -251,16 +265,6 @@ class HashBin {
         return result;
     }
 
-    // private int getInsertPageNo(int index) {
-    // int result = index / maximumEntries;
-    // return result;
-    // }
-    //
-    // private int getOffset(int index) {
-    // int result = index % maximumEntries;
-    // return result;
-    // }
-
     private void doOverFlow(int index) throws IOException {
         int pageNo = index / maximumEntries;
        HashPageInfo info = hashPages.get(pageNo);
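Both the deleted commented-out helpers and the opening lines of doOverFlow rely on the same fixed page geometry: the page number is index / maximumEntries and the slot within that page is index % maximumEntries. A small worked example of that arithmetic (the value 3 for maximumEntries is picked purely for illustration):

public class PageGeometrySketch {
    public static void main(String[] args) {
        int maximumEntries = 3; // illustration value only
        for (int index : new int[] {2, 3, 7}) {
            int pageNo = index / maximumEntries; // which page holds the slot
            int offset = index % maximumEntries; // slot within that page
            System.out.println("index " + index + " -> page " + pageNo + ", offset " + offset);
        }
    }
}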
HashPage.java

@@ -40,7 +40,6 @@ class HashPage {
     private int binId;
     private int persistedSize;
     private List<HashEntry> hashIndexEntries;
-    private static final HashEntry nullEntry = new HashEntry();
     /*
      * for persistence only
      */
@@ -193,11 +192,6 @@ class HashPage {
     void addHashEntry(int index, HashEntry entry) throws IOException {
         // index = index >= 0 ? index : 0;
         // index = (index == 0 || index< size()) ? index : size()-1;
-        if (index > hashIndexEntries.size()) {
-            for (int i = hashIndexEntries.size(); i < (index+1);i++) {
-                hashIndexEntries.add(nullEntry);
-            }
-        }
         hashIndexEntries.add(index, entry);
     }
 
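The padding loop dropped here existed because java.util.List.add(int, element) only accepts an index up to size(); when HashBin could hand a page an offset beyond the end of its entry list, the page first had to pad the list with placeholder entries. With HashBin now computing an offset that is always inside, or immediately after, the populated range, the padding and the nullEntry placeholder become unnecessary. A short sketch of the underlying List behaviour, independent of the ActiveMQ classes:

import java.util.ArrayList;
import java.util.List;

public class ListInsertSketch {
    public static void main(String[] args) {
        List<String> entries = new ArrayList<>(List.of("a", "b"));
        entries.add(2, "c");                  // index == size(): legal append
        System.out.println(entries);          // [a, b, c]
        try {
            entries.add(5, "x");              // index > size(): rejected
        } catch (IndexOutOfBoundsException expected) {
            System.out.println("an out-of-range offset needs padding first");
        }
    }
}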
KahaReferenceStoreAdapter.java

@@ -55,6 +55,8 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
 
     private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
     private static final String STORE_STATE = "store-state";
+    private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
+    private static final Integer INDEX_VERSION = new Integer(2);
     private static final String RECORD_REFERENCES = "record-references";
     private static final String TRANSACTIONS = "transactions-state";
     private MapContainer stateMap;
@@ -67,6 +69,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
     private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
+    private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
 
 
     public KahaReferenceStoreAdapter(AtomicLong size){
         super(size);
@@ -93,6 +96,14 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
         if (status != null) {
             storeValid = status.get();
         }
+        if (storeValid) {
+            //check what version the indexes are at
+            Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
+            if (indexVersion==null || indexVersion.intValue() < INDEX_VERSION.intValue()) {
+                storeValid = false;
+                LOG.warn("Indexes at an older version - need to regenerate");
+            }
+        }
         if (storeValid) {
             if (stateMap.containsKey(RECORD_REFERENCES)) {
                 recordReferences = (Map<Integer, AtomicInteger>)stateMap.get(RECORD_REFERENCES);
@@ -100,6 +111,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
             }
         }
         stateMap.put(STORE_STATE, new AtomicBoolean());
+        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
         durableSubscribers = store.getListContainer("durableSubscribers");
         durableSubscribers.setMarshaller(new CommandMarshaller());
         preparedTransactions = store.getMapContainer("transactions", TRANSACTIONS, false);
@@ -112,6 +124,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
     public synchronized void stop() throws Exception {
         stateMap.put(RECORD_REFERENCES, recordReferences);
         stateMap.put(STORE_STATE, new AtomicBoolean(true));
+        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION);
         if (this.stateStore != null) {
             this.stateStore.close();
             this.stateStore = null;
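Taken together, the adapter changes stamp the persisted state with an index format version on shutdown and compare that stamp on startup; a missing or older stamp simply marks the store invalid so the indexes are regenerated rather than read with the new layout. A minimal sketch of that gating pattern, with an ordinary Map standing in for the Kaha stateMap container (names below are illustrative):

import java.util.HashMap;
import java.util.Map;

public class IndexVersionSketch {
    private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
    private static final Integer INDEX_VERSION = Integer.valueOf(2);

    // Returns false when the persisted indexes were written by an older layout
    // and therefore need to be regenerated.
    static boolean indexesUsable(Map<String, Object> stateMap) {
        Integer stored = (Integer) stateMap.get(INDEX_VERSION_NAME);
        return stored != null && stored.intValue() >= INDEX_VERSION.intValue();
    }

    public static void main(String[] args) {
        Map<String, Object> stateMap = new HashMap<>();
        System.out.println(indexesUsable(stateMap));     // false: no stamp yet
        stateMap.put(INDEX_VERSION_NAME, INDEX_VERSION); // written on clean shutdown
        System.out.println(indexesUsable(stateMap));     // true
    }
}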
HashTest.java

@@ -25,15 +25,17 @@ import org.apache.activemq.kaha.impl.index.IndexItem;
 import org.apache.activemq.kaha.impl.index.IndexManager;
 import org.apache.activemq.util.IOHelper;
 
 
 /**
  * Test a HashIndex
  */
 public class HashTest extends TestCase {
 
-    private static final int COUNT = 1000;
+    private static final int COUNT = 10000;
 
     private HashIndex hashIndex;
 
     private File directory;
 
     private IndexManager indexManager;
 
     /**
@@ -44,8 +46,12 @@ public class HashTest extends TestCase {
         super.setUp();
         directory = new File(IOHelper.getDefaultDataDirectory());
         directory.mkdirs();
-        indexManager = new IndexManager(directory, "im-hash-test", "rw", null, new AtomicLong());
+        IOHelper.deleteChildren(directory);
+        indexManager = new IndexManager(directory, "im-hash-test", "rw", null,
+                new AtomicLong());
         this.hashIndex = new HashIndex(directory, "testHash", indexManager);
+        this.hashIndex.setNumberOfBins(12);
+        this.hashIndex.setPageSize(32 * 1024);
         this.hashIndex.setKeyMarshaller(Store.STRING_MARSHALLER);
     }
 
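The revised setUp wipes the data directory before every run and pins the index to 12 bins with a 32 KB page size, so each of the 12 bins has to hold roughly 830 of the 10000 test keys, the kind of load the new multi-page bin handling is meant to cover. A sketch of the clean-directory idiom on its own, using only java.io (the class and method below are illustrative stand-ins, not the actual IOHelper implementation):

import java.io.File;

public class CleanDirectorySketch {
    // Recursively delete everything inside dir (but not dir itself),
    // mirroring what IOHelper.deleteChildren is used for in the test setUp.
    static void deleteChildren(File dir) {
        File[] children = dir.listFiles();
        if (children == null) {
            return;               // not a directory, or an I/O error
        }
        for (File child : children) {
            if (child.isDirectory()) {
                deleteChildren(child);
            }
            child.delete();
        }
    }

    public static void main(String[] args) {
        File dir = new File(System.getProperty("java.io.tmpdir"), "hash-test-data");
        dir.mkdirs();
        deleteChildren(dir);      // start every run from an empty directory
    }
}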
@@ -56,7 +62,7 @@ public class HashTest extends TestCase {
         doTest(600);
         hashIndex.clear();
         hashIndex.unload();
-        doTest(1024 * 4);
+        doTest(128);
     }
 
     public void doTest(int pageSize) throws Exception {
@@ -66,8 +72,11 @@ public class HashTest extends TestCase {
         doInsert(keyRoot);
         checkRetrieve(keyRoot);
         doRemove(keyRoot);
 
         doInsert(keyRoot);
         doRemoveBackwards(keyRoot);
+        doRemoveHalf(keyRoot);
+        doInsertHalf(keyRoot);
+        checkRetrieve(keyRoot);
     }
 
     void doInsert(String keyRoot) throws Exception {
@@ -75,23 +84,41 @@ public class HashTest extends TestCase {
             IndexItem value = indexManager.createNewIndex();
             indexManager.storeIndex(value);
             hashIndex.store(keyRoot + i, value);
-
         }
     }
 
     void checkRetrieve(String keyRoot) throws IOException {
         for (int i = 0; i < COUNT; i++) {
-            IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
             assertNotNull(item);
         }
     }
 
+    void doRemoveHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                hashIndex.remove(keyRoot + i);
+            }
+
+        }
+    }
+
+    void doInsertHalf(String keyRoot) throws Exception {
+        for (int i = 0; i < COUNT; i++) {
+            if (i % 2 == 0) {
+                IndexItem value = indexManager.createNewIndex();
+                indexManager.storeIndex(value);
+                hashIndex.store(keyRoot + i, value);
+            }
+        }
+    }
+
     void doRemove(String keyRoot) throws Exception {
         for (int i = 0; i < COUNT; i++) {
             hashIndex.remove(keyRoot + i);
         }
         for (int i = 0; i < COUNT; i++) {
-            IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
             assertNull(item);
         }
     }
@@ -101,7 +128,7 @@ public class HashTest extends TestCase {
             hashIndex.remove(keyRoot + i);
         }
         for (int i = 0; i < COUNT; i++) {
-            IndexItem item = (IndexItem)hashIndex.get(keyRoot + i);
+            IndexItem item = (IndexItem) hashIndex.get(keyRoot + i);
             assertNull(item);
         }
     }