use Kaha MapContainer for message storage instead of the Kaha ListContainer

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@503680 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-02-05 13:57:43 +00:00
parent 5b49189cfe
commit 0859f30a91
26 changed files with 837 additions and 362 deletions

View File

@ -278,7 +278,7 @@ public class FilePendingMessageCursor extends AbstractPendingMessageCursor imple
protected ListContainer getDiskList(){
if(diskList==null){
try{
diskList=store.getListContainer(name,"TopicSubscription",Store.IndexType.PERSISTENT);
diskList=store.getListContainer(name,"TopicSubscription",true);
diskList.setMarshaller(new CommandMarshaller(new OpenWireFormat()));
}catch(IOException e){
e.printStackTrace();

View File

@ -23,7 +23,7 @@ package org.apache.activemq.command;
* @openwire:marshaller code="120"
* @version $Revision$
*/
public class ConnectionId implements DataStructure {
public class ConnectionId implements DataStructure, Comparable<ConnectionId> {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.CONNECTION_ID;
@ -86,4 +86,10 @@ public class ConnectionId implements DataStructure {
public boolean isMarshallAware() {
return false;
}
public int compareTo(ConnectionId o){
return value.compareTo(o.value);
}
}

View File

@ -21,7 +21,7 @@ package org.apache.activemq.command;
* @openwire:marshaller code="111"
* @version $Revision: 1.11 $
*/
public class LocalTransactionId extends TransactionId {
public class LocalTransactionId extends TransactionId implements Comparable<LocalTransactionId>{
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.ACTIVEMQ_LOCAL_TRANSACTION_ID;
@ -79,6 +79,19 @@ public class LocalTransactionId extends TransactionId {
&& connectionId.equals(tx.connectionId);
}
/**
* @param o
* @return
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(LocalTransactionId o){
int result = connectionId.compareTo(o.connectionId);
if (result==0) {
result = (int) (value - o.value);
}
return result;
}
/**
* @openwire:property version=1
*/
@ -99,5 +112,7 @@ public class LocalTransactionId extends TransactionId {
this.connectionId = connectionId;
}
}

View File

@ -22,7 +22,7 @@ package org.apache.activemq.command;
* @openwire:marshaller code="110"
* @version $Revision: 1.12 $
*/
public class MessageId implements DataStructure {
public class MessageId implements DataStructure, Comparable<MessageId> {
public static final byte DATA_STRUCTURE_TYPE=CommandTypes.MESSAGE_ID;
@ -145,5 +145,18 @@ public class MessageId implements DataStructure {
copy.key = key;
copy.brokerSequenceId = brokerSequenceId ;
return copy;
}
/**
* @param o
* @return
* @see java.lang.Comparable#compareTo(java.lang.Object)
*/
public int compareTo(MessageId other){
int result = -1;
if (other != null) {
result = this.toString().compareTo(other.toString());
}
return result;
}
}

View File

@ -138,7 +138,7 @@ public interface ListContainer<V> extends List<V>{
public StoreEntry getFirst();
/**
* Get yjr StoreEntry for the last item of the list
* Get the StoreEntry for the last item of the list
*
* @return the last StoreEntry or null if the list is empty
*/

View File

@ -171,5 +171,41 @@ public interface MapContainer<K, V> extends Map<K, V>{
*/
public V getValue(StoreEntry Valuelocation);
/** Get the StoreEntry for the first value in the Map
*
* @return the first StoreEntry or null if the map is empty
*/
public StoreEntry getFirst();
/**
* Get the StoreEntry for the last value item of the Map
*
* @return the last StoreEntry or null if the list is empty
*/
public StoreEntry getLast();
/**
* Get the next StoreEntry value from the map
*
* @param entry
* @return the next StoreEntry or null
*/
public StoreEntry getNext(StoreEntry entry);
/**
* Get the previous StoreEntry from the map
*
* @param entry
* @return the previous store entry or null
*/
public StoreEntry getPrevious(StoreEntry entry);
/**
* It's possible that a StoreEntry could be come stale
* this will return an upto date entry for the StoreEntry position
* @param entry old entry
* @return a refreshed StoreEntry
*/
public StoreEntry refresh(StoreEntry entry);
}

View File

@ -0,0 +1,51 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.command.MessageId;
/**
* Implementation of a Marshaller for MessageIds
*
* @version $Revision: 1.2 $
*/
public class MessageIdMarshaller implements Marshaller<MessageId> {
/**
* Write the payload of this entry to the RawContainer
*
* @param object
* @param dataOut
* @throws IOException
*/
public void writePayload(MessageId object,DataOutput dataOut) throws IOException{
dataOut.writeUTF(object.toString());
}
/**
* Read the entry from the RawContainer
*
* @param dataIn
* @return unmarshalled object
* @throws IOException
*/
public MessageId readPayload(DataInput dataIn) throws IOException{
return new MessageId(dataIn.readUTF());
}
}

View File

@ -0,0 +1,67 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.command.Message;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
/**
* Implementation of a Marshaller for MessageIds
*
* @version $Revision: 1.2 $
*/
public class MessageMarshaller implements Marshaller<Message> {
private WireFormat wireFormat;
/**
* Constructor
* @param wireFormat
*/
public MessageMarshaller(WireFormat wireFormat) {
this.wireFormat=wireFormat;
}
/**
* Write the payload of this entry to the RawContainer
*
* @param message
* @param dataOut
* @throws IOException
*/
public void writePayload(Message message,DataOutput dataOut) throws IOException{
ByteSequence packet = wireFormat.marshal(message);
dataOut.writeInt(packet.length);
dataOut.write(packet.data, packet.offset, packet.length);
}
/**
* Read the entry from the RawContainer
*
* @param dataIn
* @return unmarshalled object
* @throws IOException
*/
public Message readPayload(DataInput dataIn) throws IOException{
int size=dataIn.readInt();
byte[] data=new byte[size];
dataIn.readFully(data);
return (Message)wireFormat.unmarshal(new ByteSequence(data));
}
}

View File

@ -29,21 +29,7 @@ public interface Store{
* Defauly container name
*/
public static final String DEFAULT_CONTAINER_NAME="kaha";
/**
* Index Types
*
*/
public static enum IndexType{
/**
* Map Index held in memory
*/
VM,
/**
* Map index persistent
*/
PERSISTENT
}
/**
* Byte Marshaller
*/
@ -130,11 +116,11 @@ public interface Store{
*
* @param id
* @param containerName
* @param indexType
* @param persistentIndex
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
public MapContainer getMapContainer(Object id,String containerName,Store.IndexType indexType) throws IOException;
public MapContainer getMapContainer(Object id,String containerName,boolean persistentIndex) throws IOException;
/**
* delete a container from the default container
@ -204,11 +190,11 @@ public interface Store{
*
* @param id
* @param containerName
* @param indexType
* @param persistentIndex
* @return container for the associated id or null if it doesn't exist
* @throws IOException
*/
public ListContainer getListContainer(Object id,String containerName,Store.IndexType indexType) throws IOException;
public ListContainer getListContainer(Object id,String containerName,boolean persistentIndex) throws IOException;
/**
* delete a ListContainer from the default container

View File

@ -76,7 +76,7 @@ public class KahaStore implements Store{
private boolean useAsyncDataManager=false;
private long maxDataFileLength=1024*1024*32;
private FileLock lock;
private IndexType indexType=IndexType.PERSISTENT;
private boolean persistentIndex=true;
public KahaStore(String name,String mode) throws IOException{
this.mode=mode;
@ -183,10 +183,10 @@ public class KahaStore implements Store{
}
public MapContainer getMapContainer(Object id,String containerName) throws IOException{
return getMapContainer(id,containerName,indexType);
return getMapContainer(id,containerName,persistentIndex);
}
public synchronized MapContainer getMapContainer(Object id,String containerName,IndexType indexType)
public synchronized MapContainer getMapContainer(Object id,String containerName,boolean persistentIndex)
throws IOException{
initialize();
ContainerId containerId=new ContainerId();
@ -200,7 +200,7 @@ public class KahaStore implements Store{
if(root==null){
root=mapsContainer.addRoot(im,containerId);
}
result=new MapContainerImpl(directory,containerId,root,im,dm,indexType);
result=new MapContainerImpl(directory,containerId,root,im,dm,persistentIndex);
maps.put(containerId,result);
}
return result;
@ -249,10 +249,10 @@ public class KahaStore implements Store{
}
public ListContainer getListContainer(Object id,String containerName) throws IOException{
return getListContainer(id,containerName,indexType);
return getListContainer(id,containerName,persistentIndex);
}
public synchronized ListContainer getListContainer(Object id,String containerName,IndexType indexType)
public synchronized ListContainer getListContainer(Object id,String containerName,boolean persistentIndex)
throws IOException{
initialize();
ContainerId containerId=new ContainerId();
@ -267,7 +267,7 @@ public class KahaStore implements Store{
if(root==null){
root=listsContainer.addRoot(im,containerId);
}
result=new ListContainerImpl(containerId,root,im,dm,indexType);
result=new ListContainerImpl(containerId,root,im,dm,persistentIndex);
lists.put(containerId,result);
}
return result;
@ -387,7 +387,7 @@ public class KahaStore implements Store{
* @return the default index type
*/
public synchronized String getIndexTypeAsString(){
return indexType==IndexType.PERSISTENT ? "PERSISTENT":"VM";
return persistentIndex ? "PERSISTENT":"VM";
}
/**
@ -398,9 +398,9 @@ public class KahaStore implements Store{
*/
public synchronized void setIndexTypeAsString(String type){
if(type.equalsIgnoreCase("VM")){
indexType=IndexType.VM;
persistentIndex=false;
}else{
indexType=IndexType.PERSISTENT;
persistentIndex=true;
}
}

View File

@ -50,15 +50,15 @@ public abstract class BaseContainerImpl{
protected boolean loaded=false;
protected boolean closed=false;
protected boolean initialized=false;
protected Store.IndexType indexType;
protected boolean persistentIndex;
protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,
DataManager dataManager,Store.IndexType indexType){
DataManager dataManager,boolean persistentIndex){
this.containerId=id;
this.root=root;
this.indexManager=indexManager;
this.dataManager=dataManager;
this.indexType = indexType;
this.persistentIndex = persistentIndex;
}
public ContainerId getContainerId(){
@ -70,7 +70,7 @@ public abstract class BaseContainerImpl{
if(!initialized){
initialized=true;
if(this.indexList==null){
if(indexType.equals(Store.IndexType.PERSISTENT)){
if(persistentIndex){
this.indexList=new DiskIndexLinkedList(indexManager,root);
}else{
this.indexList=new VMIndexLinkedList(root);

View File

@ -46,8 +46,8 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
public ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
Store.IndexType indexType) throws IOException{
super(id,root,indexManager,dataManager,indexType);
boolean persistentIndex) throws IOException{
super(id,root,indexManager,dataManager,persistentIndex);
}
/*
@ -697,7 +697,7 @@ public class ListContainerImpl extends BaseContainerImpl implements ListContaine
}
/**
* Get yjr StoreEntry for the last item of the list
* Get the StoreEntry for the last item of the list
*
* @return the last StoreEntry or null if the list is empty
*/

View File

@ -50,19 +50,20 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
protected Marshaller valueMarshaller=Store.ObjectMarshaller;
protected File directory;
public MapContainerImpl(File directory,ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager,
Store.IndexType indexType){
super(id,root,indexManager,dataManager,indexType);
this.directory = directory;
public MapContainerImpl(File directory,ContainerId id,IndexItem root,IndexManager indexManager,
DataManager dataManager,boolean persistentIndex){
super(id,root,indexManager,dataManager,persistentIndex);
this.directory=directory;
}
public synchronized void init() {
public synchronized void init(){
super.init();
if(index==null){
if(indexType.equals(Store.IndexType.PERSISTENT)){
String name = containerId.getDataContainerName() + "_" + containerId.getKey();
if(persistentIndex){
String name=containerId.getDataContainerName()+"_"+containerId.getKey();
name=name.replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
try{
this.index=new HashIndex(directory, name , indexManager);
this.index=new HashIndex(directory,name,indexManager);
}catch(IOException e){
log.error("Failed to create HashIndex",e);
throw new RuntimeException(e);
@ -182,7 +183,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
Object result=null;
StoreEntry item=null;
try{
item=(StoreEntry)index.get(key);
item=index.get(key);
}catch(IOException e){
log.error("Failed trying to get key: "+key,e);
throw new RuntimeException(e);
@ -288,11 +289,10 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
load();
try{
Object result=null;
IndexItem item=(IndexItem)index.get(key);
IndexItem item=(IndexItem)index.remove(key);
if(item!=null){
// refresh the index
item=(IndexItem)indexList.refreshEntry(item);
index.remove(key);
result=getValue(item);
IndexItem prev=indexList.getPrevEntry(item);
IndexItem next=indexList.getNextEntry(item);
@ -365,15 +365,13 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
public synchronized StoreEntry place(Object key,Object value){
load();
try{
if(index.containsKey(key)){
remove(key);
}
remove(key);
IndexItem item=write(key,value);
index.store(key,item);
indexList.add(item);
return item;
}catch(IOException e){
log.error("Failed trying to palce key: "+key,e);
log.error("Failed trying to place key: "+key,e);
throw new RuntimeException(e);
}
}
@ -402,6 +400,33 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
}
}
public synchronized StoreEntry getFirst(){
load();
return indexList.getFirst();
}
public synchronized StoreEntry getLast(){
load();
return indexList.getLast();
}
public synchronized StoreEntry getNext(StoreEntry entry){
load();
IndexItem item=(IndexItem)entry;
return indexList.getNextEntry(item);
}
public synchronized StoreEntry getPrevious(StoreEntry entry){
load();
IndexItem item=(IndexItem)entry;
return indexList.getPrevEntry(item);
}
public synchronized StoreEntry refresh(StoreEntry entry){
load();
return indexList.getEntry(entry);
}
/**
* Get the value from it's location
*

View File

@ -146,7 +146,8 @@ class HashBin{
}
}
void remove(HashEntry entry) throws IOException{
HashEntry remove(HashEntry entry) throws IOException{
HashEntry result = null;
try{
int low=0;
int high=size()-1;
@ -155,6 +156,7 @@ class HashBin{
HashEntry te=getHashEntry(mid);
int cmp=te.compareTo(entry);
if(cmp==0){
result =te;
removeHashEntry(mid);
size--;
break;
@ -167,6 +169,7 @@ class HashBin{
}finally{
end();
}
return result;
}
private void addHashEntry(int index,HashEntry entry) throws IOException{

View File

@ -24,6 +24,7 @@ import org.apache.activemq.kaha.impl.index.Index;
import org.apache.activemq.kaha.impl.index.IndexManager;
import org.apache.activemq.util.DataByteArrayInputStream;
import org.apache.activemq.util.DataByteArrayOutputStream;
import org.apache.activemq.util.LRUCache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -34,7 +35,7 @@ import org.apache.commons.logging.LogFactory;
*/
public class HashIndex implements Index{
private static final String NAME_PREFIX="tree-index-";
private static final String NAME_PREFIX="hash-index-";
private static final int DEFAULT_PAGE_SIZE;
private static final int DEFAULT_KEY_SIZE;
private static final Log log=LogFactory.getLog(HashIndex.class);
@ -55,6 +56,9 @@ public class HashIndex implements Index{
private HashPage firstFree;
private HashPage lastFree;
private AtomicBoolean loaded=new AtomicBoolean();
private LRUCache<Long,HashPage> pageCache;
private boolean enablePageCaching=false;
private int pageCacheSize=10;
/**
* Constructor
@ -86,6 +90,7 @@ public class HashIndex implements Index{
capacity<<=1;
this.bins=new HashBin[capacity];
openIndexFile();
pageCache=new LRUCache<Long,HashPage>(pageCacheSize,pageCacheSize,0.75f,true);
}
/**
@ -130,6 +135,37 @@ public class HashIndex implements Index{
}
this.pageSize=pageSize;
}
/**
* @return the enablePageCaching
*/
public boolean isEnablePageCaching(){
return this.enablePageCaching;
}
/**
* @param enablePageCaching the enablePageCaching to set
*/
public void setEnablePageCaching(boolean enablePageCaching){
this.enablePageCaching=enablePageCaching;
}
/**
* @return the pageCacheSize
*/
public int getPageCacheSize(){
return this.pageCacheSize;
}
/**
* @param pageCacheSize the pageCacheSize to set
*/
public void setPageCacheSize(int pageCacheSize){
this.pageCacheSize=pageCacheSize;
pageCache.setMaxCacheSize(pageCacheSize);
}
public boolean isTransient(){
return false;
@ -187,6 +223,7 @@ public class HashIndex implements Index{
}
public void store(Object key,StoreEntry value) throws IOException{
load();
HashEntry entry=new HashEntry();
entry.setKey((Comparable)key);
entry.setIndexOffset(value.getOffset());
@ -194,16 +231,19 @@ public class HashIndex implements Index{
}
public StoreEntry get(Object key) throws IOException{
load();
HashEntry entry=new HashEntry();
entry.setKey((Comparable)key);
HashEntry result=getBin(key).find(entry);
return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
}
public void remove(Object key) throws IOException{
public StoreEntry remove(Object key) throws IOException{
load();
HashEntry entry=new HashEntry();
entry.setKey((Comparable)key);
getBin(key).remove(entry);
HashEntry result = getBin(key).remove(entry);
return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
}
public boolean containsKey(Object key) throws IOException{
@ -228,11 +268,15 @@ public class HashIndex implements Index{
HashPage lookupPage(long pageId) throws IOException{
HashPage result=null;
if(pageId>=0){
result=getFullPage(pageId);
if(result!=null){
if(result.isActive()){
}else{
throw new IllegalStateException("Trying to access an inactive page: "+pageId);
result=getFromCache(pageId);
if(result==null){
result=getFullPage(pageId);
if(result!=null){
if(result.isActive()){
addToCache(result);
}else{
throw new IllegalStateException("Trying to access an inactive page: "+pageId);
}
}
}
}
@ -251,10 +295,12 @@ public class HashIndex implements Index{
indexFile.seek(length);
indexFile.write(HashEntry.NOT_SET);
}
addToCache(result);
return result;
}
void releasePage(HashPage page) throws IOException{
removeFromCache(page);
page.reset();
page.setActive(false);
if(lastFree==null){
@ -349,6 +395,26 @@ public class HashIndex implements Index{
int i=indexFor(hash,bins.length);
return getBin(i);
}
private HashPage getFromCache(long pageId){
HashPage result=null;
if(enablePageCaching){
result=pageCache.get(pageId);
}
return result;
}
private void addToCache(HashPage page){
if(enablePageCaching){
pageCache.put(page.getId(),page);
}
}
private void removeFromCache(HashPage page){
if(enablePageCaching){
pageCache.remove(page.getId());
}
}
static int hash(Object x){
int h=x.hashCode();

View File

@ -225,10 +225,11 @@ public class TreeIndex implements Index{
return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
}
public void remove(Object key) throws IOException{
public StoreEntry remove(Object key) throws IOException{
TreeEntry entry=new TreeEntry();
entry.setKey((Comparable)key);
root.remove(entry);
TreeEntry result = root.remove(entry);
return result!=null?indexManager.getIndex(result.getIndexOffset()):null;
}
public boolean containsKey(Object key) throws IOException{

View File

@ -268,14 +268,16 @@ class TreePage{
return result;
}
void remove(TreeEntry entry) throws IOException{
TreeEntry remove(TreeEntry entry) throws IOException{
TreeEntry result = null;
if(isRoot()){
if(!isEmpty()){
doRemove(entry);
result = doRemove(entry);
}
}else{
throw new IllegalStateException("remove() should not be called on non root page");
}
return result;
}
private TreeEntry doInsert(Flavour flavour,TreeEntry newEntry) throws IOException{
@ -369,7 +371,8 @@ class TreePage{
return result;
}
private void doRemove(TreeEntry entry) throws IOException{
private TreeEntry doRemove(TreeEntry entry) throws IOException{
TreeEntry result = null;
TreePageEntry closest=findClosestEntry(entry);
if(closest!=null){
TreeEntry closestEntry=closest.getTreeEntry();
@ -377,7 +380,7 @@ class TreePage{
TreePage closestPage=closest.getTreePage();
int cmp=closestEntry.compareTo(entry);
if(cmp==0){
TreeEntry result=closest.getTreeEntry();
result=closest.getTreeEntry();
int index=closest.getIndex();
removeTreeEntry(index);
save();
@ -388,6 +391,7 @@ class TreePage{
}
}
}
return result;
}
/**

View File

@ -20,7 +20,6 @@ package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;

View File

@ -1,151 +1,102 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageListener;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.util.LRUCache;
/**
* An implementation of {@link org.apache.activemq.store.MessageStore} which uses a JPS Container
*
* @version $Revision: 1.7 $
*/
public class KahaMessageStore implements MessageStore, UsageListener{
protected final ActiveMQDestination destination;
protected final ListContainer<Object> messageContainer;
protected StoreEntry batchEntry = null;
protected final LRUCache<MessageId, StoreEntry> cache;
protected UsageManager usageManager;
public class KahaMessageStore implements MessageStore{
public KahaMessageStore(ListContainer<Object> container,ActiveMQDestination destination, int maximumCacheSize) throws IOException{
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,Message> messageContainer;
protected StoreEntry batchEntry=null;
public KahaMessageStore(MapContainer<MessageId,Message> container,ActiveMQDestination destination) throws IOException{
this.messageContainer=container;
this.destination=destination;
this.cache=new LRUCache<MessageId, StoreEntry>(maximumCacheSize,maximumCacheSize,0.75f,false);
// populate the cache
StoreEntry entry=messageContainer.getFirst();
int count = 0;
if(entry!=null){
do{
MessageId id = getMessageId(messageContainer.get(entry));
cache.put(id,entry);
entry = messageContainer.getNext(entry);
count++;
}while(entry!=null && count < maximumCacheSize);
}
}
protected MessageId getMessageId(Object object) {
return ((Message)object).getMessageId();
}
public Object getId(){
protected MessageId getMessageId(Object object){
return ((Message)object).getMessageId();
}
public Object getId(){
return messageContainer.getId();
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
StoreEntry item=messageContainer.placeLast(message);
messageContainer.put(message.getMessageId(),message);
// TODO: we should do the following but it is not need if the message is being added within a persistence
// transaction
// but since I can't tell if one is running right now.. I'll leave this out for now.
// if( message.isResponseRequired() ) {
// messageContainer.force();
// }
cache.put(message.getMessageId(),item);
}
public synchronized Message getMessage(MessageId identity) throws IOException{
Message result=null;
StoreEntry entry=cache.get(identity);
if(entry!=null){
entry = messageContainer.refresh(entry);
result = (Message)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
Message msg=(Message)messageContainer.get(entry);
if(msg.getMessageId().equals(identity)){
result=msg;
cache.put(identity,entry);
break;
}
}
}
Message result=messageContainer.get(identity);
return result;
}
protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
listener.recoverMessage((Message)msg);
}
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
removeMessage(ack.getLastMessageId());
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
StoreEntry entry=cache.remove(msgId);
if(entry!=null){
entry = messageContainer.refresh(entry);
messageContainer.remove(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
MessageId id=getMessageId(messageContainer.get(entry));
if(id.equals(msgId)){
messageContainer.remove(entry);
break;
}
}
}
if (messageContainer.isEmpty()) {
messageContainer.remove(msgId);
if(messageContainer.isEmpty()){
resetBatching();
}
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(Iterator iter=messageContainer.iterator();iter.hasNext();){
recover(listener, iter.next());
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
Message msg=(Message)messageContainer.getValue(entry);
recover(listener,msg);
}
listener.finished();
}
public void start() {
if( this.usageManager != null )
this.usageManager.addUsageListener(this);
public void start(){
}
public void stop() {
if( this.usageManager != null )
this.usageManager.removeUsageListener(this);
public void stop(){
}
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
cache.clear();
}
public ActiveMQDestination getDestination(){
@ -154,14 +105,12 @@ public class KahaMessageStore implements MessageStore, UsageListener{
public synchronized void delete(){
messageContainer.clear();
cache.clear();
}
/**
* @param usageManager The UsageManager that is controlling the destination's memory usage.
*/
public void setUsageManager(UsageManager usageManager) {
this.usageManager = usageManager;
}
/**
@ -169,7 +118,7 @@ public class KahaMessageStore implements MessageStore, UsageListener{
* @see org.apache.activemq.store.MessageStore#getMessageCount()
*/
public int getMessageCount(){
return messageContainer.size();
return messageContainer.size();
}
/**
@ -187,30 +136,30 @@ public class KahaMessageStore implements MessageStore, UsageListener{
* @param maxReturned
* @param listener
* @throws Exception
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int, org.apache.activemq.store.MessageRecoveryListener)
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(org.apache.activemq.command.MessageId, int,
* org.apache.activemq.store.MessageRecoveryListener)
*/
public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry = batchEntry;
if (entry == null) {
entry= messageContainer.getFirst();
}else {
StoreEntry entry=batchEntry;
if(entry==null){
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry);
}
if(entry!=null){
int count = 0;
int count=0;
do{
Object msg=messageContainer.get(entry);
Object msg=messageContainer.getValue(entry);
if(msg!=null){
recover(listener, msg);
recover(listener,msg);
count++;
}
batchEntry = entry;
batchEntry=entry;
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
}
/**
@ -218,27 +167,15 @@ public class KahaMessageStore implements MessageStore, UsageListener{
* @see org.apache.activemq.store.MessageStore#resetBatching(org.apache.activemq.command.MessageId)
*/
public void resetBatching(){
batchEntry = null;
batchEntry=null;
}
/**
* @return true if the store supports cursors
*/
public boolean isSupportForCursors() {
public boolean isSupportForCursors(){
return true;
}
/**
* @param memoryManager
* @param oldPercentUsage
* @param newPercentUsage
* @see org.apache.activemq.memory.UsageListener#onMemoryUseChanged(org.apache.activemq.memory.UsageManager, int, int)
*/
public synchronized void onMemoryUseChanged(UsageManager memoryManager,int oldPercentUsage,int newPercentUsage){
if(newPercentUsage==100){
cache.clear();
}
}
}

View File

@ -25,9 +25,13 @@ import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreFactory;
import org.apache.activemq.kaha.StringMarshaller;
@ -56,7 +60,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
ConcurrentHashMap<ActiveMQDestination, MessageStore> messageStores=new ConcurrentHashMap<ActiveMQDestination, MessageStore>();
protected OpenWireFormat wireFormat=new OpenWireFormat();
private long maxDataFileLength=32*1024*1024;
protected int maximumDestinationCacheSize=10000;
private File dir;
private Store theStore;
@ -89,7 +93,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
MessageStore rc=queues.get(destination);
if(rc==null){
rc=new KahaMessageStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize);
rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
@ -103,11 +107,11 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
TopicMessageStore rc=topics.get(destination);
if(rc==null){
Store store=getStore();
ListContainer messageContainer=getListContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
MapContainer messageContainer=getMapContainer(destination,"topic-data");
MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer<TopicSubAck> ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize);
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination,rc);
if(transactionStore!=null){
rc=transactionStore.proxy(rc);
@ -171,11 +175,20 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
}
}
protected MapContainer<String, Object> getMapContainer(Object id,String containerName) throws IOException{
protected MapContainer<MessageId,Message> getMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<MessageId, Message> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new MessageIdMarshaller());
container.setValueMarshaller(new MessageMarshaller(wireFormat));
container.load();
return container;
}
protected MapContainer<String,Object> getSubsMapContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<String, Object> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new StringMarshaller());
container.setValueMarshaller(new CommandMarshaller(wireFormat));
container.setKeyMarshaller(Store.StringMarshaller);
container.setValueMarshaller(createMessageMarshaller());
container.load();
return container;
}
@ -214,21 +227,7 @@ public class KahaPersistenceAdapter implements PersistenceAdapter{
this.maxDataFileLength=maxDataFileLength;
}
/**
* @return the maximumDestinationCacheSize
*/
public int getMaximumDestinationCacheSize(){
return this.maximumDestinationCacheSize;
}
/**
* @param maximumDestinationCacheSize the maximumDestinationCacheSize to set
*/
public void setMaximumDestinationCacheSize(int maximumDestinationCacheSize){
this.maximumDestinationCacheSize=maximumDestinationCacheSize;
}
protected synchronized Store getStore() throws IOException{
if(theStore==null){

View File

@ -19,71 +19,96 @@ package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Set;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.memory.UsageManager;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
public class KahaReferenceStore extends KahaMessageStore implements ReferenceStore {
public class KahaReferenceStore implements ReferenceStore {
private final MapContainer<Integer, Integer> fileReferences;
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
protected StoreEntry batchEntry=null;
public KahaReferenceStore(ListContainer container, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
super(container, destination, maximumCacheSize);
this.fileReferences = fileReferences;
public KahaReferenceStore(MapContainer container, ActiveMQDestination destination) throws IOException {
this.messageContainer=container;
this.destination=destination;
}
public void start(){
}
@Override
public void stop(){
}
protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).messageId);
}
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
public synchronized Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
ReferenceRecord record = (ReferenceRecord) msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public synchronized void recover(MessageRecoveryListener listener) throws Exception{
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord record=messageContainer.getValue(entry);
recover(listener,new MessageId(record.messageId));
}
listener.finished();
}
public synchronized void recoverNextMessages(int maxReturned,MessageRecoveryListener listener) throws Exception{
StoreEntry entry=batchEntry;
if(entry==null){
entry=messageContainer.getFirst();
}else{
entry=messageContainer.refresh(entry);
entry=messageContainer.getNext(entry);
}
if(entry!=null){
int count=0;
do{
Object msg=messageContainer.getValue(entry);
if(msg!=null){
recover(listener,msg);
count++;
}
batchEntry=entry;
entry=messageContainer.getNext(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
listener.finished();
}
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
StoreEntry item=messageContainer.placeLast(record);
cache.put(messageId,item);
messageContainer.put(messageId,record);
}
public ReferenceData getMessageReference(MessageId identity) throws IOException {
ReferenceRecord result=null;
StoreEntry entry=cache.get(identity);
if(entry!=null){
entry = messageContainer.refresh(entry);
result = (ReferenceRecord)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
if(msg.messageId.equals(identity.toString())){
result=msg;
cache.put(identity,entry);
break;
}
}
}
ReferenceRecord result=messageContainer.get(identity);
if( result == null )
return null;
return result.data;
@ -95,6 +120,44 @@ public class KahaReferenceStore extends KahaMessageStore implements ReferenceSto
rc.add(msg.data.getFileId());
}
}
public void removeMessage(ConnectionContext context,MessageAck ack) throws IOException{
removeMessage(ack.getLastMessageId());
}
public synchronized void removeMessage(MessageId msgId) throws IOException{
messageContainer.remove(msgId);
if(messageContainer.isEmpty()){
resetBatching();
}
}
public synchronized void removeAllMessages(ConnectionContext context) throws IOException{
messageContainer.clear();
}
public ActiveMQDestination getDestination(){
return destination;
}
public synchronized void delete(){
messageContainer.clear();
}
public void resetBatching(){
batchEntry=null;
}
public int getMessageCount(){
return messageContainer.size();
}
public void setUsageManager(UsageManager usageManager) {
}
public boolean isSupportForCursors(){
return true;
}
}

View File

@ -27,9 +27,12 @@ import java.util.Set;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
@ -65,44 +68,12 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
fileReferences.load();
}
public static class ReferenceRecord {
public String messageId;
public ReferenceData data;
public ReferenceRecord() {
}
public ReferenceRecord(String messageId, ReferenceData data) {
this.messageId = messageId;
this.data = data;
}
}
protected Marshaller<Object> createMessageMarshaller() {
return new Marshaller<Object>() {
public void writePayload(Object object,DataOutput dataOut) throws IOException{
ReferenceRecord rr = (ReferenceRecord) object;
dataOut.writeUTF(rr.messageId);
dataOut.writeInt(rr.data.getFileId());
dataOut.writeInt(rr.data.getOffset());
dataOut.writeLong(rr.data.getExpiration());
}
public Object readPayload(DataInput dataIn) throws IOException{
ReferenceRecord rr = new ReferenceRecord();
rr.messageId = dataIn.readUTF();
rr.data = new ReferenceData();
rr.data.setFileId(dataIn.readInt());
rr.data.setOffset(dataIn.readInt());
rr.data.setExpiration(dataIn.readLong());
return rr;
}
};
}
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
ReferenceStore rc=(ReferenceStore)queues.get(destination);
if(rc==null){
rc=new KahaReferenceStore(getListContainer(destination,"queue-data"),destination,maximumDestinationCacheSize, fileReferences);
rc=new KahaReferenceStore(getMapReferenceContainer(destination,"queue-data"),destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
@ -116,11 +87,11 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
TopicReferenceStore rc=(TopicReferenceStore)topics.get(destination);
if(rc==null){
Store store=getStore();
ListContainer messageContainer=getListContainer(destination,"topic-data");
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination,maximumDestinationCacheSize, fileReferences);
rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
@ -148,6 +119,16 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
return rc;
}
protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
Store store=getStore();
MapContainer<MessageId, ReferenceRecord> container=store.getMapContainer(id,containerName);
container.setKeyMarshaller(new MessageIdMarshaller());
container.setValueMarshaller(new ReferenceRecordMarshaller());
container.load();
return container;
}
}

View File

@ -30,21 +30,20 @@ import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
/**
* @version $Revision: 1.5 $
*/
public class KahaTopicMessageStore extends KahaMessageStore implements TopicMessageStore{
protected ListContainer ackContainer;
protected ListContainer<TopicSubAck> ackContainer;
private Map subscriberContainer;
private Store store;
protected Map subscriberMessages=new ConcurrentHashMap();
public KahaTopicMessageStore(Store store,ListContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination,int maximumCacheSize) throws IOException{
super(messageContainer,destination,maximumCacheSize);
public KahaTopicMessageStore(Store store,MapContainer messageContainer,ListContainer<TopicSubAck> ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
super(messageContainer,destination);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
@ -59,7 +58,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry messageEntry=messageContainer.placeLast(message);
StoreEntry messageEntry=messageContainer.place(message.getMessageId(),message);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
@ -166,7 +165,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
if(entry!=null){
do{
ConsumerMessageRef consumerRef=container.get(entry);
Object msg=messageContainer.get(consumerRef.getMessageEntry());
Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
if(msg!=null){
recover(listener, msg);
count++;
@ -227,7 +226,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.size();
return container != null ? container.size() : 0;
}
/**

View File

@ -1,75 +1,80 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.SubscriptionInfo;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.kahadaptor.KahaReferenceStoreAdapter.ReferenceRecord;
public class KahaTopicReferenceStore extends KahaTopicMessageStore implements TopicReferenceStore {
public class KahaTopicReferenceStore extends KahaReferenceStore implements TopicReferenceStore{
private final MapContainer<Integer, Integer> fileReferences;
protected ListContainer<TopicSubAck> ackContainer;
private Map subscriberContainer;
private Store store;
protected Map subscriberMessages=new ConcurrentHashMap();
public KahaTopicReferenceStore(Store store, ListContainer messageContainer, ListContainer ackContainer, MapContainer subsContainer, ActiveMQDestination destination, int maximumCacheSize, MapContainer<Integer, Integer> fileReferences) throws IOException {
super(store, messageContainer, ackContainer, subsContainer, destination, maximumCacheSize);
this.fileReferences = fileReferences;
}
public KahaTopicReferenceStore(Store store,MapContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
super(messageContainer,destination);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
// load all the Ack containers
for(Iterator i=subscriberContainer.keySet().iterator();i.hasNext();){
Object key=i.next();
addSubscriberMessageContainer(key);
}
}
@Override
protected MessageId getMessageId(Object object) {
return new MessageId(((ReferenceRecord)object).messageId);
}
protected MessageId getMessageId(Object object){
return new MessageId(((ReferenceRecord)object).messageId);
}
@Override
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
public synchronized Message getMessage(MessageId identity) throws IOException {
throw new RuntimeException("Use addMessageReference instead");
}
@Override
protected void recover(MessageRecoveryListener listener, Object msg) throws Exception {
ReferenceRecord record = (ReferenceRecord) msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public synchronized void addMessage(ConnectionContext context,Message message) throws IOException{
throw new RuntimeException("Use addMessageReference instead");
}
public void addMessageReference(ConnectionContext context, MessageId messageId, ReferenceData data) throws IOException {
ReferenceRecord record = new ReferenceRecord(messageId.toString(), data);
public synchronized Message getMessage(MessageId identity) throws IOException{
throw new RuntimeException("Use addMessageReference instead");
}
protected void recover(MessageRecoveryListener listener,Object msg) throws Exception{
ReferenceRecord record=(ReferenceRecord)msg;
listener.recoverMessageReference(new MessageId(record.messageId));
}
public void addMessageReference(ConnectionContext context,MessageId messageId,ReferenceData data)
throws IOException{
ReferenceRecord record=new ReferenceRecord(messageId.toString(),data);
int subscriberCount=subscriberMessages.size();
if(subscriberCount>0){
StoreEntry messageEntry=messageContainer.placeLast(record);
StoreEntry messageEntry=messageContainer.place(messageId,record);
TopicSubAck tsa=new TopicSubAck();
tsa.setCount(subscriberCount);
tsa.setMessageEntry(messageEntry);
@ -82,40 +87,181 @@ public class KahaTopicReferenceStore extends KahaTopicMessageStore implements To
container.add(ref);
}
}
}
}
public ReferenceData getMessageReference(MessageId identity) throws IOException {
ReferenceRecord result=null;
StoreEntry entry=(StoreEntry)cache.get(identity);
if(entry!=null){
entry = messageContainer.refresh(entry);
result = (ReferenceRecord)messageContainer.get(entry);
}else{
for (entry = messageContainer.getFirst();entry != null; entry = messageContainer.getNext(entry)) {
ReferenceRecord msg=(ReferenceRecord)messageContainer.get(entry);
if(msg.messageId.equals(identity.toString())){
result=msg;
cache.put(identity,entry);
break;
public ReferenceData getMessageReference(MessageId identity) throws IOException{
ReferenceRecord result=messageContainer.get(identity);
if(result==null)
return null;
return result.data;
}
public void addReferenceFileIdsInUse(Set<Integer> rc){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
if(subAck.getCount()>0){
ReferenceRecord rr=(ReferenceRecord)messageContainer.get(subAck.getMessageEntry());
rc.add(rr.data.getFileId());
}
}
}
protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs");
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
TopicSubContainer tsc=new TopicSubContainer(container);
subscriberMessages.put(key,tsc);
return container;
}
public synchronized void acknowledge(ConnectionContext context,String clientId,String subscriptionName,
MessageId messageId) throws IOException{
String subcriberId=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(subcriberId);
if(container!=null){
ConsumerMessageRef ref=container.remove();
if(container.isEmpty()){
container.reset();
}
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
if( result == null )
return null;
return result.data;
}
}
public void addReferenceFileIdsInUse(Set<Integer> rc) {
for (StoreEntry entry = ackContainer.getFirst();entry != null; entry = ackContainer.getNext(entry)) {
TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
if( subAck.getCount() > 0 ) {
ReferenceRecord rr = (ReferenceRecord)messageContainer.get(subAck.getMessageEntry());
rc.add(rr.data.getFileId());
}
public synchronized void addSubsciption(String clientId,String subscriptionName,String selector,boolean retroactive)
throws IOException{
SubscriptionInfo info=new SubscriptionInfo();
info.setDestination(destination);
info.setClientId(clientId);
info.setSelector(selector);
info.setSubcriptionName(subscriptionName);
String key=getSubscriptionKey(clientId,subscriptionName);
// if already exists - won't add it again as it causes data files
// to hang around
if(!subscriberContainer.containsKey(key)){
subscriberContainer.put(key,info);
}
}
// add the subscriber
ListContainer container=addSubscriberMessageContainer(key);
if(retroactive){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(entry);
ConsumerMessageRef ref=new ConsumerMessageRef();
ref.setAckEntry(entry);
ref.setMessageEntry(tsa.getMessageEntry());
container.add(ref);
}
}
}
public synchronized void deleteSubscription(String clientId,String subscriptionName) throws IOException{
String key=getSubscriptionKey(clientId,subscriptionName);
removeSubscriberMessageContainer(key);
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException{
return (SubscriptionInfo[])subscriberContainer.values().toArray(
new SubscriptionInfo[subscriberContainer.size()]);
}
public int getMessageCount(String clientId,String subscriberName) throws IOException{
String key=getSubscriptionKey(clientId,subscriberName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
return container.size();
}
public SubscriptionInfo lookupSubscription(String clientId,String subscriptionName) throws IOException{
return (SubscriptionInfo)subscriberContainer.get(getSubscriptionKey(clientId,subscriptionName));
}
public void recoverNextMessages(String clientId,String subscriptionName,int maxReturned,
MessageRecoveryListener listener) throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
int count=0;
StoreEntry entry=container.getBatchEntry();
if(entry==null){
entry=container.getEntry();
}else{
entry=container.refreshEntry(entry);
if(entry!=null){
entry=container.getNextEntry(entry);
}
}
if(entry!=null){
do{
ConsumerMessageRef consumerRef=container.get(entry);
Object msg=messageContainer.getValue(consumerRef.getMessageEntry());
if(msg!=null){
recover(listener,msg);
count++;
}
container.setBatchEntry(entry);
entry=container.getNextEntry(entry);
}while(entry!=null&&count<maxReturned&&listener.hasSpace());
}
}
listener.finished();
}
public void recoverSubscription(String clientId,String subscriptionName,MessageRecoveryListener listener)
throws Exception{
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.get(key);
if(container!=null){
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
Object msg=messageContainer.get(ref.getMessageEntry());
if(msg!=null){
recover(listener,msg);
}
}
}
listener.finished();
}
public synchronized void resetBatching(String clientId,String subscriptionName){
String key=getSubscriptionKey(clientId,subscriptionName);
TopicSubContainer topicSubContainer=(TopicSubContainer)subscriberMessages.get(key);
if(topicSubContainer!=null){
topicSubContainer.reset();
}
}
protected void removeSubscriberMessageContainer(Object key) throws IOException{
subscriberContainer.remove(key);
TopicSubContainer container=(TopicSubContainer)subscriberMessages.remove(key);
for(Iterator i=container.iterator();i.hasNext();){
ConsumerMessageRef ref=(ConsumerMessageRef)i.next();
if(ref!=null){
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}
}
}
}
store.deleteListContainer(key,"topic-subs");
}
protected String getSubscriptionKey(String clientId,String subscriberName){
String result=clientId+":";
result+=subscriberName!=null?subscriberName:"NOT_SET";
return result;
}
}

View File

@ -0,0 +1,31 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
public class ReferenceRecord{
public String messageId;
public ReferenceData data;
public ReferenceRecord(){
}
public ReferenceRecord(String messageId,ReferenceData data){
this.messageId=messageId;
this.data=data;
}
}

View File

@ -0,0 +1,47 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
* file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
* to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
* License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
public class ReferenceRecordMarshaller implements Marshaller<ReferenceRecord>{
public ReferenceRecord readPayload(DataInput dataIn) throws IOException{
ReferenceRecord rr=new ReferenceRecord();
rr.messageId=dataIn.readUTF();
rr.data=new ReferenceData();
rr.data.setFileId(dataIn.readInt());
rr.data.setOffset(dataIn.readInt());
rr.data.setExpiration(dataIn.readLong());
return rr;
}
/**
* @param object
* @param dataOut
* @throws IOException
* @see org.apache.activemq.kaha.Marshaller#writePayload(java.lang.Object, java.io.DataOutput)
*/
public void writePayload(ReferenceRecord rr,DataOutput dataOut) throws IOException{
dataOut.writeUTF(rr.messageId);
dataOut.writeInt(rr.data.getFileId());
dataOut.writeInt(rr.data.getOffset());
dataOut.writeLong(rr.data.getExpiration());
}
}