change Queue message store in Kaha store adaptor to use memory efficent list instead

of Map containers

git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@475416 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2006-11-15 20:56:21 +00:00
parent 1c89c92217
commit fc6ab9c687
4 changed files with 153 additions and 71 deletions

View File

@ -47,6 +47,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
protected int offset=0;
protected int maximumCacheSize=100;
protected IndexItem lastCached;
protected boolean cacheEnabled = true;
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
String indexType) throws IOException{
@ -858,46 +859,51 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
protected void itemAdded(IndexItem item,int pos,Object value){
int cachePosition=pos-offset;
// if pos is before the cache offset
// we need to clear the cache
if(pos<offset){
clearCache();
}
if(cacheList.isEmpty()){
offset=pos;
cacheList.add(value);
lastCached=item;
}else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
cacheList.add(value);
lastCached=item;
}else if(cachePosition>=0&&cachePosition<=cacheList.size()){
cacheList.add(cachePosition,value);
if(cacheList.size()>maximumCacheSize){
itemRemoved(cacheList.size()-1);
if(cacheEnabled){
int cachePosition=pos-offset;
// if pos is before the cache offset
// we need to clear the cache
if(pos<offset){
clearCache();
}
if(cacheList.isEmpty()){
offset=pos;
cacheList.add(value);
lastCached=item;
}else if(cachePosition==cacheList.size()&&cachePosition<maximumCacheSize){
cacheList.add(value);
lastCached=item;
}else if(cachePosition>=0&&cachePosition<=cacheList.size()){
cacheList.add(cachePosition,value);
if(cacheList.size()>maximumCacheSize){
itemRemoved(cacheList.size()-1);
}
}
}
}
protected void itemRemoved(int pos){
int lastPosition=offset+cacheList.size()-1;
int cachePosition=pos-offset;
if(cachePosition>=0&&cachePosition<cacheList.size()){
if(cachePosition==lastPosition){
if(lastCached!=null){
lastCached=indexList.getPrevEntry(lastCached);
if(cacheEnabled){
int lastPosition=offset+cacheList.size()-1;
int cachePosition=pos-offset;
if(cachePosition>=0&&cachePosition<cacheList.size()){
if(cachePosition==lastPosition){
if(lastCached!=null){
lastCached=indexList.getPrevEntry(lastCached);
}
}
cacheList.remove(pos);
if(cacheList.isEmpty()){
clearCache();
}
}
cacheList.remove(pos);
if(cacheList.isEmpty()){
clearCache();
}
}
}
protected Object getCachedItem(int pos){
int cachePosition=pos-offset;
Object result=null;
if(cacheEnabled) {
int cachePosition=pos-offset;
if(cachePosition>=0&&cachePosition<cacheList.size()){
result=cacheList.get(cachePosition);
}
@ -928,6 +934,12 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
}
}
}else {
IndexItem item=indexList.get(pos);
if(item!=null){
result=getValue(item);
}
}
return result;
}
@ -980,6 +992,10 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
*/
public synchronized void setMaximumCacheSize(int maximumCacheSize){
this.maximumCacheSize=maximumCacheSize;
cacheEnabled = maximumCacheSize >= 0;
if (!cacheEnabled) {
clearCache();
}
}
/**

View File

@ -19,16 +19,17 @@ package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.LRUCache;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
*
@ -36,51 +37,85 @@ import org.apache.activemq.store.MessageStore;
*/
public class KahaMessageStore implements MessageStore{
protected final ActiveMQDestination destination;
protected final MapContainer messageContainer;
protected final ListContainer messageContainer;
protected final LRUCache cache;
public KahaMessageStore(MapContainer container,ActiveMQDestination destination) throws IOException{
public KahaMessageStore(ListContainer container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
this.messageContainer=container;
this.destination=destination;
this.cache=new LRUCache(maximumCacheSize,maximumCacheSize,0.75f,false);
// populate the cache
StoreEntry entry=messageContainer.getFirst();
int count = 0;
if(entry!=null){
do{
Message msg = (Message)messageContainer.get(entry);
cache.put(msg.getMessageId(),entry);
entry = messageContainer.getNext(entry);
count++;
}while(entry!=null && count < maximumCacheSize);
}
}
public Object getId(){
return messageContainer.getId();
}
public void addMessage(ConnectionContext context,Message message) throws IOException{
messageContainer.put(message.getMessageId().toString(),message);
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
StoreEntry item = messageContainer.placeLast(message);
cache.put(message.getMessageId(),item);
}
public void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
public synchronized void addMessageReference(ConnectionContext context,MessageId messageId,long expirationTime,String messageRef)
throws IOException{
messageContainer.put(messageId.toString(),messageRef);
throw new RuntimeException("Not supported");
}
public Message getMessage(MessageId identity) throws IOException{
return (Message) messageContainer.get(identity.toString());
public synchronized Message getMessage(MessageId identity) throws IOException{
Message result=null;
StoreEntry entry=(StoreEntry)cache.remove(identity);
if(entry!=null){
result = (Message)messageContainer.get(entry);
}else{
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(identity)){
result=msg;
break;
}
}
}
return result;
}
public String getMessageReference(MessageId identity) throws IOException{
return (String) messageContainer.get(identity.toString());
return null;
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
messageContainer.remove(ack.getLastMessageId().toString());
removeMessage(ack.getLastMessageId());
}
public void removeMessage(MessageId msgId) throws IOException{
messageContainer.remove(msgId.toString());
}
public void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.values().iterator();iter.hasNext();){
Object msg=(Object) iter.next();
if(msg.getClass()==String.class){
listener.recoverMessageReference((String) msg);
}else{
listener.recoverMessage((Message) msg);
public synchronized void removeMessage(MessageId msgId) throws IOException{
StoreEntry entry=(StoreEntry)cache.remove(msgId);
if(entry!=null){
messageContainer.remove(entry);
}else{
for(Iterator i=messageContainer.iterator();i.hasNext();){
Message msg=(Message)i.next();
if(msg.getMessageId().equals(msgId)){
i.remove();
break;
}
}
}
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
listener.recoverMessage((Message)iter.next());
}
listener.finished();
}
@ -88,16 +123,18 @@ public class KahaMessageStore implements MessageStore{
public void stop() {}
public void removeAllMessages(ConnectionContext context) throws IOException{
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
cache.clear();
}
public ActiveMQDestination getDestination(){
return destination;
}
public void delete(){
public synchronized void delete(){
messageContainer.clear();
cache.clear();
}
/**

View File

@ -1,19 +1,15 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
@ -59,6 +55,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
private boolean useExternalMessageReferences;
private OpenWireFormat wireFormat=new OpenWireFormat();
private long maxDataFileLength=32*1024*1024;
private int maximumDestinationCacheSize=2000;
private String indexType=IndexTypes.DISK_INDEX;
private File dir;
private Store theStore;
@ -68,6 +65,8 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
dir.mkdirs();
}
this.dir=dir;
wireFormat.setCacheEnabled(false);
wireFormat.setTightEncodingEnabled(true);
}
public Set getDestinations(){
@ -89,7 +88,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
MessageStore rc=(MessageStore)queues.get(destination);
if(rc==null){
rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
rc=new KahaMessageStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
@ -185,10 +184,11 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
container.load();
return container;
}
protected ListContainer getListContainer(Object id,String containerName) throws IOException{
Store store=getStore();
ListContainer container=store.getListContainer(id,containerName);
container.setMaximumCacheSize(0);
if(useExternalMessageReferences){
container.setMarshaller(new StringMarshaller());
}else{
@ -199,9 +199,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
/**
* @param usageManager
* The UsageManager that is controlling the broker's memory
* usage.
* @param usageManager The UsageManager that is controlling the broker's memory usage.
*/
public void setUsageManager(UsageManager usageManager){
}
@ -214,8 +212,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
/**
* @param maxDataFileLength
* the maxDataFileLength to set
* @param maxDataFileLength the maxDataFileLength to set
*
* @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.MemoryPropertyEditor"
*/
@ -237,6 +234,20 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
this.indexType=indexType;
}
/**
* @return the maximumDestinationCacheSize
*/
public int getMaximumDestinationCacheSize(){
return this.maximumDestinationCacheSize;
}
/**
* @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
*/
public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
this.maximumDestinationCacheSize=maximumDestinationCacheSize;
}
protected synchronized Store getStore() throws IOException{
if(theStore==null){
String name=dir.getAbsolutePath()+File.separator+"kaha.db";

View File

@ -37,6 +37,24 @@ public class LRUCache extends LinkedHashMap{
public LRUCache(){
super(1000,0.75f,true);
}
/**
* Constructs an empty <tt>LRUCache</tt> instance with the
* specified initial capacity, maximumCacheSize,load factor and ordering mode.
*
* @param initialCapacity the initial capacity.
* @param maximumCacheSize
* @param loadFactor the load factor.
* @param accessOrder the ordering mode - <tt>true</tt> for
* access-order, <tt>false</tt> for insertion-order.
* @throws IllegalArgumentException if the initial capacity is negative
* or the load factor is nonpositive.
*/
public LRUCache(int initialCapacity,int maximumCacheSize,float loadFactor, boolean accessOrder) {
super(initialCapacity,loadFactor,accessOrder);
this.maxCacheSize = maximumCacheSize;
}