mirror of https://github.com/apache/activemq.git
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@591113 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
26387143e8
commit
28296e689c
|
@ -222,4 +222,40 @@ public interface MapContainer<K, V> extends Map<K, V> {
|
|||
* @return the StoreEntry
|
||||
*/
|
||||
StoreEntry getEntry(K key);
|
||||
|
||||
/**
|
||||
* Set the index bin size
|
||||
* @param size
|
||||
*/
|
||||
void setIndexBinSize(int size);
|
||||
|
||||
/**
|
||||
* @return index bin size
|
||||
*/
|
||||
int getIndexBinSize();
|
||||
|
||||
|
||||
/**
|
||||
* Add the index key size
|
||||
* @param size
|
||||
*/
|
||||
void setIndexKeySize(int size);
|
||||
|
||||
|
||||
/**
|
||||
* @return the index key size
|
||||
*/
|
||||
int getIndexKeySize();
|
||||
|
||||
|
||||
/**
|
||||
* Set the index page size
|
||||
* @param size
|
||||
*/
|
||||
void setIndexPageSize(int size);
|
||||
|
||||
/**
|
||||
* @return the index page size
|
||||
*/
|
||||
int getIndexPageSize();
|
||||
}
|
||||
|
|
|
@ -52,6 +52,9 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
protected Marshaller keyMarshaller = Store.OBJECT_MARSHALLER;
|
||||
protected Marshaller valueMarshaller = Store.OBJECT_MARSHALLER;
|
||||
protected File directory;
|
||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||
|
||||
public MapContainerImpl(File directory, ContainerId id, IndexItem root, IndexManager indexManager,
|
||||
DataManager dataManager, boolean persistentIndex) {
|
||||
|
@ -66,7 +69,11 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
String name = containerId.getDataContainerName() + "_" + containerId.getKey();
|
||||
name = name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
|
||||
try {
|
||||
this.index = new HashIndex(directory, name, indexManager);
|
||||
HashIndex hashIndex = new HashIndex(directory, name, indexManager);
|
||||
hashIndex.setNumberOfBins(getIndexBinSize());
|
||||
hashIndex.setKeySize(getIndexKeySize());
|
||||
hashIndex.setPageSize(getIndexPageSize());
|
||||
this.index = hashIndex;
|
||||
} catch (IOException e) {
|
||||
LOG.error("Failed to create HashIndex", e);
|
||||
throw new RuntimeException(e);
|
||||
|
@ -527,4 +534,30 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
public int getIndexBinSize() {
|
||||
return indexBinSize;
|
||||
}
|
||||
|
||||
public void setIndexBinSize(int indexBinSize) {
|
||||
this.indexBinSize = indexBinSize;
|
||||
}
|
||||
|
||||
public int getIndexKeySize() {
|
||||
return indexKeySize;
|
||||
}
|
||||
|
||||
public void setIndexKeySize(int indexKeySize) {
|
||||
this.indexKeySize = indexKeySize;
|
||||
}
|
||||
|
||||
public int getIndexPageSize() {
|
||||
return indexPageSize;
|
||||
}
|
||||
|
||||
public void setIndexPageSize(int indexPageSize) {
|
||||
this.indexPageSize = indexPageSize;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -37,10 +37,10 @@ import org.apache.commons.logging.LogFactory;
|
|||
* @version $Revision: 1.1.1.1 $
|
||||
*/
|
||||
public class HashIndex implements Index {
|
||||
|
||||
public static final int DEFAULT_PAGE_SIZE;
|
||||
public static final int DEFAULT_KEY_SIZE;
|
||||
public static final int DEFAULT_BIN_SIZE;
|
||||
private static final String NAME_PREFIX = "hash-index-";
|
||||
private static final int DEFAULT_PAGE_SIZE;
|
||||
private static final int DEFAULT_KEY_SIZE;
|
||||
private static final Log LOG = LogFactory.getLog(HashIndex.class);
|
||||
private final String name;
|
||||
private File directory;
|
||||
|
@ -49,6 +49,7 @@ public class HashIndex implements Index {
|
|||
private IndexManager indexManager;
|
||||
private int pageSize = DEFAULT_PAGE_SIZE;
|
||||
private int keySize = DEFAULT_KEY_SIZE;
|
||||
private int numberOfBins = DEFAULT_BIN_SIZE;
|
||||
private int keysPerPage = pageSize / keySize;
|
||||
private DataByteArrayInputStream dataIn;
|
||||
private DataByteArrayOutputStream dataOut;
|
||||
|
@ -60,21 +61,10 @@ public class HashIndex implements Index {
|
|||
private HashPage lastFree;
|
||||
private AtomicBoolean loaded = new AtomicBoolean();
|
||||
private LRUCache<Long, HashPage> pageCache;
|
||||
private boolean enablePageCaching;
|
||||
private int pageCacheSize = 10;
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param directory
|
||||
* @param name
|
||||
* @param indexManager
|
||||
* @throws IOException
|
||||
*/
|
||||
public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
|
||||
this(directory, name, indexManager, 1024);
|
||||
}
|
||||
private boolean enablePageCaching=true;
|
||||
private int pageCacheSize = 1;
|
||||
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
|
@ -84,15 +74,9 @@ public class HashIndex implements Index {
|
|||
* @param numberOfBins
|
||||
* @throws IOException
|
||||
*/
|
||||
public HashIndex(File directory, String name, IndexManager indexManager, int numberOfBins) throws IOException {
|
||||
public HashIndex(File directory, String name, IndexManager indexManager) throws IOException {
|
||||
this.directory = directory;
|
||||
this.name = name;
|
||||
this.indexManager = indexManager;
|
||||
int capacity = 1;
|
||||
while (capacity < numberOfBins) {
|
||||
capacity <<= 1;
|
||||
}
|
||||
this.bins = new HashBin[capacity];
|
||||
openIndexFile();
|
||||
pageCache = new LRUCache<Long, HashPage>(pageCacheSize, pageCacheSize, 0.75f, true);
|
||||
}
|
||||
|
@ -139,6 +123,23 @@ public class HashIndex implements Index {
|
|||
}
|
||||
this.pageSize = pageSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return number of bins
|
||||
*/
|
||||
public int getNumberOfBins() {
|
||||
return this.numberOfBins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param numberOfBins
|
||||
*/
|
||||
public void setNumberOfBins(int numberOfBins) {
|
||||
if (loaded.get() && numberOfBins != this.numberOfBins) {
|
||||
throw new RuntimeException("Pages already loaded - can't reset bin size");
|
||||
}
|
||||
this.numberOfBins = numberOfBins;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the enablePageCaching
|
||||
|
@ -175,6 +176,12 @@ public class HashIndex implements Index {
|
|||
|
||||
public synchronized void load() {
|
||||
if (loaded.compareAndSet(false, true)) {
|
||||
this.indexManager = indexManager;
|
||||
int capacity = 1;
|
||||
while (capacity < numberOfBins) {
|
||||
capacity <<= 1;
|
||||
}
|
||||
this.bins = new HashBin[capacity];
|
||||
keysPerPage = pageSize / keySize;
|
||||
dataIn = new DataByteArrayInputStream();
|
||||
dataOut = new DataByteArrayOutputStream(pageSize);
|
||||
|
@ -439,5 +446,6 @@ public class HashIndex implements Index {
|
|||
static {
|
||||
DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", "16384"));
|
||||
DEFAULT_KEY_SIZE = Integer.parseInt(System.getProperty("defaultKeySize", "96"));
|
||||
DEFAULT_BIN_SIZE= Integer.parseInt(System.getProperty("defaultBinSize", "1024"));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,9 +85,6 @@ public class TreeIndex implements Index {
|
|||
* @param marshaller
|
||||
*/
|
||||
public void setKeyMarshaller(Marshaller marshaller) {
|
||||
if (loaded.get()) {
|
||||
throw new RuntimeException("Pages already loaded - can't set marshaller now");
|
||||
}
|
||||
this.keyMarshaller = marshaller;
|
||||
}
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ import org.apache.activemq.command.JournalTransaction;
|
|||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
|
||||
import org.apache.activemq.kaha.impl.async.Location;
|
||||
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
|
||||
import org.apache.activemq.openwire.OpenWireFormat;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.PersistenceAdapter;
|
||||
|
@ -105,6 +106,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
private boolean persistentIndex=true;
|
||||
private boolean useNio = true;
|
||||
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
|
||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||
private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
|
||||
|
||||
|
||||
|
@ -628,6 +632,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
|
||||
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(storeSize);
|
||||
adaptor.setPersistentIndex(isPersistentIndex());
|
||||
adaptor.setIndexBinSize(getIndexBinSize());
|
||||
adaptor.setIndexKeySize(getIndexKeySize());
|
||||
adaptor.setIndexPageSize(getIndexPageSize());
|
||||
return adaptor;
|
||||
}
|
||||
|
||||
|
@ -756,6 +763,31 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
public void setCheckpointInterval(long checkpointInterval) {
|
||||
this.checkpointInterval = checkpointInterval;
|
||||
}
|
||||
|
||||
public int getIndexBinSize() {
|
||||
return indexBinSize;
|
||||
}
|
||||
|
||||
public void setIndexBinSize(int indexBinSize) {
|
||||
this.indexBinSize = indexBinSize;
|
||||
}
|
||||
|
||||
public int getIndexKeySize() {
|
||||
return indexKeySize;
|
||||
}
|
||||
|
||||
public void setIndexKeySize(int indexKeySize) {
|
||||
this.indexKeySize = indexKeySize;
|
||||
}
|
||||
|
||||
public int getIndexPageSize() {
|
||||
return indexPageSize;
|
||||
}
|
||||
|
||||
public void setIndexPageSize(int indexPageSize) {
|
||||
this.indexPageSize = indexPageSize;
|
||||
}
|
||||
|
||||
|
||||
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
|
||||
Set<Integer>set = dataFilesInProgress.get(store);
|
||||
|
@ -771,7 +803,5 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
|
|||
if (set != null) {
|
||||
set.remove(dataFileId);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.kaha.MapContainer;
|
|||
import org.apache.activemq.kaha.MessageIdMarshaller;
|
||||
import org.apache.activemq.kaha.Store;
|
||||
import org.apache.activemq.kaha.StoreFactory;
|
||||
import org.apache.activemq.kaha.impl.index.hash.HashIndex;
|
||||
import org.apache.activemq.store.MessageStore;
|
||||
import org.apache.activemq.store.ReferenceStore;
|
||||
import org.apache.activemq.store.ReferenceStoreAdapter;
|
||||
|
@ -63,6 +64,9 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
|||
private boolean storeValid;
|
||||
private Store stateStore;
|
||||
private boolean persistentIndex = true;
|
||||
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
|
||||
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
|
||||
private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
|
||||
|
||||
public KahaReferenceStoreAdapter(AtomicLong size){
|
||||
super(size);
|
||||
|
@ -176,7 +180,10 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
|||
String containerName)
|
||||
throws IOException {
|
||||
Store store = getStore();
|
||||
MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName);
|
||||
MapContainer<MessageId, ReferenceRecord> container = store.getMapContainer(id, containerName,persistentIndex);
|
||||
container.setIndexBinSize(getIndexBinSize());
|
||||
container.setIndexKeySize(getIndexKeySize());
|
||||
container.setIndexPageSize(getIndexPageSize());
|
||||
container.setKeyMarshaller(new MessageIdMarshaller());
|
||||
container.setValueMarshaller(new ReferenceRecordMarshaller());
|
||||
container.load();
|
||||
|
@ -308,5 +315,29 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
|
|||
durableSubscribers.remove(info);
|
||||
}
|
||||
|
||||
public int getIndexBinSize() {
|
||||
return indexBinSize;
|
||||
}
|
||||
|
||||
public void setIndexBinSize(int indexBinSize) {
|
||||
this.indexBinSize = indexBinSize;
|
||||
}
|
||||
|
||||
public int getIndexKeySize() {
|
||||
return indexKeySize;
|
||||
}
|
||||
|
||||
public void setIndexKeySize(int indexKeySize) {
|
||||
this.indexKeySize = indexKeySize;
|
||||
}
|
||||
|
||||
public int getIndexPageSize() {
|
||||
return indexPageSize;
|
||||
}
|
||||
|
||||
public void setIndexPageSize(int indexPageSize) {
|
||||
this.indexPageSize = indexPageSize;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
|
@ -16,6 +16,9 @@
|
|||
*/
|
||||
package org.apache.activemq.util;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
* @version $Revision$
|
||||
*/
|
||||
|
@ -52,24 +55,71 @@ public final class IOHelper {
|
|||
* @param name
|
||||
* @return
|
||||
*/
|
||||
public static String toFileSystemSafeName( String name ) {
|
||||
int size = name.length();
|
||||
StringBuffer rc = new StringBuffer(size*2);
|
||||
for (int i = 0; i < size; i++) {
|
||||
char c = name.charAt(i);
|
||||
boolean valid = c >= 'a' && c <= 'z';
|
||||
valid = valid || (c >= 'A' && c <= 'Z');
|
||||
valid = valid || (c >= '0' && c <= '9');
|
||||
valid = valid || (c == '_') || (c == '-') || (c == '.') || (c == '/') || (c == '\\');
|
||||
|
||||
if( valid ) {
|
||||
rc.append(c);
|
||||
} else {
|
||||
// Encode the character using hex notation
|
||||
rc.append('#');
|
||||
rc.append(HexSupport.toHexFromInt(c, true));
|
||||
}
|
||||
}
|
||||
return rc.toString();
|
||||
public static String toFileSystemSafeName(String name) {
|
||||
int size = name.length();
|
||||
StringBuffer rc = new StringBuffer(size * 2);
|
||||
for (int i = 0; i < size; i++) {
|
||||
char c = name.charAt(i);
|
||||
boolean valid = c >= 'a' && c <= 'z';
|
||||
valid = valid || (c >= 'A' && c <= 'Z');
|
||||
valid = valid || (c >= '0' && c <= '9');
|
||||
valid = valid || (c == '_') || (c == '-') || (c == '.')
|
||||
|| (c == '/') || (c == '\\');
|
||||
|
||||
if (valid) {
|
||||
rc.append(c);
|
||||
} else {
|
||||
// Encode the character using hex notation
|
||||
rc.append('#');
|
||||
rc.append(HexSupport.toHexFromInt(c, true));
|
||||
}
|
||||
}
|
||||
return rc.toString();
|
||||
}
|
||||
|
||||
public static boolean deleteFile(File fileToDelete) {
|
||||
if (fileToDelete == null || !fileToDelete.exists()) {
|
||||
return true;
|
||||
}
|
||||
boolean result = deleteChildren(fileToDelete);
|
||||
result &= fileToDelete.delete();
|
||||
return result;
|
||||
}
|
||||
|
||||
public static boolean deleteChildren(File parent) {
|
||||
if (parent == null || !parent.exists()) {
|
||||
return false;
|
||||
}
|
||||
boolean result = true;
|
||||
if (parent.isDirectory()) {
|
||||
File[] files = parent.listFiles();
|
||||
if (files == null) {
|
||||
result = false;
|
||||
} else {
|
||||
for (int i = 0; i < files.length; i++) {
|
||||
File file = files[i];
|
||||
if (file.getName().equals(".")
|
||||
|| file.getName().equals("..")) {
|
||||
continue;
|
||||
}
|
||||
if (file.isDirectory()) {
|
||||
result &= deleteFile(file);
|
||||
} else {
|
||||
result &= file.delete();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
public static void moveFile(File src, File targetDirectory) throws IOException {
|
||||
if (!src.renameTo(new File(targetDirectory, src.getName()))) {
|
||||
throw new IOException("Failed to move " + src + " to " + targetDirectory);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue