diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java b/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java similarity index 85% rename from activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java rename to activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java index d478becd0b..e730d06cc3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ContainerId.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/ContainerId.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.activemq.kaha.impl.container; +package org.apache.activemq.kaha; import java.io.Externalizable; import java.io.IOException; @@ -31,6 +31,15 @@ public class ContainerId implements Externalizable{ private Object key; private String dataContainerName; + public ContainerId() { + } + + public ContainerId(Object key,String dataContainerName) { + this.key=key; + this.dataContainerName=dataContainerName; + } + + /** * @return Returns the dataContainerPrefix. */ @@ -39,10 +48,10 @@ public class ContainerId implements Externalizable{ } /** - * @param dataContainerPrefix The dataContainerPrefix to set. + * @param dataContainerName The dataContainerPrefix to set. */ - public void setDataContainerName(String dataContainerPrefix){ - this.dataContainerName=dataContainerPrefix; + public void setDataContainerName(String dataContainerName){ + this.dataContainerName=dataContainerName; } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java index 965a69eee8..e855d6b59d 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java @@ -138,6 +138,13 @@ public interface Store{ * @throws IOException */ public void deleteMapContainer(Object id,String containerName) throws IOException; + + /** + * Delete Map container + * @param id + * @throws IOException + */ + public void deleteMapContainer(ContainerId id) throws IOException; /** * Get a Set of call MapContainer Ids @@ -145,7 +152,7 @@ public interface Store{ * @return the set of ids * @throws IOException */ - public Set getMapContainerIds() throws IOException; + public Set getMapContainerIds() throws IOException; /** * Checks if a ListContainer exists in the default container @@ -213,6 +220,12 @@ public interface Store{ */ public void deleteListContainer(Object id,String containerName) throws IOException; + /** + * delete a list container + * @param id + * @throws IOException + */ + public void deleteListContainer(ContainerId id) throws IOException; /** * Get a Set of call ListContainer Ids @@ -220,7 +233,7 @@ public interface Store{ * @return the set of ids * @throws IOException */ - public Set getListContainerIds() throws IOException; + public Set getListContainerIds() throws IOException; /** * @return the maxDataFileLength diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java index d145a835e2..47f980815c 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/IndexRootContainer.java @@ -22,11 +22,11 @@ import java.util.LinkedList; import java.util.Map; import java.util.Set; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; import org.apache.activemq.kaha.StoreLocation; -import org.apache.activemq.kaha.impl.container.ContainerId; import org.apache.activemq.kaha.impl.data.Item; import org.apache.activemq.kaha.impl.index.IndexItem; import org.apache.activemq.kaha.impl.index.IndexManager; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java index c2046a6f82..01dfb4bfa7 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.RuntimeStoreException; @@ -34,7 +35,6 @@ import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreLocation; import org.apache.activemq.kaha.impl.async.AsyncDataManager; import org.apache.activemq.kaha.impl.async.DataManagerFacade; -import org.apache.activemq.kaha.impl.container.ContainerId; import org.apache.activemq.kaha.impl.container.ListContainerImpl; import org.apache.activemq.kaha.impl.container.MapContainerImpl; import org.apache.activemq.kaha.impl.data.DataManagerImpl; @@ -218,12 +218,14 @@ public class KahaStore implements Store{ public void deleteMapContainer(Object id) throws IOException{ deleteMapContainer(id,DEFAULT_CONTAINER_NAME); } + + public void deleteMapContainer(Object id,String containerName) throws IOException{ + ContainerId containerId = new ContainerId(id,containerName); + deleteMapContainer(containerId); + } - public synchronized void deleteMapContainer(Object id,String containerName) throws IOException{ + public synchronized void deleteMapContainer(ContainerId containerId) throws IOException{ initialize(); - ContainerId containerId=new ContainerId(); - containerId.setKey(id); - containerId.setDataContainerName(containerName); MapContainerImpl container=maps.remove(containerId); if(container!=null){ container.clear(); @@ -232,12 +234,12 @@ public class KahaStore implements Store{ } } - public synchronized Set getMapContainerIds() throws IOException{ + public synchronized Set getMapContainerIds() throws IOException{ initialize(); - Set set = new HashSet(); + Set set = new HashSet(); for (Iterator i = mapsContainer.getKeys().iterator(); i.hasNext();) { ContainerId id = (ContainerId)i.next(); - set.add(id.getKey()); + set.add(id); } return set; } @@ -286,12 +288,14 @@ public class KahaStore implements Store{ public void deleteListContainer(Object id) throws IOException{ deleteListContainer(id,DEFAULT_CONTAINER_NAME); } - + public synchronized void deleteListContainer(Object id,String containerName) throws IOException{ + ContainerId containerId=new ContainerId(id,containerName); + deleteListContainer(containerId); + } + + public synchronized void deleteListContainer(ContainerId containerId) throws IOException{ initialize(); - ContainerId containerId=new ContainerId(); - containerId.setKey(id); - containerId.setDataContainerName(containerName); ListContainerImpl container=lists.remove(containerId); if(container!=null){ listsContainer.removeRoot(container.getIndexManager(),containerId); @@ -300,12 +304,12 @@ public class KahaStore implements Store{ } } - public synchronized Set getListContainerIds() throws IOException{ + public synchronized Set getListContainerIds() throws IOException{ initialize(); - Set set = new HashSet(); + Set set = new HashSet(); for (Iterator i = listsContainer.getKeys().iterator(); i.hasNext();) { ContainerId id = (ContainerId)i.next(); - set.add(id.getKey()); + set.add(id); } return set; } @@ -333,7 +337,7 @@ public class KahaStore implements Store{ if( isUseAsyncDataManager() ) { AsyncDataManager t=new AsyncDataManager(); t.setDirectory(directory); - t.setFilePrefix("data-"+name+"-"); + t.setFilePrefix("async-data-"+name+"-"); t.setMaxFileLength((int) maxDataFileLength); t.start(); dm=new DataManagerFacade(t, name); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java index c73622d37b..a2e0765017 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/BaseContainerImpl.java @@ -21,6 +21,7 @@ package org.apache.activemq.kaha.impl.container; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.RuntimeStoreException; import org.apache.activemq.kaha.Store; import org.apache.activemq.kaha.StoreEntry; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java index fe5a139b7a..644d2b2956 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/ListContainerImpl.java @@ -21,6 +21,7 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.ListIterator; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.RuntimeStoreException; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java index 261ec1e844..89a7b25228 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/container/MapContainerImpl.java @@ -20,6 +20,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Set; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.MapContainer; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.RuntimeStoreException; @@ -69,7 +70,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont throw new RuntimeException(e); } }else{ - this.index=new VMIndex(); + this.index=new VMIndex(indexManager); } } index.setKeyMarshaller(keyMarshaller); @@ -505,7 +506,7 @@ public final class MapContainerImpl extends BaseContainerImpl implements MapCont StoreLocation data=dataManager.storeDataItem(valueMarshaller,value); index.setValueData(data); } - IndexItem prev=indexList.getLast(); + IndexItem prev=indexList.getLast(); prev=prev!=null?prev:indexList.getRoot(); IndexItem next=indexList.getNextEntry(prev); prev.setNextItem(index.getOffset()); diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java index 597a0cecb9..376678ce5e 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataFile.java @@ -34,6 +34,7 @@ class DataFile{ private RandomAccessFile randomAcessFile; private Object writerData; long length=0; + private boolean dirty; DataFile(File file,int number){ this.file=file; @@ -107,6 +108,15 @@ class DataFile{ */ public synchronized void setWriterData(Object writerData) { this.writerData = writerData; + dirty=true; } + + public synchronized boolean isDirty() { + return dirty; + } + + public synchronized void setDirty(boolean value) { + this.dirty = value; + } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java index 7c09c7a446..6c485e238f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/DataManagerImpl.java @@ -57,7 +57,7 @@ public final class DataManagerImpl implements DataManager { Marshaller redoMarshaller = RedoStoreIndexItem.MARSHALLER; private String dataFilePrefix; - + public DataManagerImpl(File dir, final String name){ this.dir=dir; this.name=name; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java index e561c6901c..4080e170b3 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/data/SyncDataFileWriter.java @@ -96,9 +96,10 @@ final public class SyncDataFileWriter { public synchronized void force(DataFile dataFile) throws IOException { // If our dirty marker was set.. then we need to sync - if( dataFile.getWriterData()!=null ) { + if( dataFile.getWriterData()!=null && dataFile.isDirty()) { dataFile.getRandomAccessFile().getFD().sync(); dataFile.setWriterData(null); + dataFile.setDirty(false); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java index 46c7bf7cc6..29d2be74b9 100755 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/DiskIndexLinkedList.java @@ -65,6 +65,10 @@ public class DiskIndexLinkedList implements IndexLinkedList{ public synchronized IndexItem getLast(){ if(size==0) return null; + if(last!=null){ + last.next=null; + last.setNextItem(IndexItem.POSITION_NOT_SET); + } return last; } @@ -323,6 +327,7 @@ public class DiskIndexLinkedList implements IndexLinkedList{ return; if(e==last||e.equals(last)){ if(size>1){ + last = (IndexItem)refreshEntry(last); last=getPrevEntry(last); }else{ last=null; diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java index f0454b12be..32de310fb9 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/IndexManager.java @@ -45,6 +45,7 @@ public final class IndexManager{ private long length=0; private IndexItem firstFree; private IndexItem lastFree; + private boolean dirty; public IndexManager(File directory,String name,String mode,DataManager redoLog) throws IOException{ this.directory=directory; @@ -76,10 +77,12 @@ public final class IndexManager{ lastFree.setNextItem(item.getOffset()); } writer.updateIndexes(item); + dirty=true; } public synchronized void storeIndex(IndexItem index) throws IOException{ writer.storeItem(index); + dirty=true; } public synchronized void updateIndexes(IndexItem index) throws IOException{ @@ -88,10 +91,12 @@ public final class IndexManager{ }catch(Throwable e){ log.error(name+" error updating indexes ",e); } + dirty=true; } public synchronized void redo(final RedoStoreIndexItem redo) throws IOException{ writer.redoStoreItem(redo); + dirty=true; } public synchronized IndexItem createNewIndex() throws IOException{ @@ -113,8 +118,9 @@ public final class IndexManager{ } public synchronized void force() throws IOException{ - if(indexFile!=null){ + if(indexFile!=null && dirty){ indexFile.getFD().sync(); + dirty=false; } } diff --git a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java index 3649ed78db..a8ba9327b1 100644 --- a/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java +++ b/activemq-core/src/main/java/org/apache/activemq/kaha/impl/index/VMIndex.java @@ -14,10 +14,14 @@ package org.apache.activemq.kaha.impl.index; +import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.StoreEntry; +import org.apache.activemq.kaha.impl.container.MapContainerImpl; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; /** * Index implementation using a HashMap @@ -25,9 +29,13 @@ import org.apache.activemq.kaha.StoreEntry; * @version $Revision: 1.2 $ */ public class VMIndex implements Index{ - + private static final Log log=LogFactory.getLog(VMIndex.class); + private IndexManager indexManager; private Map map=new HashMap(); + public VMIndex(IndexManager manager) { + this.indexManager= manager; + } /** * * @see org.apache.activemq.kaha.impl.index.Index#clear() @@ -47,10 +55,20 @@ public class VMIndex implements Index{ /** * @param key + * @return store entry * @see org.apache.activemq.kaha.impl.index.Index#removeKey(java.lang.Object) */ public StoreEntry remove(Object key){ - return map.remove(key); + StoreEntry result = map.remove(key); + if (result != null) { + try{ + result=indexManager.refreshIndex((IndexItem)result); + }catch(IOException e){ + log.error("Failed to refresh entry",e); + throw new RuntimeException("Failed to refresh entry"); + } + } + return result; } /** @@ -68,7 +86,16 @@ public class VMIndex implements Index{ * @return the entry */ public StoreEntry get(Object key){ - return map.get(key); + StoreEntry result = map.get(key); + if (result != null) { + try{ + result=indexManager.refreshIndex((IndexItem)result); + }catch(IOException e){ + log.error("Failed to refresh entry",e); + throw new RuntimeException("Failed to refresh entry"); + } + } + return result; } /** diff --git a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java index f797a6f3b9..e07f551864 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java @@ -84,7 +84,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener private final ConcurrentHashMap topics = new ConcurrentHashMap(); private AsyncDataManager asyncDataManager; - private ReferenceStoreAdapter referenceStoreAdapter; + private KahaReferenceStoreAdapter referenceStoreAdapter; private TaskRunnerFactory taskRunnerFactory; private WireFormat wireFormat = new OpenWireFormat(); @@ -106,7 +106,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener private Runnable periodicCleanupTask; private boolean deleteAllMessages; - private File directory = new File(IOHelper.getDefaultDataDirectory() + "/quick"); + private File directory = new File(IOHelper.getDefaultDataDirectory() + "/amq"); @@ -242,7 +242,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener checkpointTask.wakeup(); if (sync) { - log.debug("Waitng for checkpoint to complete."); + if(log.isDebugEnabled()){ + log.debug("Waitng for checkpoint to complete."); + } latch.await(); } } @@ -264,7 +266,10 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener } try { - log.debug("Checkpoint started."); + if(log.isDebugEnabled()){ + log.debug("Checkpoint started."); + } + referenceStoreAdapter.sync(); Location newMark = null; Iterator iterator = queues.values().iterator(); @@ -287,7 +292,9 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener try { if (newMark != null) { - log.debug("Marking journal at: " + newMark); + if(log.isDebugEnabled()){ + log.debug("Marking journal at: " + newMark); + } asyncDataManager.setMark(newMark, false); writeTraceMessage("CHECKPOINT "+new Date(), true); } @@ -296,17 +303,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener log.error("Failed to mark the Journal: " + e, e); } -// if (referenceStoreAdapter instanceof JDBCReferenceStoreAdapter) { -// // We may be check pointing more often than the checkpointInterval if under high use -// // But we don't want to clean up the db that often. -// long now = System.currentTimeMillis(); -// if( now > lastCleanup+checkpointInterval ) { -// lastCleanup = now; -// ((JDBCReferenceStoreAdapter) referenceStoreAdapter).cleanup(); -// } -// } - - log.debug("Checkpoint done."); + if(log.isDebugEnabled()){ + log.debug("Checkpoint done."); + } + } + catch(IOException e) { + log.error("Failed to sync reference store",e); } finally { latch.countDown(); @@ -603,7 +605,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener return manager; } - protected ReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { + protected KahaReferenceStoreAdapter createReferenceStoreAdapter() throws IOException { KahaReferenceStoreAdapter adaptor = new KahaReferenceStoreAdapter(directory); return adaptor; } @@ -627,9 +629,7 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener public ReferenceStoreAdapter getReferenceStoreAdapter() { return referenceStoreAdapter; } - public void setReferenceStoreAdapter(ReferenceStoreAdapter referenceStoreAdapter) { - this.referenceStoreAdapter = referenceStoreAdapter; - } + public TaskRunnerFactory getTaskRunnerFactory() { return taskRunnerFactory; diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java index 31ef5e1cb3..188919ab4c 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStore.java @@ -31,9 +31,11 @@ public class KahaReferenceStore implements ReferenceStore{ protected final ActiveMQDestination destination; protected final MapContainer messageContainer; + protected KahaReferenceStoreAdapter adapter; protected StoreEntry batchEntry=null; - public KahaReferenceStore(MapContainer container,ActiveMQDestination destination) throws IOException{ + public KahaReferenceStore(KahaReferenceStoreAdapter adapter,MapContainer container,ActiveMQDestination destination) throws IOException{ + this.adapter = adapter; this.messageContainer=container; this.destination=destination; } @@ -109,10 +111,10 @@ public class KahaReferenceStore implements ReferenceStore{ return result.data; } - public void addReferenceFileIdsInUse(Set rc){ + public void addReferenceFileIdsInUse(){ for(StoreEntry entry=messageContainer.getFirst();entry!=null;entry=messageContainer.getNext(entry)){ ReferenceRecord msg=(ReferenceRecord)messageContainer.getValue(entry); - rc.add(msg.data.getFileId()); + addInterest(msg); } } @@ -172,10 +174,10 @@ public class KahaReferenceStore implements ReferenceStore{ } void removeInterest(ReferenceRecord rr) { - + adapter.removeInterestInRecordFile(rr.data.getFileId()); } void addInterest(ReferenceRecord rr) { - + adapter.addInterestInRecordFile(rr.data.getFileId()); } } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java index bbba5dec23..bae8539a9f 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java @@ -17,34 +17,39 @@ */ package org.apache.activemq.store.kahadaptor; -import java.io.DataInput; -import java.io.DataOutput; import java.io.File; import java.io.IOException; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; - +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQTopic; -import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageId; +import org.apache.activemq.kaha.ContainerId; import org.apache.activemq.kaha.ListContainer; import org.apache.activemq.kaha.MapContainer; -import org.apache.activemq.kaha.Marshaller; import org.apache.activemq.kaha.MessageIdMarshaller; -import org.apache.activemq.kaha.MessageMarshaller; import org.apache.activemq.kaha.Store; import org.apache.activemq.store.MessageStore; import org.apache.activemq.store.ReferenceStore; import org.apache.activemq.store.ReferenceStoreAdapter; import org.apache.activemq.store.TopicMessageStore; import org.apache.activemq.store.TopicReferenceStore; -import org.apache.activemq.store.ReferenceStore.ReferenceData; +import org.apache.activemq.store.amq.AMQPersistenceAdapter; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements ReferenceStoreAdapter { - - private MapContainer fileReferences; + private static final Log log = LogFactory.getLog(KahaPersistenceAdapter.class); + private static final String STORE_STATE = "store-state"; + private static final String RECORD_REFERENCES = "record-references"; + private MapContainer stateMap; + private MaprecordReferences = new HashMap(); + private boolean storeValid; public KahaReferenceStoreAdapter(File dir) throws IOException { super(dir); @@ -59,22 +64,63 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements } @Override - public void start() throws Exception { - super.start(); - + public void start() throws Exception{ + super.start(); Store store=getStore(); - fileReferences=store.getMapContainer("file-references"); - fileReferences.setKeyMarshaller(new IntegerMarshaller()); - fileReferences.setValueMarshaller(new IntegerMarshaller()); - fileReferences.load(); + boolean empty=store.getMapContainerIds().isEmpty(); + stateMap=store.getMapContainer("state",STORE_STATE); + stateMap.load(); + if(!empty){ + + AtomicBoolean status=(AtomicBoolean)stateMap.get(STORE_STATE); + if(status!=null){ + storeValid=status.get(); + } + + if(storeValid){ + if(stateMap.containsKey(RECORD_REFERENCES)){ + recordReferences=(Map)stateMap.get(RECORD_REFERENCES); + } + }else { + /* + log.warn("Store Not shutdown cleanly - clearing out unsafe records ..."); + Set set = store.getListContainerIds(); + for (ContainerId cid:set) { + if (!cid.getDataContainerName().equals(STORE_STATE)) { + store.deleteListContainer(cid); + } + } + set = store.getMapContainerIds(); + for (ContainerId cid:set) { + if (!cid.getDataContainerName().equals(STORE_STATE)) { + store.deleteMapContainer(cid); + } + } + */ + buildReferenceFileIdsInUse(); + } + + } + stateMap.put(STORE_STATE,new AtomicBoolean()); } + @Override + public void stop() throws Exception { + stateMap.put(RECORD_REFERENCES,recordReferences); + stateMap.put(STORE_STATE,new AtomicBoolean(true)); + super.stop(); + } + + + public boolean isStoreValid() { + return storeValid; + } public ReferenceStore createQueueReferenceStore(ActiveMQQueue destination) throws IOException { ReferenceStore rc=(ReferenceStore)queues.get(destination); if(rc==null){ - rc=new KahaReferenceStore(getMapReferenceContainer(destination,"queue-data"),destination); + rc=new KahaReferenceStore(this,getMapReferenceContainer(destination,"queue-data"),destination); messageStores.put(destination,rc); // if(transactionStore!=null){ // rc=transactionStore.proxy(rc); @@ -89,10 +135,10 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements if(rc==null){ Store store=getStore(); MapContainer messageContainer=getMapReferenceContainer(destination,"topic-data"); - MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","topic-subs"); + MapContainer subsContainer=getSubsMapContainer(destination.toString()+"-Subscriptions","blob"); ListContainer ackContainer=store.getListContainer(destination.toString(),"topic-acks"); ackContainer.setMarshaller(new TopicSubAckMarshaller()); - rc=new KahaTopicReferenceStore(store,messageContainer,ackContainer,subsContainer,destination); + rc=new KahaTopicReferenceStore(store,this,messageContainer,ackContainer,subsContainer,destination); messageStores.put(destination,rc); // if(transactionStore!=null){ // rc=transactionStore.proxy(rc); @@ -102,25 +148,26 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements return rc; } - public Set getReferenceFileIdsInUse() throws IOException { + public void buildReferenceFileIdsInUse() throws IOException { - Set rc = new HashSet(); + recordReferences = new HashMap(); Set destinations = getDestinations(); for (ActiveMQDestination destination : destinations) { if( destination.isQueue() ) { KahaReferenceStore store = (KahaReferenceStore) createQueueReferenceStore((ActiveMQQueue) destination); - store.addReferenceFileIdsInUse(rc); + store.addReferenceFileIdsInUse(); } else { KahaTopicReferenceStore store = (KahaTopicReferenceStore) createTopicReferenceStore((ActiveMQTopic) destination); - store.addReferenceFileIdsInUse(rc); + store.addReferenceFileIdsInUse(); } - } - - return rc; - + } } + public void sync() throws IOException { + getStore().force(); + } + protected MapContainer getMapReferenceContainer(Object id,String containerName) throws IOException{ Store store=getStore(); MapContainer container=store.getMapContainer(id,containerName); @@ -129,6 +176,33 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements container.load(); return container; } + + synchronized void addInterestInRecordFile(int recordNumber) { + Integer key = new Integer(recordNumber); + AtomicInteger rr = recordReferences.get(key); + if (rr == null) { + rr = new AtomicInteger(); + recordReferences.put(key,rr); + } + rr.incrementAndGet(); + } + + synchronized void removeInterestInRecordFile(int recordNumber) { + Integer key = new Integer(recordNumber); + AtomicInteger rr = recordReferences.get(key); + if (rr != null && rr.decrementAndGet() <= 0) { + recordReferences.remove(key); + } + } + + /** + * @return + * @throws IOException + * @see org.apache.activemq.store.ReferenceStoreAdapter#getReferenceFileIdsInUse() + */ + public Set getReferenceFileIdsInUse() throws IOException{ + return recordReferences.keySet(); + } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java index 17ea7a2c6a..ab88617e94 100755 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicMessageStore.java @@ -86,8 +86,12 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ if(tsa.decrementCount()<=0){ - ackContainer.remove(ref.getAckEntry()); - messageContainer.remove(tsa.getMessageEntry()); + StoreEntry entry = ref.getAckEntry(); + entry = ackContainer.refresh(entry); + ackContainer.remove(entry); + entry = tsa.getMessageEntry(); + entry =messageContainer.refresh(entry); + messageContainer.remove(entry); }else{ ackContainer.update(ref.getAckEntry(),tsa); } diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java index bce6afbe67..e90fc46aaa 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaTopicReferenceStore.java @@ -39,9 +39,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic private Store store; protected Map subscriberMessages=new ConcurrentHashMap(); - public KahaTopicReferenceStore(Store store,MapContainer messageContainer,ListContainer ackContainer, + public KahaTopicReferenceStore(Store store,KahaReferenceStoreAdapter adapter,MapContainer messageContainer,ListContainer ackContainer, MapContainer subsContainer,ActiveMQDestination destination) throws IOException{ - super(messageContainer,destination); + super(adapter,messageContainer,destination); this.store=store; this.ackContainer=ackContainer; subscriberContainer=subsContainer; @@ -97,18 +97,18 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic return result.data; } - public void addReferenceFileIdsInUse(Set rc){ + public void addReferenceFileIdsInUse(){ for(StoreEntry entry=ackContainer.getFirst();entry!=null;entry=ackContainer.getNext(entry)){ TopicSubAck subAck=(TopicSubAck)ackContainer.get(entry); if(subAck.getCount()>0){ ReferenceRecord rr=(ReferenceRecord)messageContainer.getValue(subAck.getMessageEntry()); - rc.add(rr.data.getFileId()); + addInterest(rr); } } } protected ListContainer addSubscriberMessageContainer(Object key) throws IOException{ - ListContainer container=store.getListContainer(key,"topic-subs"); + ListContainer container=store.getListContainer(key,"topic-subs-references"); Marshaller marshaller=new ConsumerMessageRefMarshaller(); container.setMarshaller(marshaller); TopicSubContainer tsc=new TopicSubContainer(container); @@ -129,11 +129,15 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic TopicSubAck tsa=(TopicSubAck)ackContainer.get(ref.getAckEntry()); if(tsa!=null){ if(tsa.decrementCount()<=0){ - ackContainer.remove(ref.getAckEntry()); - ReferenceRecord rr = messageContainer.get(messageId); - if (rr != null) { - messageContainer.remove(tsa.getMessageEntry()); - removeInterest(rr); + StoreEntry entry=ref.getAckEntry(); + entry=ackContainer.refresh(entry); + ackContainer.remove(entry); + ReferenceRecord rr=messageContainer.get(messageId); + if(rr!=null){ + entry=tsa.getMessageEntry(); + entry=messageContainer.refresh(entry); + messageContainer.remove(entry); + removeInterest(rr); } }else{ ackContainer.update(ref.getAckEntry(),tsa); @@ -261,7 +265,7 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic } } } - store.deleteListContainer(key,"topic-subs"); + store.deleteListContainer(key,"topic-subs-references"); } protected String getSubscriptionKey(String clientId,String subscriberName){ diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java index d65a201c13..e5f4cef538 100644 --- a/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/kaha/MapContainerTest.java @@ -37,10 +37,23 @@ public class MapContainerTest extends TestCase{ protected MapContainer container; protected Map testMap; protected static final int COUNT = 10; + + public void testBasicAllocations() throws Exception{ + String key = "key"; + Object value = testMap; + MapContainer test = store.getMapContainer("test","test"); + test.put(key,value); + store.close(); + store = getStore(); + assertTrue(store.getMapContainerIds().isEmpty()==false); + test = store.getMapContainer("test","test"); + assertEquals(value,test.get(key)); + + } /* * Test method for 'org.apache.activemq.kaha.MapContainer.size()' */ - public void XtestSize() throws Exception { + public void testSize() throws Exception { container.putAll(testMap); assertTrue(container.size()==testMap.size()); } @@ -48,14 +61,14 @@ public class MapContainerTest extends TestCase{ /* * Test method for 'org.apache.activemq.kaha.MapContainer.isEmpty()' */ - public void XtestIsEmpty() throws Exception { + public void testIsEmpty() throws Exception { assertTrue(container.isEmpty()); } /* * Test method for 'org.apache.activemq.kaha.MapContainer.clear()' */ - public void XtestClear() throws Exception { + public void testClear() throws Exception { container.putAll(testMap); assertTrue(container.size()==testMap.size()); container.clear(); diff --git a/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java b/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java index 9297154338..0c4b31ea5d 100644 --- a/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java +++ b/activemq-core/src/test/java/org/apache/activemq/kaha/StoreTest.java @@ -100,16 +100,7 @@ public class StoreTest extends TestCase{ assertFalse(store.doesMapContainerExist(containerId)); } - /* - * Test method for 'org.apache.activemq.kaha.Store.getMapContainerIds()' - */ - public void testGetMapContainerIds()throws Exception { - String containerId = "test"; - MapContainer container = store.getMapContainer(containerId); - Set set = store.getMapContainerIds(); - assertTrue(set.contains(containerId)); - } - + /* @@ -139,16 +130,7 @@ public class StoreTest extends TestCase{ assertFalse(store.doesListContainerExist(containerId)); } - /* - * Test method for 'org.apache.activemq.kaha.Store.getListContainerIds()' - */ - public void testGetListContainerIds()throws Exception { - String containerId = "test"; - ListContainer container = store.getListContainer(containerId); - Set set = store.getListContainerIds(); - assertTrue(set.contains(containerId)); - } - + public void testBasicAllocations() throws Exception{ Map testMap = new HashMap(); int count = 1000;