git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@513455 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-03-01 19:24:17 +00:00
parent 1ea6d27053
commit 167f99a3a1
20 changed files with 276 additions and 119 deletions

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.kaha.impl.container;
package org.apache.activemq.kaha;
import java.io.Externalizable;
import java.io.IOException;
@ -31,6 +31,15 @@ public class ContainerId implements Externalizable{
private Object key;
private String dataContainerName;
public ContainerId() {
}
public ContainerId(Object key,String dataContainerName) {
this.key=key;
this.dataContainerName=dataContainerName;
}
/**
* @return Returns the dataContainerPrefix.
*/
@ -39,10 +48,10 @@ public class ContainerId implements Externalizable{
}
/**
* @param dataContainerPrefix The dataContainerPrefix to set.
* @param dataContainerName The dataContainerPrefix to set.
*/
public void setDataContainerName(String dataContainerPrefix){
this.dataContainerName=dataContainerPrefix;
public void setDataContainerName(String dataContainerName){
this.dataContainerName=dataContainerName;
}
/**

View File

@ -139,13 +139,20 @@ public interface Store{
*/
public void deleteMapContainer(Object id,String containerName) throws IOException;
/**
* Delete Map container
* @param id
* @throws IOException
*/
public void deleteMapContainer(ContainerId id) throws IOException;
/**
* Get a Set of call MapContainer Ids
*
* @return the set of ids
* @throws IOException
*/
public Set getMapContainerIds() throws IOException;
public Set<ContainerId> getMapContainerIds() throws IOException;
/**
* Checks if a ListContainer exists in the default container
@ -213,6 +220,12 @@ public interface Store{
*/
public void deleteListContainer(Object id,String containerName) throws IOException;
/**
* delete a list container
* @param id
* @throws IOException
*/
public void deleteListContainer(ContainerId id) throws IOException;
/**
* Get a Set of call ListContainer Ids
@ -220,7 +233,7 @@ public interface Store{
* @return the set of ids
* @throws IOException
*/
public Set getListContainerIds() throws IOException;
public Set<ContainerId> getListContainerIds() throws IOException;
/**
* @return the maxDataFileLength

View File

@ -22,11 +22,11 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.data.Item;
import org.apache.activemq.kaha.impl.index.IndexItem;
import org.apache.activemq.kaha.impl.index.IndexManager;

View File

@ -27,6 +27,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.RuntimeStoreException;
@ -34,7 +35,6 @@ import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreLocation;
import org.apache.activemq.kaha.impl.async.AsyncDataManager;
import org.apache.activemq.kaha.impl.async.DataManagerFacade;
import org.apache.activemq.kaha.impl.container.ContainerId;
import org.apache.activemq.kaha.impl.container.ListContainerImpl;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.activemq.kaha.impl.data.DataManagerImpl;
@ -219,11 +219,13 @@ public class KahaStore implements Store{
deleteMapContainer(id,DEFAULT_CONTAINER_NAME);
}
public synchronized void deleteMapContainer(Object id,String containerName) throws IOException{
public void deleteMapContainer(Object id,String containerName) throws IOException{
ContainerId containerId = new ContainerId(id,containerName);
deleteMapContainer(containerId);
}
public synchronized void deleteMapContainer(ContainerId containerId) throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
MapContainerImpl container=maps.remove(containerId);
if(container!=null){
container.clear();
@ -232,12 +234,12 @@ public class KahaStore implements Store{
}
}
public synchronized Set<Object> getMapContainerIds() throws IOException{
public synchronized Set<ContainerId> getMapContainerIds() throws IOException{
initialize();
Set<Object> set = new HashSet<Object>();
Set<ContainerId> set = new HashSet<ContainerId>();
for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
set.add(id.getKey());
set.add(id);
}
return set;
}
@ -288,10 +290,12 @@ public class KahaStore implements Store{
}
public synchronized void deleteListContainer(Object id,String containerName) throws IOException{
ContainerId containerId=new ContainerId(id,containerName);
deleteListContainer(containerId);
}
public synchronized void deleteListContainer(ContainerId containerId) throws IOException{
initialize();
ContainerId containerId=new ContainerId();
containerId.setKey(id);
containerId.setDataContainerName(containerName);
ListContainerImpl container=lists.remove(containerId);
if(container!=null){
listsContainer.removeRoot(container.getIndexManager(),containerId);
@ -300,12 +304,12 @@ public class KahaStore implements Store{
}
}
public synchronized Set<Object> getListContainerIds() throws IOException{
public synchronized Set<ContainerId> getListContainerIds() throws IOException{
initialize();
Set<Object> set = new HashSet<Object>();
Set<ContainerId> set = new HashSet<ContainerId>();
for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) {
ContainerId id = (ContainerId)i.next();
set.add(id.getKey());
set.add(id);
}
return set;
}
@ -333,7 +337,7 @@ public class KahaStore implements Store{
if( isUseAsyncDataManager() ) {
AsyncDataManager t=new AsyncDataManager();
t.setDirectory(directory);
t.setFilePrefix("data-"+name+"-");
t.setFilePrefix("async-data-"+name+"-");
t.setMaxFileLength((int) maxDataFileLength);
t.start();
dm=new DataManagerFacade(t, name);

View File

@ -21,6 +21,7 @@ package org.apache.activemq.kaha.impl.container;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.RuntimeStoreException;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.kaha.StoreEntry;

View File

@ -21,6 +21,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;

View File

@ -20,6 +20,7 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.RuntimeStoreException;
@ -69,7 +70,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont
throw new RuntimeException(e);
}
}else{
this.index=new VMIndex();
this.index=new VMIndex(indexManager);
}
}
index.setKeyMarshaller(keyMarshaller);

View File

@ -34,6 +34,7 @@ class DataFile{
private RandomAccessFile randomAcessFile;
private Object writerData;
long length=0;
private boolean dirty;
DataFile(File file,int number){
this.file=file;
@ -107,6 +108,15 @@ class DataFile{
*/
public synchronized void setWriterData(Object writerData) {
this.writerData = writerData;
dirty=true;
}
public synchronized boolean isDirty() {
return dirty;
}
public synchronized void setDirty(boolean value) {
this.dirty = value;
}
}

View File

@ -96,9 +96,10 @@ final public class SyncDataFileWriter {
public synchronized void force(DataFile dataFile) throws IOException {
// If our dirty marker was set.. then we need to sync
if( dataFile.getWriterData()!=null ) {
if( dataFile.getWriterData()!=null && dataFile.isDirty()) {
dataFile.getRandomAccessFile().getFD().sync();
dataFile.setWriterData(null);
dataFile.setDirty(false);
}
}

View File

@ -65,6 +65,10 @@ public class DiskIndexLinkedList implements IndexLinkedList{
public synchronized IndexItem getLast(){
if(size==0)
return null;
if(last!=null){
last.next=null;
last.setNextItem(IndexItem.POSITION_NOT_SET);
}
return last;
}
@ -323,6 +327,7 @@ public class DiskIndexLinkedList implements IndexLinkedList{
return;
if(e==last||e.equals(last)){
if(size>1){
last = (IndexItem)refreshEntry(last);
last=getPrevEntry(last);
}else{
last=null;

View File

@ -45,6 +45,7 @@ public final class IndexManager{
private long length=0;
private IndexItem firstFree;
private IndexItem lastFree;
private boolean dirty;
public IndexManager(File directory,String name,String mode,DataManager redoLog) throws IOException{
this.directory=directory;
@ -76,10 +77,12 @@ public final class IndexManager{
lastFree.setNextItem(item.getOffset());
}
writer.updateIndexes(item);
dirty=true;
}
public synchronized void storeIndex(IndexItem index) throws IOException{
writer.storeItem(index);
dirty=true;
}
public synchronized void updateIndexes(IndexItem index) throws IOException{
@ -88,10 +91,12 @@ public final class IndexManager{
}catch(Throwable e){
log.error(name+" error updating indexes ",e);
}
dirty=true;
}
public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{
writer.redoStoreItem(redo);
dirty=true;
}
public synchronized IndexItem createNewIndex() throws IOException{
@ -113,8 +118,9 @@ public final class IndexManager{
}
public synchronized void force() throws IOException{
if(indexFile!=null){
if(indexFile!=null && dirty){
indexFile.getFD().sync();
dirty=false;
}
}

View File

@ -14,10 +14,14 @@
package org.apache.activemq.kaha.impl.index;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.StoreEntry;
import org.apache.activemq.kaha.impl.container.MapContainerImpl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Index implementation using a HashMap
@ -25,9 +29,13 @@ import org.apache.activemq.kaha.StoreEntry;
* @version $Revision: 1.2 $
*/
public class VMIndex implements Index{
private static final Log log=LogFactory.getLog(VMIndex.class);
private IndexManager indexManager;
private Map<Object,StoreEntry> map=new HashMap<Object,StoreEntry>();
public VMIndex(IndexManager manager) {
this.indexManager= manager;
}
/**
*
* @see org.apache.activemq.kaha.impl.index.Index#clear()
@ -47,10 +55,20 @@ public class VMIndex implements Index{
/**
* @param key
* @return store entry
* @see org.apache.activemq.kaha.impl.index.Index#removeKey(java.lang.Object)
*/
public StoreEntry remove(Object key){
return map.remove(key);
StoreEntry result = map.remove(key);
if (result != null) {
try{
result=indexManager.refreshIndex((IndexItem)result);
}catch(IOException e){
log.error("Failed to refresh entry",e);
throw new RuntimeException("Failed to refresh entry");
}
}
return result;
}
/**
@ -68,7 +86,16 @@ public class VMIndex implements Index{
* @return the entry
*/
public StoreEntry get(Object key){
return map.get(key);
StoreEntry result = map.get(key);
if (result != null) {
try{
result=indexManager.refreshIndex((IndexItem)result);
}catch(IOException e){
log.error("Failed to refresh entry",e);
throw new RuntimeException("Failed to refresh entry");
}
}
return result;
}
/**

View File

@ -84,7 +84,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
private AsyncDataManager asyncDataManager;
private ReferenceStoreAdapter referenceStoreAdapter;
private KahaReferenceStoreAdapter referenceStoreAdapter;
private TaskRunnerFactory taskRunnerFactory;
private WireFormat wireFormat = new OpenWireFormat();
@ -106,7 +106,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
private Runnable periodicCleanupTask;
private boolean deleteAllMessages;
private File directory = new File(IOHelper.getDefaultDataDirectory() + "/quick");
private File directory = new File(IOHelper.getDefaultDataDirectory() + "/amq");
@ -242,7 +242,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
checkpointTask.wakeup();
if (sync) {
if(log.isDebugEnabled()){
log.debug("Waitng for checkpoint to complete.");
}
latch.await();
}
}
@ -264,7 +266,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
}
try {
if(log.isDebugEnabled()){
log.debug("Checkpoint started.");
}
referenceStoreAdapter.sync();
Location newMark = null;
Iterator<AMQMessageStore> iterator = queues.values().iterator();
@ -287,7 +292,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
try {
if (newMark != null) {
if(log.isDebugEnabled()){
log.debug("Marking journal at: " + newMark);
}
asyncDataManager.setMark(newMark, false);
writeTraceMessage("CHECKPOINT "+new Date(), true);
}
@ -296,18 +303,13 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
log.error("Failed to mark the Journal: " + e, e);
}
// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) {
// // We may be check pointing more often than the checkpointInterval if under high use
// // But we don't want to clean up the db that often.
// long now = System.currentTimeMillis();
// if( now > lastCleanup+checkpointInterval ) {
// lastCleanup = now;
// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup();
// }
// }
if(log.isDebugEnabled()){
log.debug("Checkpoint done.");
}
}
catch(IOException e) {
log.error("Failed to sync reference store",e);
}
finally {
latch.countDown();
}
@ -603,7 +605,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
return manager;
}
protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException {
KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory);
return adaptor;
}
@ -627,9 +629,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener
public ReferenceStoreAdapter getReferenceStoreAdapter() {
return referenceStoreAdapter;
}
public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) {
this.referenceStoreAdapter = referenceStoreAdapter;
}
public TaskRunnerFactory getTaskRunnerFactory() {
return taskRunnerFactory;

View File

@ -31,9 +31,11 @@ public class KahaReferenceStore implements ReferenceStore{
protected final ActiveMQDestination destination;
protected final MapContainer<MessageId,ReferenceRecord> messageContainer;
protected KahaReferenceStoreAdapter adapter;
protected StoreEntry batchEntry=null;
public KahaReferenceStore(MapContainer container,ActiveMQDestination destination) throws IOException{
public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{
this.adapter = adapter;
this.messageContainer=container;
this.destination=destination;
}
@ -109,10 +111,10 @@ public class KahaReferenceStore implements ReferenceStore{
return result.data;
}
public void addReferenceFileIdsInUse(Set<Integer> rc){
public void addReferenceFileIdsInUse(){
for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){
ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry);
rc.add(msg.data.getFileId());
addInterest(msg);
}
}
@ -172,10 +174,10 @@ public class KahaReferenceStore implements ReferenceStore{
}
void removeInterest(ReferenceRecord rr) {
adapter.removeInterestInRecordFile(rr.data.getFileId());
}
void addInterest(ReferenceRecord rr) {
adapter.addInterestInRecordFile(rr.data.getFileId());
}
}

View File

@ -17,34 +17,39 @@
*/
package org.apache.activemq.store.kahadaptor;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.kaha.ContainerId;
import org.apache.activemq.kaha.ListContainer;
import org.apache.activemq.kaha.MapContainer;
import org.apache.activemq.kaha.Marshaller;
import org.apache.activemq.kaha.MessageIdMarshaller;
import org.apache.activemq.kaha.MessageMarshaller;
import org.apache.activemq.kaha.Store;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.store.ReferenceStore;
import org.apache.activemq.store.ReferenceStoreAdapter;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.store.TopicReferenceStore;
import org.apache.activemq.store.ReferenceStore.ReferenceData;
import org.apache.activemq.store.amq.AMQPersistenceAdapter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter {
private MapContainer<Integer, Integer> fileReferences;
private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class);
private static final String STORE_STATE = "store-state";
private static final String RECORD_REFERENCES = "record-references";
private MapContainer stateMap;
private Map<Integer,AtomicInteger>recordReferences = new HashMap<Integer,AtomicInteger>();
private boolean storeValid;
public KahaReferenceStoreAdapter(File dir) throws IOException {
super(dir);
@ -61,20 +66,61 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
@Override
public void start() throws Exception{
super.start();
Store store=getStore();
fileReferences=store.getMapContainer("file-references");
fileReferences.setKeyMarshaller(new IntegerMarshaller());
fileReferences.setValueMarshaller(new IntegerMarshaller());
fileReferences.load();
boolean empty=store.getMapContainerIds().isEmpty();
stateMap=store.getMapContainer("state",STORE_STATE);
stateMap.load();
if(!empty){
AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE);
if(status!=null){
storeValid=status.get();
}
if(storeValid){
if(stateMap.containsKey(RECORD_REFERENCES)){
recordReferences=(Map<Integer,AtomicInteger>)stateMap.get(RECORD_REFERENCES);
}
}else {
/*
log.warn("Store Not shutdown cleanly - clearing out unsafe records ...");
Set<ContainerId> set = store.getListContainerIds();
for (ContainerId cid:set) {
if (!cid.getDataContainerName().equals(STORE_STATE)) {
store.deleteListContainer(cid);
}
}
set = store.getMapContainerIds();
for (ContainerId cid:set) {
if (!cid.getDataContainerName().equals(STORE_STATE)) {
store.deleteMapContainer(cid);
}
}
*/
buildReferenceFileIdsInUse();
}
}
stateMap.put(STORE_STATE,new AtomicBoolean());
}
@Override
public void stop() throws Exception {
stateMap.put(RECORD_REFERENCES,recordReferences);
stateMap.put(STORE_STATE,new AtomicBoolean(true));
super.stop();
}
public boolean isStoreValid() {
return storeValid;
}
public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException {
ReferenceStore rc=(ReferenceStore)queues.get(destination);
if(rc==null){
rc=new KahaReferenceStore(getMapReferenceContainer(destination,"queue-data"),destination);
rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
@ -89,10 +135,10 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
if(rc==null){
Store store=getStore();
MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data");
MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs");
MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob");
ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks");
ackContainer.setMarshaller(new TopicSubAckMarshaller());
rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination);
rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination);
messageStores.put(destination,rc);
// if(transactionStore!=null){
// rc=transactionStore.proxy(rc);
@ -102,23 +148,24 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
return rc;
}
public Set<Integer> getReferenceFileIdsInUse() throws IOException {
public void buildReferenceFileIdsInUse() throws IOException {
Set<Integer> rc = new HashSet<Integer>();
recordReferences = new HashMap<Integer,AtomicInteger>();
Set<ActiveMQDestination> destinations = getDestinations();
for (ActiveMQDestination destination : destinations) {
if( destination.isQueue() ) {
KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination);
store.addReferenceFileIdsInUse(rc);
store.addReferenceFileIdsInUse();
} else {
KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination);
store.addReferenceFileIdsInUse(rc);
store.addReferenceFileIdsInUse();
}
}
}
return rc;
public void sync() throws IOException {
getStore().force();
}
protected MapContainer<MessageId,ReferenceRecord> getMapReferenceContainer(Object id,String containerName) throws IOException{
@ -130,6 +177,33 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
return container;
}
synchronized void addInterestInRecordFile(int recordNumber) {
Integer key = new Integer(recordNumber);
AtomicInteger rr = recordReferences.get(key);
if (rr == null) {
rr = new AtomicInteger();
recordReferences.put(key,rr);
}
rr.incrementAndGet();
}
synchronized void removeInterestInRecordFile(int recordNumber) {
Integer key = new Integer(recordNumber);
AtomicInteger rr = recordReferences.get(key);
if (rr != null && rr.decrementAndGet() <= 0) {
recordReferences.remove(key);
}
}
/**
* @return
* @throws IOException
* @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse()
*/
public Set<Integer> getReferenceFileIdsInUse() throws IOException{
return recordReferences.keySet();
}
}

View File

@ -86,8 +86,12 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
messageContainer.remove(tsa.getMessageEntry());
StoreEntry entry = ref.getAckEntry();
entry = ackContainer.refresh(entry);
ackContainer.remove(entry);
entry = tsa.getMessageEntry();
entry =messageContainer.refresh(entry);
messageContainer.remove(entry);
}else{
ackContainer.update(ref.getAckEntry(),tsa);
}

View File

@ -39,9 +39,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
private Store store;
protected Map subscriberMessages=new ConcurrentHashMap();
public KahaTopicReferenceStore(Store store,MapContainer messageContainer,ListContainer ackContainer,
public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer,
MapContainer subsContainer,ActiveMQDestination destination) throws IOException{
super(messageContainer,destination);
super(adapter,messageContainer,destination);
this.store=store;
this.ackContainer=ackContainer;
subscriberContainer=subsContainer;
@ -97,18 +97,18 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
return result.data;
}
public void addReferenceFileIdsInUse(Set<Integer> rc){
public void addReferenceFileIdsInUse(){
for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){
TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry);
if(subAck.getCount()>0){
ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry());
rc.add(rr.data.getFileId());
addInterest(rr);
}
}
}
protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{
ListContainer container=store.getListContainer(key,"topic-subs");
ListContainer container=store.getListContainer(key,"topic-subs-references");
Marshaller marshaller=new ConsumerMessageRefMarshaller();
container.setMarshaller(marshaller);
TopicSubContainer tsc=new TopicSubContainer(container);
@ -129,10 +129,14 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry());
if(tsa!=null){
if(tsa.decrementCount()<=0){
ackContainer.remove(ref.getAckEntry());
StoreEntry entry=ref.getAckEntry();
entry=ackContainer.refresh(entry);
ackContainer.remove(entry);
ReferenceRecord rr=messageContainer.get(messageId);
if(rr!=null){
messageContainer.remove(tsa.getMessageEntry());
entry=tsa.getMessageEntry();
entry=messageContainer.refresh(entry);
messageContainer.remove(entry);
removeInterest(rr);
}
}else{
@ -261,7 +265,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
}
}
store.deleteListContainer(key,"topic-subs");
store.deleteListContainer(key,"topic-subs-references");
}
protected String getSubscriptionKey(String clientId,String subscriberName){

View File

@ -37,10 +37,23 @@ public class MapContainerTest extends TestCase{
protected MapContainer container;
protected Map testMap;
protected static final int COUNT = 10;
public void testBasicAllocations() throws Exception{
String key = "key";
Object value = testMap;
MapContainer test = store.getMapContainer("test","test");
test.put(key,value);
store.close();
store = getStore();
assertTrue(store.getMapContainerIds().isEmpty()==false);
test = store.getMapContainer("test","test");
assertEquals(value,test.get(key));
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.size()'
*/
public void XtestSize() throws Exception {
public void testSize() throws Exception {
container.putAll(testMap);
assertTrue(container.size()==testMap.size());
}
@ -48,14 +61,14 @@ public class MapContainerTest extends TestCase{
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.isEmpty()'
*/
public void XtestIsEmpty() throws Exception {
public void testIsEmpty() throws Exception {
assertTrue(container.isEmpty());
}
/*
* Test method for 'org.apache.activemq.kaha.MapContainer.clear()'
*/
public void XtestClear() throws Exception {
public void testClear() throws Exception {
container.putAll(testMap);
assertTrue(container.size()==testMap.size());
container.clear();

View File

@ -100,15 +100,6 @@ public class StoreTest extends TestCase{
assertFalse(store.doesMapContainerExist(containerId));
}
/*
* Test method for 'org.apache.activemq.kaha.Store.getMapContainerIds()'
*/
public void testGetMapContainerIds()throws Exception {
String containerId = "test";
MapContainer container = store.getMapContainer(containerId);
Set set = store.getMapContainerIds();
assertTrue(set.contains(containerId));
}
@ -139,15 +130,6 @@ public class StoreTest extends TestCase{
assertFalse(store.doesListContainerExist(containerId));
}
/*
* Test method for 'org.apache.activemq.kaha.Store.getListContainerIds()'
*/
public void testGetListContainerIds()throws Exception {
String containerId = "test";
ListContainer container = store.getListContainer(containerId);
Set set = store.getListContainerIds();
assertTrue(set.contains(containerId));
}
public void testBasicAllocations() throws Exception{
Map testMap = new HashMap();