From 5b49189cfe89cf706ff1d1a54a5c7a05f212d5fd Mon Sep 17 00:00:00 2001 From: Robert Davies Date: Sat, 3 Feb 2007 07:24:10 +0000 Subject: [PATCH] Tidy up the choice of Index types for the Map and List container git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@503177 13f79535-47bb-0310-9956-ffa450edef68 --- .../cursors/FilePendingMessageCursor.java | 3 +- .../org/apache/activemq/kaha/IndexTypes.java | 35 ------------------- .../java/org/apache/activemq/kaha/Store.java | 26 +++++++++++--- .../apache/activemq/kaha/impl/KahaStore.java | 23 ++++++------ .../impl/container/BaseContainerImpl.java | 11 +++--- .../impl/container/ListContainerImpl.java | 2 +- .../kaha/impl/container/MapContainerImpl.java | 5 ++- .../kaha/impl/index/hash/HashBin.java | 7 ++-- .../kaha/impl/index/hash/HashPageInfo.java | 31 +++++++++------- .../kahadaptor/KahaPersistenceAdapter.java | 19 ++-------- .../java/org/apache/activemq/kaha/Loader.java | 2 +- .../activemq/kaha/MapContainerTest.java | 2 +- 12 files changed, 65 insertions(+), 101 deletions(-) delete mode 100644 activemq-core/src/main/java/org/apache/activemq/kaha/IndexTypes.java diff --git a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java index c2ccf514ef..8b5c93de9f 100755 --- a/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java +++ b/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursor.java @@ -22,7 +22,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.MessageReference; import org.apache.activemq.command.Message; -import org.apache.activemq.kaha.IndexTypes; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.Store; import org.apache.activemq.memory.UsageListener; @@ -279,7 +278,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple protected ListContainer getDiskList(){ if(diskList==null){ try{ - diskList=store.getListContainer(name,"TopicSubscription",IndexTypes.DISK_INDEX); + diskList=store.getListContainer(name,"TopicSubscription",Store.IndexType.PERSISTENT); diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat())); }catch(IOException e){ e.printStackTrace(); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/IndexTypes.java b/activemq-core/src/main/java/org/apache/activemq/kaha/IndexTypes.java deleted file mode 100644 index ce3d9e8aa0..0000000000 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/IndexTypes.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.kaha; - -/** - * Types of Indexes used by the Store - * - * @version $Revision: 1.2 $ - */ -public interface IndexTypes{ - - /** - * use in memory indexes - */ - public final static String IN_MEMORY_INDEX= "InMemoryIndex"; - /** - * use disk-based indexes - */ - public final static String DISK_INDEX = "DiskIndex"; -} diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java index acd8d8d7eb..5e25810e3f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java @@ -25,7 +25,25 @@ import java.util.Set; * @version $Revision: 1.2 $ */ public interface Store{ + /** + * Defauly container name + */ + public static final String DEFAULT_CONTAINER_NAME="kaha"; + /** + * Index Types + * + */ + public static enum IndexType{ + /** + * Map Index held in memory + */ + VM, + /** + * Map index persistent + */ + PERSISTENT + } /** * Byte Marshaller */ @@ -116,7 +134,7 @@ public interface Store{ * @return container for the associated id or null if it doesn't exist * @throws IOException */ - public MapContainer getMapContainer(Object id,String containerName,String indexType) throws IOException; + public MapContainer getMapContainer(Object id,String containerName,Store.IndexType indexType) throws IOException; /** * delete a container from the default container @@ -190,7 +208,7 @@ public interface Store{ * @return container for the associated id or null if it doesn't exist * @throws IOException */ - public ListContainer getListContainer(Object id,String containerName,String indexType) throws IOException; + public ListContainer getListContainer(Object id,String containerName,Store.IndexType indexType) throws IOException; /** * delete a ListContainer from the default container @@ -232,14 +250,14 @@ public interface Store{ * @see org.apache.activemq.kaha.IndexTypes * @return the default index type */ - public String getIndexType(); + public String getIndexTypeAsString(); /** * Set the default index type * @param type * @see org.apache.activemq.kaha.IndexTypes */ - public void setIndexType(String type); + public void setIndexTypeAsString(String type); /** * @return true if the store has been initialized diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java index 8765b9178d..087a87bf69 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java @@ -27,7 +27,6 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import org.apache.activemq.kaha.IndexTypes; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.RuntimeStoreException; @@ -35,7 +34,6 @@ import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreLocation; import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.DataManagerFacade; -import org.apache.activemq.kaha.impl.container.BaseContainerImpl; import org.apache.activemq.kaha.impl.container.ContainerId; import org.apache.activemq.kaha.impl.container.ListContainerImpl; import org.apache.activemq.kaha.impl.container.MapContainerImpl; @@ -56,7 +54,7 @@ import org.apache.commons.logging.LogFactory; public class KahaStore implements Store{ private static final String LOCK_FILE_NAME="store.lock"; - private static final String DEFAULT_CONTAINER_NAME="kaha"; + private final static String PROPERTY_PREFIX="org.apache.activemq.kaha.Store"; private final static boolean brokenFileLock="true".equals(System.getProperty(PROPERTY_PREFIX+".broken","false")); private final static boolean disableLocking="true".equals(System.getProperty(PROPERTY_PREFIX+"DisableLocking", @@ -78,7 +76,7 @@ public class KahaStore implements Store{ private boolean useAsyncDataManager=false; private long maxDataFileLength=1024*1024*32; private FileLock lock; - private String indexType=IndexTypes.DISK_INDEX; + private IndexType indexType=IndexType.PERSISTENT; public KahaStore(String name,String mode) throws IOException{ this.mode=mode; @@ -188,7 +186,7 @@ public class KahaStore implements Store{ return getMapContainer(id,containerName,indexType); } - public synchronized MapContainer getMapContainer(Object id,String containerName,String indexType) + public synchronized MapContainer getMapContainer(Object id,String containerName,IndexType indexType) throws IOException{ initialize(); ContainerId containerId=new ContainerId(); @@ -254,7 +252,7 @@ public class KahaStore implements Store{ return getListContainer(id,containerName,indexType); } - public synchronized ListContainer getListContainer(Object id,String containerName,String indexType) + public synchronized ListContainer getListContainer(Object id,String containerName,IndexType indexType) throws IOException{ initialize(); ContainerId containerId=new ContainerId(); @@ -388,8 +386,8 @@ public class KahaStore implements Store{ * @see org.apache.activemq.kaha.IndexTypes * @return the default index type */ - public synchronized String getIndexType(){ - return indexType; + public synchronized String getIndexTypeAsString(){ + return indexType==IndexType.PERSISTENT ? "PERSISTENT":"VM"; } /** @@ -398,11 +396,12 @@ public class KahaStore implements Store{ * @param type * @see org.apache.activemq.kaha.IndexTypes */ - public synchronized void setIndexType(String type){ - if(type==null||(!type.equals(IndexTypes.DISK_INDEX)&&!type.equals(IndexTypes.IN_MEMORY_INDEX))){ - throw new RuntimeException("Unknown IndexType: "+type); + public synchronized void setIndexTypeAsString(String type){ + if(type.equalsIgnoreCase("VM")){ + indexType=IndexType.VM; + }else{ + indexType=IndexType.PERSISTENT; } - this.indexType=type; } public synchronized void initialize() throws IOException{ diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java index da4a47e31e..7df3a871cf 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java @@ -21,8 +21,8 @@ package org.apache.activemq.kaha.impl.container; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.activemq.kaha.IndexTypes; import org.apache.activemq.kaha.RuntimeStoreException; +import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.impl.DataManager; import org.apache.activemq.kaha.impl.data.Item; @@ -50,18 +50,15 @@ public abstract class BaseContainerImpl{ protected boolean loaded=false; protected boolean closed=false; protected boolean initialized=false; - protected String indexType; + protected Store.IndexType indexType; protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager, - DataManager dataManager,String indexType){ + DataManager dataManager,Store.IndexType indexType){ this.containerId=id; this.root=root; this.indexManager=indexManager; this.dataManager=dataManager; this.indexType = indexType; - if (indexType == null || (!indexType.equals(IndexTypes.DISK_INDEX) && !indexType.equals(IndexTypes.IN_MEMORY_INDEX))) { - throw new RuntimeException("Unknown IndexType: " + indexType); - } } public ContainerId getContainerId(){ @@ -73,7 +70,7 @@ public abstract class BaseContainerImpl{ if(!initialized){ initialized=true; if(this.indexList==null){ - if(indexType.equals(IndexTypes.DISK_INDEX)){ + if(indexType.equals(Store.IndexType.PERSISTENT)){ this.indexList=new DiskIndexLinkedList(indexManager,root); }else{ this.indexList=new VMIndexLinkedList(root); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java index 214f4443fa..f210f3a1e6 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java @@ -46,7 +46,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager, - String indexType) throws IOException{ + Store.IndexType indexType) throws IOException{ super(id,root,indexManager,dataManager,indexType); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java index a3e9832ef1..c3e109cb26 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java @@ -20,7 +20,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Set; -import org.apache.activemq.kaha.IndexTypes; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.RuntimeStoreException; @@ -52,7 +51,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont protected File directory; public MapContainerImpl(File directory,ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager, - String indexType){ + Store.IndexType indexType){ super(id,root,indexManager,dataManager,indexType); this.directory = directory; } @@ -60,7 +59,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont public synchronized void init() { super.init(); if(index==null){ - if(indexType.equals(IndexTypes.DISK_INDEX)){ + if(indexType.equals(Store.IndexType.PERSISTENT)){ String name = containerId.getDataContainerName() + "_" + containerId.getKey(); try{ this.index=new HashIndex(directory, name , indexManager); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java index b7c90fdbb6..1bb3350881 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashBin.java @@ -174,7 +174,6 @@ class HashBin{ int offset=index%maximumEntries; page.addHashEntry(offset,entry); doOverFlow(index); - page.save(); } private HashEntry removeHashEntry(int index) throws IOException{ @@ -182,7 +181,6 @@ class HashBin{ int offset=getRetrieveOffset(index); HashEntry result=page.removeHashEntry(offset); doUnderFlow(index); - page.save(); return result; } @@ -233,7 +231,7 @@ class HashBin{ int count=0; for(HashPageInfo page:hashPages){ if((index+1)<=(count+page.size())){ - //count=count==0?count:count+1; + // count=count==0?count:count+1; result=index-count; break; } @@ -278,7 +276,6 @@ class HashBin{ HashEntry overflowed=info.removeHashEntry(info.size()-1); doOverFlow(pageNo+1,overflowed); } - info.save(); } private void doUnderFlow(int index){ @@ -289,7 +286,7 @@ class HashBin{ HashPageInfo info=hashPages.get(pageNo); } - private void end(){ + private void end() throws IOException{ for(HashPageInfo info:hashPages){ info.end(); } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java index 2938a2f95b..02b138b951 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/hash/HashPageInfo.java @@ -27,6 +27,7 @@ class HashPageInfo{ private long id; private int size; private HashPage page; + private boolean dirty=false; HashPageInfo(HashIndex index){ this.hashIndex=index; @@ -63,6 +64,7 @@ class HashPageInfo{ void addHashEntry(int index,HashEntry entry) throws IOException{ page.addHashEntry(index,entry); size++; + dirty=true; } HashEntry getHashEntry(int index) throws IOException{ @@ -73,11 +75,12 @@ class HashPageInfo{ HashEntry result=page.removeHashEntry(index); if(result!=null){ size--; + dirty=true; } return result; } - - void dump() { + + void dump(){ page.dump(); } @@ -87,19 +90,21 @@ class HashPageInfo{ } } - void end() { + void end() throws IOException{ + if(page!=null){ + if(dirty){ + hashIndex.writeFullPage(page); + } + } page=null; - } - - HashPage getPage() { - return page; - } - - void setPage(HashPage page) { - this.page=page; + dirty=false; } - void save() throws IOException{ - hashIndex.writeFullPage(page); + HashPage getPage(){ + return page; + } + + void setPage(HashPage page){ + this.page=page; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java index 608f4ddad0..269b079da0 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaPersistenceAdapter.java @@ -25,7 +25,6 @@ import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.kaha.IndexTypes; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; @@ -58,7 +57,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ protected OpenWireFormat wireFormat=new OpenWireFormat(); private long maxDataFileLength=32*1024*1024; protected int maximumDestinationCacheSize=10000; - private String indexType=IndexTypes.DISK_INDEX; + private File dir; private Store theStore; @@ -215,20 +214,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ this.maxDataFileLength=maxDataFileLength; } - /** - * @return the indexType - */ - public String getIndexType(){ - return this.indexType; - } - - /** - * @param indexType the indexTypes to set - */ - public void setIndexType(String indexType){ - this.indexType=indexType; - } - + /** * @return the maximumDestinationCacheSize */ @@ -248,7 +234,6 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{ if(theStore==null){ theStore=StoreFactory.open(getStoreName(),"rw"); theStore.setMaxDataFileLength(maxDataFileLength); - theStore.setIndexType(indexType); } return theStore; } diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java b/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java index ac3c1c474a..27b2ea8a49 100644 --- a/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java +++ b/activemq-core/src/test/java/org/apache/activemq/kaha/Loader.java @@ -53,7 +53,7 @@ class Loader extends Thread{ start.await(); Marshaller keyMarshaller=new StringMarshaller(); Marshaller valueMarshaller=new BytesMarshaller(); - MapContainer container=store.getMapContainer(name); + MapContainer container=store.getMapContainer(name,Store.DEFAULT_CONTAINER_NAME,Store.IndexType.PERSISTENT); container.setKeyMarshaller(keyMarshaller); container.setValueMarshaller(valueMarshaller); diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java index d33ee2f920..a40b20ddcf 100644 --- a/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java @@ -177,7 +177,7 @@ public class MapContainerTest extends TestCase{ super.setUp(); name = System.getProperty("basedir", ".")+"/target/activemq-data/map-container.db"; store = getStore(); - container = store.getMapContainer("test","test",IndexTypes.DISK_INDEX); + container = store.getMapContainer("test","test",Store.IndexType.PERSISTENT); container.load(); testMap = new HashMap(); for (int i =0; i < COUNT; i++){