git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@651242 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2008-04-24 12:23:17 +00:00
parent 347f8eca1b
commit d3ebc9cd21
7 changed files with 81 additions and 47 deletions

View File

@ -23,13 +23,11 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.broker.region.QueueMessageReference;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.Message;

View File

@ -17,10 +17,6 @@
package org.apache.activemq.kaha.impl.index.hash;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Bin in a HashIndex
@ -28,12 +24,13 @@ import org.apache.commons.logging.LogFactory;
* @version $Revision: 1.1.1.1 $
*/
class HashBin {
private static final transient Log LOG = LogFactory.getLog(HashBin.class);
private HashIndex hashIndex;
private int id;
private int maximumEntries;
private int size;
private List<HashPageInfo> hashPages = new ArrayList<HashPageInfo>();
private int numberOfPages =0;
private HashPageInfo root = null;
private HashPageInfo tail = null;
/**
* Constructor
@ -62,7 +59,7 @@ class HashBin {
}
/**
 * Hashes the bin by its id, so bins can be used in hashed collections.
 * (Stale pre-change line {@code return (int)id;} removed — it made the
 * real return unreachable; {@code getId()} already returns the int id.)
 */
public int hashCode() {
    return getId();
}
int getId() {
@ -93,7 +90,13 @@ class HashBin {
HashPageInfo info = new HashPageInfo(hashIndex);
info.setId(id);
info.setSize(size);
hashPages.add(info);
if (root == null) {
root=info;
}else {
tail.linkAfter(info);
}
tail=info;
this.numberOfPages++;
this.size += size;
return info;
}
@ -191,8 +194,10 @@ class HashBin {
int count = 0;
int countSoFar=0;
int pageNo = 0;
for (HashPageInfo page : hashPages) {
HashPageInfo page = root;
while (page != null) {
count += page.size();
pageToUse=page;
if (index < count ) {
offset = index - countSoFar;
break;
@ -203,13 +208,12 @@ class HashBin {
}
countSoFar += page.size();
pageNo++;
page = (HashPageInfo) page.getNext();
}
while(pageNo >= hashPages.size()) {
while(pageNo >= this.numberOfPages) {
HashPage hp = hashIndex.createPage(id);
addHashPageInfo(hp.getId(), 0);
pageToUse = addHashPageInfo(hp.getId(), 0);
}
pageToUse = hashPages.get(pageNo);
}
pageToUse.begin();
pageToUse.addHashEntry(offset, entry);
@ -222,7 +226,14 @@ class HashBin {
HashEntry result = page.removeHashEntry(offset);
if (page.isEmpty()) {
hashPages.remove(page);
if (root.equals(page)) {
root=(HashPageInfo) root.getNext();
}
if (tail.equals(page)) {
tail=(HashPageInfo) page.getPrevious();
}
page.unlink();
this.numberOfPages--;
hashIndex.releasePage(page.getPage());
}
doUnderFlow(index);
@ -239,21 +250,22 @@ class HashBin {
/**
 * Maximum number of entries this bin can currently hold: per-page capacity
 * times the number of pages chained into the bin. Uses the maintained
 * {@code numberOfPages} counter rather than the removed {@code hashPages}
 * list (stale duplicate return removed).
 */
private int getMaximumBinSize() {
    return maximumEntries * this.numberOfPages;
}
private HashPageInfo getRetrievePage(int index) throws IOException {
HashPageInfo result = null;
int count = 0;
int pageNo = 0;
for (HashPageInfo page : hashPages) {
HashPageInfo page = root;
while (page != null) {
count += page.size();
result = page;
if (index < count) {
break;
}
pageNo++;
page = (HashPageInfo) page.getNext();
}
result = hashPages.get(pageNo);
result.begin();
return result;
}
@ -261,12 +273,14 @@ class HashBin {
private int getRetrieveOffset(int index) throws IOException {
int result = 0;
int count = 0;
for (HashPageInfo page : hashPages) {
HashPageInfo page = root;
while (page != null) {
if ((index + 1) <= (count + page.size())) {
result = index - count;
break;
}
count += page.size();
page = (HashPageInfo) page.getNext();
}
return result;
}
@ -277,43 +291,51 @@ class HashBin {
// overflowed
info.begin();
HashEntry entry = info.removeHashEntry(info.size() - 1);
doOverFlow(hashPages.indexOf(info)+1, entry);
doOverFlow(getNextPage(info), entry);
}
}
private void doOverFlow(int pageNo, HashEntry entry) throws IOException {
private void doOverFlow(HashPageInfo next, HashEntry entry) throws IOException {
HashPageInfo info = null;
if (pageNo >= hashPages.size()) {
if (next == null) {
HashPage page = hashIndex.createPage(id);
info = addHashPageInfo(page.getId(), 0);
info.setPage(page);
} else {
info = hashPages.get(pageNo);
info = next;
}
info.begin();
info.addHashEntry(0, entry);
if (info.size() > maximumEntries) {
// overflowed
HashEntry overflowed = info.removeHashEntry(info.size() - 1);
doOverFlow(pageNo+1, overflowed);
doOverFlow(getNextPage(info), overflowed);
}
}
/**
 * Successor of {@code start} in the bin's linked page chain; {@code null}
 * when {@code start} is the last page (callers iterate until null).
 */
private HashPageInfo getNextPage(HashPageInfo start) {
    final Object successor = start.getNext();
    return (HashPageInfo) successor;
}
// Intentional no-op: under-flow compaction (re-packing partially empty pages
// after a removal) is not implemented. NOTE(review): pages that become fully
// empty are released directly in the removal path, so partial pages are
// simply left sparse — confirm this is the intended trade-off.
private void doUnderFlow(int index) {
}
String dump() throws IOException {
String str = "[" + hashPages.size()+"]";
for (HashPageInfo page : hashPages) {
String str = "[" + this.numberOfPages+"]";
HashPageInfo page = root;
while (page != null) {
page.begin();
str +=page.dump();
page.end();
page = (HashPageInfo) page.getNext();
}
return str;
}
private void end() throws IOException {
for (HashPageInfo info : hashPages) {
info.end();
HashPageInfo page = root;
while (page != null) {
page.end();
page = (HashPageInfo) page.getNext();
}
}
}

View File

@ -465,6 +465,12 @@ public class HashIndex implements Index, HashIndexMBean {
}
/**
 * Debug-friendly representation: identity hash of this index plus the name
 * of its backing file.
 */
public String toString() {
    return "HashIndex" + System.identityHashCode(this) + ": " + file.getName();
}
static int hash(Object x) {
int h = x.hashCode();
h += ~(h << 9);

View File

@ -17,13 +17,14 @@
package org.apache.activemq.kaha.impl.index.hash;
import java.io.IOException;
import org.apache.activemq.util.LinkedNode;
/**
* A Page within a HashPageInfo
*
* @version $Revision: 1.1.1.1 $
*/
class HashPageInfo {
class HashPageInfo extends LinkedNode{
private HashIndex hashIndex;
private long id;

View File

@ -59,7 +59,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
private static final Integer INDEX_VERSION = new Integer(5);
private static final Integer INDEX_VERSION = new Integer(6);
private static final String RECORD_REFERENCES = "record-references";
private static final String TRANSACTIONS = "transactions-state";
private MapContainer stateMap;

View File

@ -352,7 +352,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
/**
 * Builds the store container name for a topic subscription as
 * {@code TOPIC_SUB_NAME + destination qualified name + subscription key},
 * so names for the same destination group together.
 * <p>
 * Stale diff residue removed: the old {@code buffer} local and its
 * {@code return} preceded the new code, making it unreachable.
 *
 * @param subscriptionKey unique key identifying the subscription
 * @return the container name
 */
private String getSubscriptionContainerName(String subscriptionKey) {
    StringBuffer result = new StringBuffer(TOPIC_SUB_NAME);
    result.append(destination.getQualifiedName());
    result.append(subscriptionKey);
    return result.toString();
}
}

View File

@ -133,6 +133,7 @@ public class LinkedNode {
public void unlink() {
// If we are allready unlinked...
if (prev == this) {
reset();
return;
}
@ -145,6 +146,10 @@ public class LinkedNode {
prev.next = next;
// Update our links..
reset();
}
public void reset() {
next = this;
prev = this;
tail = true;