mirror of https://github.com/apache/activemq.git
Grow the HashIndex bins as required
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@656378 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
43b0a34cdc
commit
c2a5b54c6d
|
@ -259,6 +259,17 @@ public interface MapContainer<K, V> extends Map<K, V> {
|
||||||
*/
|
*/
|
||||||
int getIndexPageSize();
|
int getIndexPageSize();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* set the meximum bin size
|
||||||
|
*/
|
||||||
|
void setMaxBinSize(int size);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the maximum bin size
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
int getMaxBinSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the Index MBean
|
* @return the Index MBean
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -58,6 +58,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
||||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||||
|
private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
||||||
|
|
||||||
public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
|
public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
|
||||||
DataManager dataManager, boolean persistentIndex) {
|
DataManager dataManager, boolean persistentIndex) {
|
||||||
|
@ -76,6 +77,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
||||||
hashIndex.setNumberOfBins(getIndexBinSize());
|
hashIndex.setNumberOfBins(getIndexBinSize());
|
||||||
hashIndex.setKeySize(getIndexKeySize());
|
hashIndex.setKeySize(getIndexKeySize());
|
||||||
hashIndex.setPageSize(getIndexPageSize());
|
hashIndex.setPageSize(getIndexPageSize());
|
||||||
|
hashIndex.setMaximumCapacity(getMaxBinSize());
|
||||||
this.index = hashIndex;
|
this.index = hashIndex;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Failed to create HashIndex", e);
|
LOG.error("Failed to create HashIndex", e);
|
||||||
|
@ -567,6 +569,15 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
||||||
return (IndexMBean) index;
|
return (IndexMBean) index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxBinSize() {
|
||||||
|
return maxBinSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxBinSize(int maxBinSize) {
|
||||||
|
this.maxBinSize = maxBinSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
load();
|
load();
|
||||||
|
@ -589,6 +600,4 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
||||||
buf.append("}");
|
buf.append("}");
|
||||||
return buf.toString();
|
return buf.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -42,6 +42,11 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
public static final int DEFAULT_PAGE_SIZE;
|
public static final int DEFAULT_PAGE_SIZE;
|
||||||
public static final int DEFAULT_KEY_SIZE;
|
public static final int DEFAULT_KEY_SIZE;
|
||||||
public static final int DEFAULT_BIN_SIZE;
|
public static final int DEFAULT_BIN_SIZE;
|
||||||
|
public static final int MAXIMUM_CAPACITY = 16384;
|
||||||
|
/**
|
||||||
|
* The load factor used when none specified in constructor.
|
||||||
|
**/
|
||||||
|
static final float DEFAULT_LOAD_FACTOR;
|
||||||
private static final String NAME_PREFIX = "hash-index-";
|
private static final String NAME_PREFIX = "hash-index-";
|
||||||
private static final Log LOG = LogFactory.getLog(HashIndex.class);
|
private static final Log LOG = LogFactory.getLog(HashIndex.class);
|
||||||
private final String name;
|
private final String name;
|
||||||
|
@ -66,6 +71,9 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
private int pageCacheSize = 10;
|
private int pageCacheSize = 10;
|
||||||
private int size;
|
private int size;
|
||||||
private int activeBins;
|
private int activeBins;
|
||||||
|
private int threshold;
|
||||||
|
private int maximumCapacity=MAXIMUM_CAPACITY;
|
||||||
|
private float loadFactor=0.75f;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -178,6 +186,48 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the threshold
|
||||||
|
*/
|
||||||
|
public int getThreshold() {
|
||||||
|
return threshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param threshold the threshold to set
|
||||||
|
*/
|
||||||
|
public void setThreshold(int threshold) {
|
||||||
|
this.threshold = threshold;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the loadFactor
|
||||||
|
*/
|
||||||
|
public float getLoadFactor() {
|
||||||
|
return loadFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param loadFactor the loadFactor to set
|
||||||
|
*/
|
||||||
|
public void setLoadFactor(float loadFactor) {
|
||||||
|
this.loadFactor = loadFactor;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the maximumCapacity
|
||||||
|
*/
|
||||||
|
public int getMaximumCapacity() {
|
||||||
|
return maximumCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param maximumCapacity the maximumCapacity to set
|
||||||
|
*/
|
||||||
|
public void setMaximumCapacity(int maximumCapacity) {
|
||||||
|
this.maximumCapacity = maximumCapacity;
|
||||||
|
}
|
||||||
|
|
||||||
public synchronized int getSize() {
|
public synchronized int getSize() {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
@ -193,6 +243,7 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
capacity <<= 1;
|
capacity <<= 1;
|
||||||
}
|
}
|
||||||
this.bins = new HashBin[capacity];
|
this.bins = new HashBin[capacity];
|
||||||
|
threshold = calculateThreashold();
|
||||||
keysPerPage = pageSize / keySize;
|
keysPerPage = pageSize / keySize;
|
||||||
dataIn = new DataByteArrayInputStream();
|
dataIn = new DataByteArrayInputStream();
|
||||||
dataOut = new DataByteArrayOutputStream(pageSize);
|
dataOut = new DataByteArrayOutputStream(pageSize);
|
||||||
|
@ -229,6 +280,9 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
if (!getBin(key).put(entry)) {
|
if (!getBin(key).put(entry)) {
|
||||||
size++;
|
size++;
|
||||||
}
|
}
|
||||||
|
if (size >= threshold) {
|
||||||
|
resize(2*bins.length);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized StoreEntry get(Object key) throws IOException {
|
public synchronized StoreEntry get(Object key) throws IOException {
|
||||||
|
@ -361,11 +415,18 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
}
|
}
|
||||||
|
|
||||||
void addToBin(HashPage page) throws IOException {
|
void addToBin(HashPage page) throws IOException {
|
||||||
HashBin bin = getBin(page.getBinId());
|
int index = page.getBinId();
|
||||||
|
if (index >= numberOfBins) {
|
||||||
|
HashBin[] newBins = new HashBin[index+1];
|
||||||
|
System.arraycopy(this.bins, 0, newBins, 0, this.bins.length);
|
||||||
|
this.bins=newBins;
|
||||||
|
}
|
||||||
|
HashBin bin = getBin(index);
|
||||||
bin.addHashPageInfo(page.getId(), page.getPersistedSize());
|
bin.addHashPageInfo(page.getId(), page.getPersistedSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
private HashBin getBin(int index) {
|
private HashBin getBin(int index) {
|
||||||
|
|
||||||
HashBin result = bins[index];
|
HashBin result = bins[index];
|
||||||
if (result == null) {
|
if (result == null) {
|
||||||
result = new HashBin(this, index, pageSize / keySize);
|
result = new HashBin(this, index, pageSize / keySize);
|
||||||
|
@ -464,6 +525,49 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
doLoad();
|
doLoad();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void resize(int newCapacity) throws IOException {
|
||||||
|
if (bins.length == getMaximumCapacity()) {
|
||||||
|
threshold = Integer.MAX_VALUE;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String backFileName = name + "-REISZE";
|
||||||
|
HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
|
||||||
|
backIndex.setKeyMarshaller(keyMarshaller);
|
||||||
|
backIndex.setKeySize(getKeySize());
|
||||||
|
backIndex.setNumberOfBins(newCapacity);
|
||||||
|
backIndex.setPageSize(getPageSize());
|
||||||
|
backIndex.load();
|
||||||
|
File backFile = backIndex.file;
|
||||||
|
long offset = 0;
|
||||||
|
while ((offset + pageSize) <= indexFile.length()) {
|
||||||
|
indexFile.seek(offset);
|
||||||
|
HashPage page = getFullPage(offset);
|
||||||
|
if (page.isActive()) {
|
||||||
|
for (HashEntry entry : page.getEntries()) {
|
||||||
|
backIndex.getBin(entry.getKey()).put(entry);
|
||||||
|
backIndex.size++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
page=null;
|
||||||
|
offset += pageSize;
|
||||||
|
}
|
||||||
|
backIndex.unload();
|
||||||
|
|
||||||
|
unload();
|
||||||
|
IOHelper.deleteFile(file);
|
||||||
|
IOHelper.copyFile(backFile, file);
|
||||||
|
IOHelper.deleteFile(backFile);
|
||||||
|
setNumberOfBins(newCapacity);
|
||||||
|
bins = new HashBin[newCapacity];
|
||||||
|
threshold = calculateThreashold();
|
||||||
|
openIndexFile();
|
||||||
|
doLoad();
|
||||||
|
}
|
||||||
|
|
||||||
|
private int calculateThreashold() {
|
||||||
|
return (int)(bins.length * 100 * loadFactor);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public String toString() {
|
public String toString() {
|
||||||
String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
|
String str = "HashIndex"+System.identityHashCode(this)+": "+file.getName();
|
||||||
|
@ -488,5 +592,6 @@ public class HashIndex implements Index, HashIndexMBean {
|
||||||
DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
|
DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
|
||||||
DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
|
DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
|
||||||
DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
|
DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
|
||||||
|
DEFAULT_LOAD_FACTOR=Float.parseFloat(System.getProperty("defaultLoadFactor","1.5f"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -118,6 +118,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||||
|
private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
||||||
private int maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
|
private int maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
|
||||||
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
|
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
|
||||||
private String directoryPath = "";
|
private String directoryPath = "";
|
||||||
|
@ -685,6 +686,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
adaptor.setIndexBinSize(getIndexBinSize());
|
adaptor.setIndexBinSize(getIndexBinSize());
|
||||||
adaptor.setIndexKeySize(getIndexKeySize());
|
adaptor.setIndexKeySize(getIndexKeySize());
|
||||||
adaptor.setIndexPageSize(getIndexPageSize());
|
adaptor.setIndexPageSize(getIndexPageSize());
|
||||||
|
adaptor.setMaxBinSize(getMaxBinSize());
|
||||||
return adaptor;
|
return adaptor;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -834,6 +836,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
||||||
return indexPageSize;
|
return indexPageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxBinSize() {
|
||||||
|
return maxBinSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxBinSize(int maxBinSize) {
|
||||||
|
this.maxBinSize = maxBinSize;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When set using XBean, you can use values such as: "20
|
* When set using XBean, you can use values such as: "20
|
||||||
* mb", "1024 kb", or "1 gb"
|
* mb", "1024 kb", or "1 gb"
|
||||||
|
|
|
@ -72,6 +72,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
||||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||||
|
private int maxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
||||||
|
|
||||||
|
|
||||||
public KahaReferenceStoreAdapter(AtomicLong size){
|
public KahaReferenceStoreAdapter(AtomicLong size){
|
||||||
|
@ -203,6 +204,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
||||||
container.setIndexBinSize(getIndexBinSize());
|
container.setIndexBinSize(getIndexBinSize());
|
||||||
container.setIndexKeySize(getIndexKeySize());
|
container.setIndexKeySize(getIndexKeySize());
|
||||||
container.setIndexPageSize(getIndexPageSize());
|
container.setIndexPageSize(getIndexPageSize());
|
||||||
|
container.setMaxBinSize(getIndexBinSize());
|
||||||
container.setKeyMarshaller(new MessageIdMarshaller());
|
container.setKeyMarshaller(new MessageIdMarshaller());
|
||||||
container.setValueMarshaller(new ReferenceRecordMarshaller());
|
container.setValueMarshaller(new ReferenceRecordMarshaller());
|
||||||
container.load();
|
container.load();
|
||||||
|
@ -361,4 +363,12 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
||||||
public void setIndexPageSize(int indexPageSize) {
|
public void setIndexPageSize(int indexPageSize) {
|
||||||
this.indexPageSize = indexPageSize;
|
this.indexPageSize = indexPageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getMaxBinSize() {
|
||||||
|
return maxBinSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxBinSize(int maxBinSize) {
|
||||||
|
this.maxBinSize = maxBinSize;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue