mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@669879 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
445dc449c2
commit
a918194d80
|
@ -301,6 +301,13 @@ public class IndexItem implements Item, StoreEntry {
|
|||
this.valueSize = valueSize;
|
||||
}
|
||||
|
||||
void copyIndex(IndexItem other) {
|
||||
this.offset=other.offset;
|
||||
this.active=other.active;
|
||||
this.previousItem=other.previousItem;
|
||||
this.nextItem=other.nextItem;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return print of 'this'
|
||||
*/
|
||||
|
|
|
@ -49,6 +49,7 @@ public final class IndexManager {
|
|||
private IndexItem lastFree;
|
||||
private boolean dirty;
|
||||
private final AtomicLong storeSize;
|
||||
private int freeSize = 0;
|
||||
|
||||
public IndexManager(File directory, String name, String mode, DataManager redoLog, AtomicLong storeSize) throws IOException {
|
||||
this.directory = directory;
|
||||
|
@ -64,7 +65,11 @@ public final class IndexManager {
|
|||
}
|
||||
|
||||
public synchronized IndexItem getIndex(long offset) throws IOException {
|
||||
return reader.readItem(offset);
|
||||
IndexItem result = null;
|
||||
if (offset >= 0) {
|
||||
result = reader.readItem(offset);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public synchronized IndexItem refreshIndex(IndexItem item) throws IOException {
|
||||
|
@ -80,8 +85,16 @@ public final class IndexManager {
|
|||
lastFree = item;
|
||||
} else {
|
||||
lastFree.setNextItem(item.getOffset());
|
||||
if (lastFree.equals(firstFree)) {
|
||||
firstFree=new IndexItem();
|
||||
firstFree.copyIndex(lastFree);
|
||||
writer.updateIndexes(firstFree);
|
||||
}
|
||||
writer.updateIndexes(lastFree);
|
||||
lastFree=item;
|
||||
}
|
||||
writer.updateIndexes(item);
|
||||
freeSize++;
|
||||
dirty = true;
|
||||
}
|
||||
|
||||
|
@ -155,6 +168,8 @@ public final class IndexManager {
|
|||
}
|
||||
}
|
||||
result.reset();
|
||||
writer.updateIndexes(result);
|
||||
freeSize--;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -200,6 +215,7 @@ public final class IndexManager {
|
|||
lastFree = index;
|
||||
firstFree = index;
|
||||
}
|
||||
freeSize++;
|
||||
}
|
||||
offset += IndexItem.INDEX_SIZE;
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ public class HashIndex implements Index, HashIndexMBean {
|
|||
public static final int DEFAULT_BIN_SIZE;
|
||||
public static final int MAXIMUM_CAPACITY;
|
||||
public static final int DEFAULT_LOAD_FACTOR;
|
||||
private static final int LOW_WATER_MARK=1024*16;
|
||||
private static final String NAME_PREFIX = "hash-index-";
|
||||
private static final Log LOG = LogFactory.getLog(HashIndex.class);
|
||||
private final String name;
|
||||
|
@ -67,6 +68,7 @@ public class HashIndex implements Index, HashIndexMBean {
|
|||
private boolean enablePageCaching=false;//this is off by default - see AMQ-1667
|
||||
private int pageCacheSize = 10;
|
||||
private int size;
|
||||
private int highestSize=0;
|
||||
private int activeBins;
|
||||
private int threshold;
|
||||
private int maximumCapacity=MAXIMUM_CAPACITY;
|
||||
|
@ -275,11 +277,14 @@ public class HashIndex implements Index, HashIndexMBean {
|
|||
entry.setKey((Comparable)key);
|
||||
entry.setIndexOffset(value.getOffset());
|
||||
if (!getBin(key).put(entry)) {
|
||||
size++;
|
||||
this.size++;
|
||||
}
|
||||
if (size >= threshold) {
|
||||
if (this.size >= this.threshold) {
|
||||
resize(2*bins.length);
|
||||
}
|
||||
if(this.size > this.highestSize) {
|
||||
this.highestSize=this.size;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized StoreEntry get(Object key) throws IOException {
|
||||
|
@ -292,14 +297,22 @@ public class HashIndex implements Index, HashIndexMBean {
|
|||
|
||||
public synchronized StoreEntry remove(Object key) throws IOException {
|
||||
load();
|
||||
StoreEntry result = null;
|
||||
HashEntry entry = new HashEntry();
|
||||
entry.setKey((Comparable)key);
|
||||
HashEntry result = getBin(key).remove(entry);
|
||||
if (result != null) {
|
||||
size--;
|
||||
return indexManager.getIndex(result.getIndexOffset());
|
||||
HashEntry he = getBin(key).remove(entry);
|
||||
if (he != null) {
|
||||
this.size--;
|
||||
result = this.indexManager.getIndex(he.getIndexOffset());
|
||||
}
|
||||
return null;
|
||||
if (this.highestSize > LOW_WATER_MARK && this.highestSize > (this.size *2)) {
|
||||
int newSize = this.size/this.keysPerPage;
|
||||
newSize = Math.max(128, newSize);
|
||||
this.highestSize=0;
|
||||
resize(newSize);
|
||||
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public synchronized boolean containsKey(Object key) throws IOException {
|
||||
|
@ -523,42 +536,53 @@ public class HashIndex implements Index, HashIndexMBean {
|
|||
}
|
||||
|
||||
private void resize(int newCapacity) throws IOException {
|
||||
if (bins.length == getMaximumCapacity()) {
|
||||
if (bins.length < getMaximumCapacity()) {
|
||||
if (newCapacity != numberOfBins) {
|
||||
int capacity = 1;
|
||||
while (capacity < newCapacity) {
|
||||
capacity <<= 1;
|
||||
}
|
||||
if (newCapacity != numberOfBins) {
|
||||
LOG.info("Resize hash bins " + this.name + " from " + numberOfBins + " to " + newCapacity);
|
||||
|
||||
String backFileName = name + "-REISZE";
|
||||
HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
|
||||
backIndex.setKeyMarshaller(keyMarshaller);
|
||||
backIndex.setKeySize(getKeySize());
|
||||
backIndex.setNumberOfBins(newCapacity);
|
||||
backIndex.setPageSize(getPageSize());
|
||||
backIndex.load();
|
||||
File backFile = backIndex.file;
|
||||
long offset = 0;
|
||||
while ((offset + pageSize) <= indexFile.length()) {
|
||||
indexFile.seek(offset);
|
||||
HashPage page = getFullPage(offset);
|
||||
if (page.isActive()) {
|
||||
for (HashEntry entry : page.getEntries()) {
|
||||
backIndex.getBin(entry.getKey()).put(entry);
|
||||
backIndex.size++;
|
||||
}
|
||||
}
|
||||
page=null;
|
||||
offset += pageSize;
|
||||
}
|
||||
backIndex.unload();
|
||||
|
||||
unload();
|
||||
IOHelper.deleteFile(file);
|
||||
IOHelper.copyFile(backFile, file);
|
||||
IOHelper.deleteFile(backFile);
|
||||
setNumberOfBins(newCapacity);
|
||||
bins = new HashBin[newCapacity];
|
||||
threshold = calculateThreashold();
|
||||
openIndexFile();
|
||||
doLoad();
|
||||
}
|
||||
}
|
||||
}else {
|
||||
threshold = Integer.MAX_VALUE;
|
||||
return;
|
||||
}
|
||||
String backFileName = name + "-REISZE";
|
||||
HashIndex backIndex = new HashIndex(directory,backFileName,indexManager);
|
||||
backIndex.setKeyMarshaller(keyMarshaller);
|
||||
backIndex.setKeySize(getKeySize());
|
||||
backIndex.setNumberOfBins(newCapacity);
|
||||
backIndex.setPageSize(getPageSize());
|
||||
backIndex.load();
|
||||
File backFile = backIndex.file;
|
||||
long offset = 0;
|
||||
while ((offset + pageSize) <= indexFile.length()) {
|
||||
indexFile.seek(offset);
|
||||
HashPage page = getFullPage(offset);
|
||||
if (page.isActive()) {
|
||||
for (HashEntry entry : page.getEntries()) {
|
||||
backIndex.getBin(entry.getKey()).put(entry);
|
||||
backIndex.size++;
|
||||
}
|
||||
}
|
||||
page=null;
|
||||
offset += pageSize;
|
||||
}
|
||||
backIndex.unload();
|
||||
|
||||
unload();
|
||||
IOHelper.deleteFile(file);
|
||||
IOHelper.copyFile(backFile, file);
|
||||
IOHelper.deleteFile(backFile);
|
||||
setNumberOfBins(newCapacity);
|
||||
bins = new HashBin[newCapacity];
|
||||
threshold = calculateThreashold();
|
||||
openIndexFile();
|
||||
doLoad();
|
||||
}
|
||||
|
||||
private int calculateThreashold() {
|
||||
|
|
|
@ -121,7 +121,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
||||
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
|
||||
private int maxReferenceFileLength=AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
|
||||
private int maxReferenceFileLength=AMQPersistenceAdapterFactory.DEFAULT_MAX_REFERNCE_FILE_LENGTH;
|
||||
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
|
||||
private String directoryPath = "";
|
||||
private RandomAccessFile lockFile;
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.activemq.store.amq;
|
|||
import java.io.File;
|
||||
|
||||
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
|
||||
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
import org.apache.activemq.store.PersistenceAdapterFactory;
|
||||
import org.apache.activemq.store.ReferenceStoreAdapter;
|
||||
|
@ -33,7 +34,7 @@ import org.apache.activemq.util.IOHelper;
|
|||
* @version $Revision: 1.17 $
|
||||
*/
|
||||
public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
||||
|
||||
static final int DEFAULT_MAX_REFERNCE_FILE_LENGTH=2*1024*1024;
|
||||
private TaskRunnerFactory taskRunnerFactory;
|
||||
private File dataDirectory;
|
||||
private int journalThreadPriority = Thread.MAX_PRIORITY;
|
||||
|
@ -45,6 +46,12 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
|||
private boolean useNio = true;
|
||||
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
|
||||
private long cleanupInterval = AsyncDataManager.DEFAULT_CLEANUP_INTERVAL;
|
||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||
private int indexMaxBinSize = HashIndex.MAXIMUM_CAPACITY;
|
||||
private int indexLoadFactor = HashIndex.DEFAULT_LOAD_FACTOR;
|
||||
private int maxReferenceFileLength=DEFAULT_MAX_REFERNCE_FILE_LENGTH;
|
||||
|
||||
|
||||
/**
|
||||
|
@ -62,6 +69,12 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
|||
result.setUseNio(isUseNio());
|
||||
result.setMaxFileLength(getMaxFileLength());
|
||||
result.setCleanupInterval(getCleanupInterval());
|
||||
result.setIndexBinSize(getIndexBinSize());
|
||||
result.setIndexKeySize(getIndexKeySize());
|
||||
result.setIndexPageSize(getIndexPageSize());
|
||||
result.setIndexMaxBinSize(getIndexMaxBinSize());
|
||||
result.setIndexLoadFactor(getIndexLoadFactor());
|
||||
result.setMaxReferenceFileLength(getMaxReferenceFileLength());
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -189,4 +202,88 @@ public class AMQPersistenceAdapterFactory implements PersistenceAdapterFactory {
|
|||
public void setMaxFileLength(int maxFileLength) {
|
||||
this.maxFileLength = maxFileLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indexBinSize
|
||||
*/
|
||||
public int getIndexBinSize() {
|
||||
return indexBinSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param indexBinSize the indexBinSize to set
|
||||
*/
|
||||
public void setIndexBinSize(int indexBinSize) {
|
||||
this.indexBinSize = indexBinSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indexKeySize
|
||||
*/
|
||||
public int getIndexKeySize() {
|
||||
return indexKeySize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param indexKeySize the indexKeySize to set
|
||||
*/
|
||||
public void setIndexKeySize(int indexKeySize) {
|
||||
this.indexKeySize = indexKeySize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indexPageSize
|
||||
*/
|
||||
public int getIndexPageSize() {
|
||||
return indexPageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param indexPageSize the indexPageSize to set
|
||||
*/
|
||||
public void setIndexPageSize(int indexPageSize) {
|
||||
this.indexPageSize = indexPageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indexMaxBinSize
|
||||
*/
|
||||
public int getIndexMaxBinSize() {
|
||||
return indexMaxBinSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param indexMaxBinSize the indexMaxBinSize to set
|
||||
*/
|
||||
public void setIndexMaxBinSize(int indexMaxBinSize) {
|
||||
this.indexMaxBinSize = indexMaxBinSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indexLoadFactor
|
||||
*/
|
||||
public int getIndexLoadFactor() {
|
||||
return indexLoadFactor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param indexLoadFactor the indexLoadFactor to set
|
||||
*/
|
||||
public void setIndexLoadFactor(int indexLoadFactor) {
|
||||
this.indexLoadFactor = indexLoadFactor;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maxReferenceFileLength
|
||||
*/
|
||||
public int getMaxReferenceFileLength() {
|
||||
return maxReferenceFileLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param maxReferenceFileLength the maxReferenceFileLength to set
|
||||
*/
|
||||
public void setMaxReferenceFileLength(int maxReferenceFileLength) {
|
||||
this.maxReferenceFileLength = maxReferenceFileLength;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue