mirror of https://github.com/apache/activemq.git
Tidy up the choice of Index types for the Map and List container
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@503177 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
93c67fe701
commit
5b49189cfe
|
@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.MessageReference;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.kaha.IndexTypes;
|
||||
import org.apache.activemq.kaha.ListContainer;
|
||||
import org.apache.activemq.kaha.Store;
|
||||
import org.apache.activemq.memory.UsageListener;
|
||||
|
@ -279,7 +278,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
|
|||
protected ListContainer getDiskList(){
|
||||
if(diskList==null){
|
||||
try{
|
||||
diskList=store.getListContainer(name,"TopicSubscription",IndexTypes.DISK_INDEX);
|
||||
diskList=store.getListContainer(name,"TopicSubscription",Store.IndexType.PERSISTENT);
|
||||
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
|
||||
}catch(IOException e){
|
||||
e.printStackTrace();
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/**
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.kaha;
|
||||
|
||||
/**
|
||||
* Types of Indexes used by the Store
|
||||
*
|
||||
* @version $Revision: 1.2 $
|
||||
*/
|
||||
public interface IndexTypes{
|
||||
|
||||
/**
|
||||
* use in memory indexes
|
||||
*/
|
||||
public final static String IN_MEMORY_INDEX= "InMemoryIndex";
|
||||
/**
|
||||
* use disk-based indexes
|
||||
*/
|
||||
public final static String DISK_INDEX = "DiskIndex";
|
||||
}
|
|
@ -25,7 +25,25 @@ import java.util.Set;
|
|||
* @version $Revision: 1.2 $
|
||||
*/
|
||||
public interface Store{
|
||||
/**
|
||||
* Defauly container name
|
||||
*/
|
||||
public static final String DEFAULT_CONTAINER_NAME="kaha";
|
||||
|
||||
/**
|
||||
* Index Types
|
||||
*
|
||||
*/
|
||||
public static enum IndexType{
|
||||
/**
|
||||
* Map Index held in memory
|
||||
*/
|
||||
VM,
|
||||
/**
|
||||
* Map index persistent
|
||||
*/
|
||||
PERSISTENT
|
||||
}
|
||||
/**
|
||||
* Byte Marshaller
|
||||
*/
|
||||
|
@ -116,7 +134,7 @@ public interface Store{
|
|||
* @return container for the associated id or null if it doesn't exist
|
||||
* @throws IOException
|
||||
*/
|
||||
public MapContainer getMapContainer(Object id,String containerName,String indexType) throws IOException;
|
||||
public MapContainer getMapContainer(Object id,String containerName,Store.IndexType indexType) throws IOException;
|
||||
|
||||
/**
|
||||
* delete a container from the default container
|
||||
|
@ -190,7 +208,7 @@ public interface Store{
|
|||
* @return container for the associated id or null if it doesn't exist
|
||||
* @throws IOException
|
||||
*/
|
||||
public ListContainer getListContainer(Object id,String containerName,String indexType) throws IOException;
|
||||
public ListContainer getListContainer(Object id,String containerName,Store.IndexType indexType) throws IOException;
|
||||
|
||||
/**
|
||||
* delete a ListContainer from the default container
|
||||
|
@ -232,14 +250,14 @@ public interface Store{
|
|||
* @see org.apache.activemq.kaha.IndexTypes
|
||||
* @return the default index type
|
||||
*/
|
||||
public String getIndexType();
|
||||
public String getIndexTypeAsString();
|
||||
|
||||
/**
|
||||
* Set the default index type
|
||||
* @param type
|
||||
* @see org.apache.activemq.kaha.IndexTypes
|
||||
*/
|
||||
public void setIndexType(String type);
|
||||
public void setIndexTypeAsString(String type);
|
||||
|
||||
/**
|
||||
* @return true if the store has been initialized
|
||||
|
|
|
@ -27,7 +27,6 @@ import java.util.Map;
|
|||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import org.apache.activemq.kaha.IndexTypes;
|
||||
import org.apache.activemq.kaha.ListContainer;
|
||||
import org.apache.activemq.kaha.MapContainer;
|
||||
import org.apache.activemq.kaha.RuntimeStoreException;
|
||||
|
@ -35,7 +34,6 @@ import org.apache.activemq.kaha.Store;
|
|||
import org.apache.activemq.kaha.StoreLocation;
|
||||
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
|
||||
import org.apache.activemq.kaha.impl.async.DataManagerFacade;
|
||||
import org.apache.activemq.kaha.impl.container.BaseContainerImpl;
|
||||
import org.apache.activemq.kaha.impl.container.ContainerId;
|
||||
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
|
||||
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
|
||||
|
@ -56,7 +54,7 @@ import org.apache.commons.logging.LogFactory;
|
|||
public class KahaStore implements Store{
|
||||
|
||||
private static final String LOCK_FILE_NAME="store.lock";
|
||||
private static final String DEFAULT_CONTAINER_NAME="kaha";
|
||||
|
||||
private final static String PROPERTY_PREFIX="org.apache.activemq.kaha.Store";
|
||||
private final static boolean brokenFileLock="true".equals(System.getProperty(PROPERTY_PREFIX+".broken","false"));
|
||||
private final static boolean disableLocking="true".equals(System.getProperty(PROPERTY_PREFIX+"DisableLocking",
|
||||
|
@ -78,7 +76,7 @@ public class KahaStore implements Store{
|
|||
private boolean useAsyncDataManager=false;
|
||||
private long maxDataFileLength=1024*1024*32;
|
||||
private FileLock lock;
|
||||
private String indexType=IndexTypes.DISK_INDEX;
|
||||
private IndexType indexType=IndexType.PERSISTENT;
|
||||
|
||||
public KahaStore(String name,String mode) throws IOException{
|
||||
this.mode=mode;
|
||||
|
@ -188,7 +186,7 @@ public class KahaStore implements Store{
|
|||
return getMapContainer(id,containerName,indexType);
|
||||
}
|
||||
|
||||
public synchronized MapContainer getMapContainer(Object id,String containerName,String indexType)
|
||||
public synchronized MapContainer getMapContainer(Object id,String containerName,IndexType indexType)
|
||||
throws IOException{
|
||||
initialize();
|
||||
ContainerId containerId=new ContainerId();
|
||||
|
@ -254,7 +252,7 @@ public class KahaStore implements Store{
|
|||
return getListContainer(id,containerName,indexType);
|
||||
}
|
||||
|
||||
public synchronized ListContainer getListContainer(Object id,String containerName,String indexType)
|
||||
public synchronized ListContainer getListContainer(Object id,String containerName,IndexType indexType)
|
||||
throws IOException{
|
||||
initialize();
|
||||
ContainerId containerId=new ContainerId();
|
||||
|
@ -388,8 +386,8 @@ public class KahaStore implements Store{
|
|||
* @see org.apache.activemq.kaha.IndexTypes
|
||||
* @return the default index type
|
||||
*/
|
||||
public synchronized String getIndexType(){
|
||||
return indexType;
|
||||
public synchronized String getIndexTypeAsString(){
|
||||
return indexType==IndexType.PERSISTENT ? "PERSISTENT":"VM";
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -398,11 +396,12 @@ public class KahaStore implements Store{
|
|||
* @param type
|
||||
* @see org.apache.activemq.kaha.IndexTypes
|
||||
*/
|
||||
public synchronized void setIndexType(String type){
|
||||
if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){
|
||||
throw new RuntimeException("Unknown IndexType: "+type);
|
||||
public synchronized void setIndexTypeAsString(String type){
|
||||
if(type.equalsIgnoreCase("VM")){
|
||||
indexType=IndexType.VM;
|
||||
}else{
|
||||
indexType=IndexType.PERSISTENT;
|
||||
}
|
||||
this.indexType=type;
|
||||
}
|
||||
|
||||
public synchronized void initialize() throws IOException{
|
||||
|
|
|
@ -21,8 +21,8 @@ package org.apache.activemq.kaha.impl.container;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import org.apache.activemq.kaha.IndexTypes;
|
||||
import org.apache.activemq.kaha.RuntimeStoreException;
|
||||
import org.apache.activemq.kaha.Store;
|
||||
import org.apache.activemq.kaha.StoreEntry;
|
||||
import org.apache.activemq.kaha.impl.DataManager;
|
||||
import org.apache.activemq.kaha.impl.data.Item;
|
||||
|
@ -50,18 +50,15 @@ public abstract class BaseContainerImpl{
|
|||
protected boolean loaded=false;
|
||||
protected boolean closed=false;
|
||||
protected boolean initialized=false;
|
||||
protected String indexType;
|
||||
protected Store.IndexType indexType;
|
||||
|
||||
protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,
|
||||
DataManager dataManager,String indexType){
|
||||
DataManager dataManager,Store.IndexType indexType){
|
||||
this.containerId=id;
|
||||
this.root=root;
|
||||
this.indexManager=indexManager;
|
||||
this.dataManager=dataManager;
|
||||
this.indexType = indexType;
|
||||
if (indexType == null || (!indexType.equals(IndexTypes.DISK_INDEX) && !indexType.equals(IndexTypes.IN_MEMORY_INDEX))) {
|
||||
throw new RuntimeException("Unknown IndexType: " + indexType);
|
||||
}
|
||||
}
|
||||
|
||||
public ContainerId getContainerId(){
|
||||
|
@ -73,7 +70,7 @@ public abstract class BaseContainerImpl{
|
|||
if(!initialized){
|
||||
initialized=true;
|
||||
if(this.indexList==null){
|
||||
if(indexType.equals(IndexTypes.DISK_INDEX)){
|
||||
if(indexType.equals(Store.IndexType.PERSISTENT)){
|
||||
this.indexList=new DiskIndexLinkedList(indexManager,root);
|
||||
}else{
|
||||
this.indexList=new VMIndexLinkedList(root);
|
||||
|
|
|
@ -46,7 +46,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
|
|||
|
||||
|
||||
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
|
||||
String indexType) throws IOException{
|
||||
Store.IndexType indexType) throws IOException{
|
||||
super(id,root,indexManager,dataManager,indexType);
|
||||
}
|
||||
|
||||
|
|
|
@ -20,7 +20,6 @@ import java.util.Collection;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import org.apache.activemq.kaha.IndexTypes;
|
||||
import org.apache.activemq.kaha.MapContainer;
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
import org.apache.activemq.kaha.RuntimeStoreException;
|
||||
|
@ -52,7 +51,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
protected File directory;
|
||||
|
||||
public MapContainerImpl(File directory,ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
|
||||
String indexType){
|
||||
Store.IndexType indexType){
|
||||
super(id,root,indexManager,dataManager,indexType);
|
||||
this.directory = directory;
|
||||
}
|
||||
|
@ -60,7 +59,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
|
|||
public synchronized void init() {
|
||||
super.init();
|
||||
if(index==null){
|
||||
if(indexType.equals(IndexTypes.DISK_INDEX)){
|
||||
if(indexType.equals(Store.IndexType.PERSISTENT)){
|
||||
String name = containerId.getDataContainerName() + "_" + containerId.getKey();
|
||||
try{
|
||||
this.index=new HashIndex(directory, name , indexManager);
|
||||
|
|
|
@ -174,7 +174,6 @@ class HashBin{
|
|||
int offset=index%maximumEntries;
|
||||
page.addHashEntry(offset,entry);
|
||||
doOverFlow(index);
|
||||
page.save();
|
||||
}
|
||||
|
||||
private HashEntry removeHashEntry(int index) throws IOException{
|
||||
|
@ -182,7 +181,6 @@ class HashBin{
|
|||
int offset=getRetrieveOffset(index);
|
||||
HashEntry result=page.removeHashEntry(offset);
|
||||
doUnderFlow(index);
|
||||
page.save();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -233,7 +231,7 @@ class HashBin{
|
|||
int count=0;
|
||||
for(HashPageInfo page:hashPages){
|
||||
if((index+1)<=(count+page.size())){
|
||||
//count=count==0?count:count+1;
|
||||
// count=count==0?count:count+1;
|
||||
result=index-count;
|
||||
break;
|
||||
}
|
||||
|
@ -278,7 +276,6 @@ class HashBin{
|
|||
HashEntry overflowed=info.removeHashEntry(info.size()-1);
|
||||
doOverFlow(pageNo+1,overflowed);
|
||||
}
|
||||
info.save();
|
||||
}
|
||||
|
||||
private void doUnderFlow(int index){
|
||||
|
@ -289,7 +286,7 @@ class HashBin{
|
|||
HashPageInfo info=hashPages.get(pageNo);
|
||||
}
|
||||
|
||||
private void end(){
|
||||
private void end() throws IOException{
|
||||
for(HashPageInfo info:hashPages){
|
||||
info.end();
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@ class HashPageInfo{
|
|||
private long id;
|
||||
private int size;
|
||||
private HashPage page;
|
||||
private boolean dirty=false;
|
||||
|
||||
HashPageInfo(HashIndex index){
|
||||
this.hashIndex=index;
|
||||
|
@ -63,6 +64,7 @@ class HashPageInfo{
|
|||
void addHashEntry(int index,HashEntry entry) throws IOException{
|
||||
page.addHashEntry(index,entry);
|
||||
size++;
|
||||
dirty=true;
|
||||
}
|
||||
|
||||
HashEntry getHashEntry(int index) throws IOException{
|
||||
|
@ -73,11 +75,12 @@ class HashPageInfo{
|
|||
HashEntry result=page.removeHashEntry(index);
|
||||
if(result!=null){
|
||||
size--;
|
||||
dirty=true;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
void dump() {
|
||||
|
||||
void dump(){
|
||||
page.dump();
|
||||
}
|
||||
|
||||
|
@ -87,19 +90,21 @@ class HashPageInfo{
|
|||
}
|
||||
}
|
||||
|
||||
void end() {
|
||||
void end() throws IOException{
|
||||
if(page!=null){
|
||||
if(dirty){
|
||||
hashIndex.writeFullPage(page);
|
||||
}
|
||||
}
|
||||
page=null;
|
||||
}
|
||||
|
||||
HashPage getPage() {
|
||||
return page;
|
||||
}
|
||||
|
||||
void setPage(HashPage page) {
|
||||
this.page=page;
|
||||
dirty=false;
|
||||
}
|
||||
|
||||
void save() throws IOException{
|
||||
hashIndex.writeFullPage(page);
|
||||
HashPage getPage(){
|
||||
return page;
|
||||
}
|
||||
|
||||
void setPage(HashPage page){
|
||||
this.page=page;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import org.apache.activemq.broker.ConnectionContext;
|
|||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ActiveMQTopic;
|
||||
import org.apache.activemq.kaha.IndexTypes;
|
||||
import org.apache.activemq.kaha.ListContainer;
|
||||
import org.apache.activemq.kaha.MapContainer;
|
||||
import org.apache.activemq.kaha.Marshaller;
|
||||
|
@ -58,7 +57,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
|
|||
protected OpenWireFormat wireFormat=new OpenWireFormat();
|
||||
private long maxDataFileLength=32*1024*1024;
|
||||
protected int maximumDestinationCacheSize=10000;
|
||||
private String indexType=IndexTypes.DISK_INDEX;
|
||||
|
||||
private File dir;
|
||||
private Store theStore;
|
||||
|
||||
|
@ -215,20 +214,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
|
|||
this.maxDataFileLength=maxDataFileLength;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the indexType
|
||||
*/
|
||||
public String getIndexType(){
|
||||
return this.indexType;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param indexType the indexTypes to set
|
||||
*/
|
||||
public void setIndexType(String indexType){
|
||||
this.indexType=indexType;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @return the maximumDestinationCacheSize
|
||||
*/
|
||||
|
@ -248,7 +234,6 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
|
|||
if(theStore==null){
|
||||
theStore=StoreFactory.open(getStoreName(),"rw");
|
||||
theStore.setMaxDataFileLength(maxDataFileLength);
|
||||
theStore.setIndexType(indexType);
|
||||
}
|
||||
return theStore;
|
||||
}
|
||||
|
|
|
@ -53,7 +53,7 @@ class Loader extends Thread{
|
|||
start.await();
|
||||
Marshaller keyMarshaller=new StringMarshaller();
|
||||
Marshaller valueMarshaller=new BytesMarshaller();
|
||||
MapContainer container=store.getMapContainer(name);
|
||||
MapContainer container=store.getMapContainer(name,Store.DEFAULT_CONTAINER_NAME,Store.IndexType.PERSISTENT);
|
||||
|
||||
container.setKeyMarshaller(keyMarshaller);
|
||||
container.setValueMarshaller(valueMarshaller);
|
||||
|
|
|
@ -177,7 +177,7 @@ public class MapContainerTest extends TestCase{
|
|||
super.setUp();
|
||||
name = System.getProperty("basedir", ".")+"/target/activemq-data/map-container.db";
|
||||
store = getStore();
|
||||
container = store.getMapContainer("test","test",IndexTypes.DISK_INDEX);
|
||||
container = store.getMapContainer("test","test",Store.IndexType.PERSISTENT);
|
||||
container.load();
|
||||
testMap = new HashMap();
|
||||
for (int i =0; i < COUNT; i++){
|
||||
|
|
Loading…
Reference in New Issue