mirror of https://github.com/apache/activemq.git
more tuning - and ensure data files have more meaningful names
git-svn-id: https://svn.apache.org/repos/asf/incubator/activemq/trunk@395689 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4c91a69ead
commit
a16b869fd8
|
@ -2,36 +2,36 @@ package org.apache.activemq.kaha;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Store is holds persistent containers
|
* A Store is holds persistent containers
|
||||||
*
|
*
|
||||||
* @version $Revision: 1.2 $
|
* @version $Revision: 1.2 $
|
||||||
*/
|
*/
|
||||||
public interface Store{
|
public interface Store{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* close the store
|
* close the store
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void close() throws IOException;
|
public void close() throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force all writes to disk
|
* Force all writes to disk
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void force() throws IOException;
|
public void force() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* empty all the contents of the store
|
* empty all the contents of the store
|
||||||
|
*
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public void clear() throws IOException;
|
public void clear() throws IOException;
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* delete the store
|
* delete the store
|
||||||
|
*
|
||||||
* @return true if the delete was successful
|
* @return true if the delete was successful
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@ -39,22 +39,35 @@ public interface Store{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if a MapContainer exists
|
* Checks if a MapContainer exists
|
||||||
|
*
|
||||||
* @param id
|
* @param id
|
||||||
* @return new MapContainer
|
* @return new MapContainer
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public boolean doesMapContainerExist(Object id) throws IOException;
|
public boolean doesMapContainerExist(Object id) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a MapContainer with the given id - the MapContainer is created if needed
|
* Get a MapContainer with the given id - the MapContainer is created if needed
|
||||||
|
*
|
||||||
* @param id
|
* @param id
|
||||||
* @return container for the associated id or null if it doesn't exist
|
* @return container for the associated id or null if it doesn't exist
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public MapContainer getMapContainer(Object id) throws IOException;
|
public MapContainer getMapContainer(Object id) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a MapContainer with the given id - the MapContainer is created if needed
|
||||||
|
*
|
||||||
|
* @param id
|
||||||
|
* @param containerName
|
||||||
|
* @return container for the associated id or null if it doesn't exist
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public MapContainer getMapContainer(Object id,String containerName) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* delete a container
|
* delete a container
|
||||||
|
*
|
||||||
* @param id
|
* @param id
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
|
@ -62,40 +75,53 @@ public interface Store{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a Set of call MapContainer Ids
|
* Get a Set of call MapContainer Ids
|
||||||
|
*
|
||||||
* @return the set of ids
|
* @return the set of ids
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public Set getMapContainerIds() throws IOException;
|
public Set getMapContainerIds() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Checks if a ListContainer exists
|
* Checks if a ListContainer exists
|
||||||
|
*
|
||||||
* @param id
|
* @param id
|
||||||
* @return new MapContainer
|
* @return new MapContainer
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public boolean doesListContainerExist(Object id) throws IOException;
|
public boolean doesListContainerExist(Object id) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a ListContainer with the given id and creates it if it doesn't exist
|
* Get a ListContainer with the given id and creates it if it doesn't exist
|
||||||
* @param id
|
*
|
||||||
* @return container for the associated id or null if it doesn't exist
|
* @param id
|
||||||
* @throws IOException
|
* @return container for the associated id or null if it doesn't exist
|
||||||
*/
|
* @throws IOException
|
||||||
public ListContainer getListContainer(Object id) throws IOException;
|
*/
|
||||||
|
public ListContainer getListContainer(Object id) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* delete a ListContainer
|
* Get a ListContainer with the given id and creates it if it doesn't exist
|
||||||
* @param id
|
*
|
||||||
* @throws IOException
|
* @param id
|
||||||
*/
|
* @param containerName
|
||||||
public void deleteListContainer(Object id) throws IOException;
|
* @return container for the associated id or null if it doesn't exist
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public ListContainer getListContainer(Object id,String containerName) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a Set of call ListContainer Ids
|
* delete a ListContainer
|
||||||
* @return the set of ids
|
*
|
||||||
* @throws IOException
|
* @param id
|
||||||
*/
|
* @throws IOException
|
||||||
public Set getListContainerIds() throws IOException;
|
*/
|
||||||
|
public void deleteListContainer(Object id) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get a Set of call ListContainer Ids
|
||||||
|
*
|
||||||
|
* @return the set of ids
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public Set getListContainerIds() throws IOException;
|
||||||
}
|
}
|
|
@ -30,18 +30,22 @@ public abstract class BaseContainerImpl{
|
||||||
protected IndexLinkedList list;
|
protected IndexLinkedList list;
|
||||||
protected IndexManager indexManager;
|
protected IndexManager indexManager;
|
||||||
protected DataManager dataManager;
|
protected DataManager dataManager;
|
||||||
protected Object id;
|
protected ContainerId containerId;
|
||||||
protected boolean loaded=false;
|
protected boolean loaded=false;
|
||||||
protected boolean closed=false;
|
protected boolean closed=false;
|
||||||
protected final Object mutex=new Object();
|
protected final Object mutex=new Object();
|
||||||
|
|
||||||
protected BaseContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){
|
protected BaseContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager){
|
||||||
this.id=id;
|
this.containerId=id;
|
||||||
this.root=root;
|
this.root=root;
|
||||||
this.indexManager=indexManager;
|
this.indexManager=indexManager;
|
||||||
this.dataManager=dataManager;
|
this.dataManager=dataManager;
|
||||||
this.list=new IndexLinkedList(root);
|
this.list=new IndexLinkedList(root);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ContainerId getContainerId(){
|
||||||
|
return containerId;
|
||||||
|
}
|
||||||
|
|
||||||
public abstract void unload();
|
public abstract void unload();
|
||||||
|
|
||||||
|
@ -81,7 +85,7 @@ public abstract class BaseContainerImpl{
|
||||||
*/
|
*/
|
||||||
public final Object getId(){
|
public final Object getId(){
|
||||||
checkClosed();
|
checkClosed();
|
||||||
return id;
|
return containerId.getKey();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected final void expressDataInterest() throws IOException{
|
protected final void expressDataInterest() throws IOException{
|
||||||
|
|
|
@ -0,0 +1,80 @@
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Copyright 2005-2006 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
|
||||||
|
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.kaha.impl;
|
||||||
|
|
||||||
|
import java.io.Externalizable;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.ObjectInput;
|
||||||
|
import java.io.ObjectOutput;
|
||||||
|
/**
|
||||||
|
* Used by RootContainers
|
||||||
|
*
|
||||||
|
* @version $Revision: 1.1.1.1 $
|
||||||
|
*/
|
||||||
|
public class ContainerId implements Externalizable{
|
||||||
|
private static final long serialVersionUID=-8883779541021821943L;
|
||||||
|
private Object key;
|
||||||
|
private String dataContainerPrefix;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Returns the dataContainerPrefix.
|
||||||
|
*/
|
||||||
|
public String getDataContainerPrefix(){
|
||||||
|
return dataContainerPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param dataContainerPrefix The dataContainerPrefix to set.
|
||||||
|
*/
|
||||||
|
public void setDataContainerPrefix(String dataContainerPrefix){
|
||||||
|
this.dataContainerPrefix=dataContainerPrefix;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return Returns the key.
|
||||||
|
*/
|
||||||
|
public Object getKey(){
|
||||||
|
return key;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param key The key to set.
|
||||||
|
*/
|
||||||
|
public void setKey(Object key){
|
||||||
|
this.key=key;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int hashCode(){
|
||||||
|
return key.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean equals(Object obj){
|
||||||
|
boolean result = false;
|
||||||
|
if (obj != null && obj instanceof ContainerId){
|
||||||
|
ContainerId other = (ContainerId) obj;
|
||||||
|
result = other.key.equals(this.key);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writeExternal(ObjectOutput out) throws IOException{
|
||||||
|
out.writeUTF(getDataContainerPrefix());
|
||||||
|
out.writeObject(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void readExternal(ObjectInput in) throws IOException,ClassNotFoundException{
|
||||||
|
dataContainerPrefix=in.readUTF();
|
||||||
|
key=in.readObject();
|
||||||
|
}
|
||||||
|
}
|
|
@ -65,6 +65,10 @@ final class DataManager{
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getPrefix(){
|
||||||
|
return prefix;
|
||||||
|
}
|
||||||
|
|
||||||
DataFile findSpaceForData(DataItem item) throws IOException{
|
DataFile findSpaceForData(DataItem item) throws IOException{
|
||||||
if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>MAX_FILE_LENGTH)){
|
if(currentWriteFile==null||((currentWriteFile.getLength()+item.getSize())>MAX_FILE_LENGTH)){
|
||||||
|
@ -168,7 +172,7 @@ final class DataManager{
|
||||||
DataFile dataFile=(DataFile) purgeList.get(i);
|
DataFile dataFile=(DataFile) purgeList.get(i);
|
||||||
fileMap.remove(dataFile.getNumber());
|
fileMap.remove(dataFile.getNumber());
|
||||||
boolean result=dataFile.delete();
|
boolean result=dataFile.delete();
|
||||||
log.info("discarding data file "+dataFile+(result?"successful ":"failed"));
|
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,6 +187,6 @@ final class DataManager{
|
||||||
private void removeDataFile(DataFile dataFile) throws IOException{
|
private void removeDataFile(DataFile dataFile) throws IOException{
|
||||||
fileMap.remove(dataFile.getNumber());
|
fileMap.remove(dataFile.getNumber());
|
||||||
boolean result=dataFile.delete();
|
boolean result=dataFile.delete();
|
||||||
log.info("discarding data file "+dataFile+(result?"successful ":"failed"));
|
log.debug("discarding data file "+dataFile+(result?"successful ":"failed"));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,13 +34,16 @@ import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
|
||||||
* @version $Revision: 1.1.1.1 $
|
* @version $Revision: 1.1.1.1 $
|
||||||
*/
|
*/
|
||||||
public class KahaStore implements Store{
|
public class KahaStore implements Store{
|
||||||
DataManager rootData;
|
private static final String DEFAULT_CONTAINER_NAME = "data-container";
|
||||||
DataManager containersData;
|
private File directory;
|
||||||
IndexManager indexManager;
|
private DataManager rootData;
|
||||||
|
private DataManager defaultContainerManager;
|
||||||
|
private IndexManager indexManager;
|
||||||
private IndexRootContainer mapsContainer;
|
private IndexRootContainer mapsContainer;
|
||||||
private IndexRootContainer listsContainer;
|
private IndexRootContainer listsContainer;
|
||||||
private Map lists=new ConcurrentHashMap();
|
private Map lists=new ConcurrentHashMap();
|
||||||
private Map maps=new ConcurrentHashMap();
|
private Map maps=new ConcurrentHashMap();
|
||||||
|
private Map dataManagers = new ConcurrentHashMap();
|
||||||
private boolean closed=false;
|
private boolean closed=false;
|
||||||
private String name;
|
private String name;
|
||||||
private String mode;
|
private String mode;
|
||||||
|
@ -58,7 +61,7 @@ public class KahaStore implements Store{
|
||||||
if(initialized){
|
if(initialized){
|
||||||
indexManager.close();
|
indexManager.close();
|
||||||
rootData.close();
|
rootData.close();
|
||||||
containersData.close();
|
defaultContainerManager.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -67,7 +70,7 @@ public class KahaStore implements Store{
|
||||||
if(initialized){
|
if(initialized){
|
||||||
indexManager.force();
|
indexManager.force();
|
||||||
rootData.force();
|
rootData.force();
|
||||||
containersData.force();
|
defaultContainerManager.force();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -90,7 +93,7 @@ public class KahaStore implements Store{
|
||||||
clear();
|
clear();
|
||||||
boolean result=indexManager.delete();
|
boolean result=indexManager.delete();
|
||||||
result&=rootData.delete();
|
result&=rootData.delete();
|
||||||
result&=containersData.delete();
|
result&=defaultContainerManager.delete();
|
||||||
initialized=false;
|
initialized=false;
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -101,22 +104,31 @@ public class KahaStore implements Store{
|
||||||
}
|
}
|
||||||
|
|
||||||
public MapContainer getMapContainer(Object id) throws IOException{
|
public MapContainer getMapContainer(Object id) throws IOException{
|
||||||
|
return getMapContainer(id, DEFAULT_CONTAINER_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized MapContainer getMapContainer(Object id, String dataContainerName) throws IOException{
|
||||||
initialize();
|
initialize();
|
||||||
|
|
||||||
MapContainer result=(MapContainer) maps.get(id);
|
MapContainer result=(MapContainer) maps.get(id);
|
||||||
if(result==null){
|
if(result==null){
|
||||||
IndexItem root=mapsContainer.addRoot(id);
|
DataManager dm = getDataManager(dataContainerName);
|
||||||
result=new MapContainerImpl(id,root,indexManager,containersData);
|
ContainerId containerId = new ContainerId();
|
||||||
maps.put(id,result);
|
containerId.setKey(id);
|
||||||
|
containerId.setDataContainerPrefix(dataContainerName);
|
||||||
|
IndexItem root=mapsContainer.addRoot(containerId);
|
||||||
|
result=new MapContainerImpl(containerId,root,indexManager,dm);
|
||||||
|
maps.put(containerId.getKey(),result);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteMapContainer(Object id) throws IOException{
|
public void deleteMapContainer(Object id) throws IOException{
|
||||||
initialize();
|
initialize();
|
||||||
MapContainer container=(MapContainer) maps.remove(id);
|
MapContainerImpl container=(MapContainerImpl) maps.remove(id);
|
||||||
if(container!=null){
|
if(container!=null){
|
||||||
container.clear();
|
container.clear();
|
||||||
mapsContainer.removeRoot(id);
|
mapsContainer.removeRoot(container.getContainerId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,22 +143,31 @@ public class KahaStore implements Store{
|
||||||
}
|
}
|
||||||
|
|
||||||
public ListContainer getListContainer(Object id) throws IOException{
|
public ListContainer getListContainer(Object id) throws IOException{
|
||||||
|
return getListContainer(id,DEFAULT_CONTAINER_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized ListContainer getListContainer(Object id, String dataContainerName) throws IOException{
|
||||||
initialize();
|
initialize();
|
||||||
|
|
||||||
ListContainer result=(ListContainer) lists.get(id);
|
ListContainer result=(ListContainer) lists.get(id);
|
||||||
if(result==null){
|
if(result==null){
|
||||||
IndexItem root=listsContainer.addRoot(id);
|
DataManager dm = getDataManager(dataContainerName);
|
||||||
result=new ListContainerImpl(id,root,indexManager,containersData);
|
ContainerId containerId = new ContainerId();
|
||||||
lists.put(id,result);
|
containerId.setKey(id);
|
||||||
|
containerId.setDataContainerPrefix(dataContainerName);
|
||||||
|
IndexItem root=listsContainer.addRoot(containerId);
|
||||||
|
result=new ListContainerImpl(containerId,root,indexManager,dm);
|
||||||
|
lists.put(containerId.getKey(),result);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void deleteListContainer(Object id) throws IOException{
|
public void deleteListContainer(Object id) throws IOException{
|
||||||
initialize();
|
initialize();
|
||||||
ListContainer container=(ListContainer) lists.remove(id);
|
ListContainerImpl container=(ListContainerImpl) lists.remove(id);
|
||||||
if(container!=null){
|
if(container!=null){
|
||||||
container.clear();
|
container.clear();
|
||||||
listsContainer.removeRoot(id);
|
listsContainer.removeRoot(container.getContainerId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -164,12 +185,13 @@ public class KahaStore implements Store{
|
||||||
protected synchronized void initialize() throws IOException{
|
protected synchronized void initialize() throws IOException{
|
||||||
if(!initialized){
|
if(!initialized){
|
||||||
initialized=true;
|
initialized=true;
|
||||||
File dir=new File(name);
|
directory=new File(name);
|
||||||
dir.mkdirs();
|
directory.mkdirs();
|
||||||
File ifile=new File(dir,"kaha.idx");
|
File ifile=new File(directory,"kaha.idx");
|
||||||
indexManager=new IndexManager(ifile,mode);
|
indexManager=new IndexManager(ifile,mode);
|
||||||
rootData=new DataManager(dir,"roots-data");
|
rootData=new DataManager(directory,"roots-data");
|
||||||
containersData=new DataManager(dir,"containers-data");
|
defaultContainerManager=new DataManager(directory,DEFAULT_CONTAINER_NAME);
|
||||||
|
dataManagers.put(DEFAULT_CONTAINER_NAME, defaultContainerManager);
|
||||||
IndexItem mapRoot=new IndexItem();
|
IndexItem mapRoot=new IndexItem();
|
||||||
IndexItem listRoot=new IndexItem();
|
IndexItem listRoot=new IndexItem();
|
||||||
if(indexManager.isEmpty()){
|
if(indexManager.isEmpty()){
|
||||||
|
@ -186,20 +208,34 @@ public class KahaStore implements Store{
|
||||||
listsContainer=new IndexRootContainer(listRoot,indexManager,rootData);
|
listsContainer=new IndexRootContainer(listRoot,indexManager,rootData);
|
||||||
rootData.consolidateDataFiles();
|
rootData.consolidateDataFiles();
|
||||||
for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){
|
for(Iterator i=mapsContainer.getKeys().iterator();i.hasNext();){
|
||||||
Object key=i.next();
|
ContainerId key=(ContainerId) i.next();
|
||||||
|
DataManager dm = getDataManager(key.getDataContainerPrefix());
|
||||||
IndexItem root=mapsContainer.getRoot(key);
|
IndexItem root=mapsContainer.getRoot(key);
|
||||||
BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,containersData);
|
BaseContainerImpl container=new MapContainerImpl(key,root,indexManager,dm);
|
||||||
container.expressDataInterest();
|
container.expressDataInterest();
|
||||||
maps.put(key,container);
|
maps.put(key.getKey(),container);
|
||||||
}
|
}
|
||||||
for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
|
for(Iterator i=listsContainer.getKeys().iterator();i.hasNext();){
|
||||||
Object key=i.next();
|
ContainerId key=(ContainerId) i.next();
|
||||||
|
DataManager dm = getDataManager(key.getDataContainerPrefix());
|
||||||
IndexItem root=listsContainer.getRoot(key);
|
IndexItem root=listsContainer.getRoot(key);
|
||||||
BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,containersData);
|
BaseContainerImpl container=new ListContainerImpl(key,root,indexManager,dm);
|
||||||
container.expressDataInterest();
|
container.expressDataInterest();
|
||||||
lists.put(key,container);
|
lists.put(key.getKey(),container);
|
||||||
|
}
|
||||||
|
for (Iterator i = dataManagers.values().iterator(); i.hasNext();){
|
||||||
|
DataManager dm = (DataManager) i.next();
|
||||||
|
dm.consolidateDataFiles();
|
||||||
}
|
}
|
||||||
containersData.consolidateDataFiles();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected DataManager getDataManager(String prefix){
|
||||||
|
DataManager dm = (DataManager) dataManagers.get(prefix);
|
||||||
|
if (dm == null){
|
||||||
|
dm = new DataManager(directory,prefix);
|
||||||
|
dataManagers.put(prefix,dm);
|
||||||
|
}
|
||||||
|
return dm;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,7 +34,7 @@ final class ListContainerImpl extends BaseContainerImpl implements ListContainer
|
||||||
private static final Log log=LogFactory.getLog(ListContainerImpl.class);
|
private static final Log log=LogFactory.getLog(ListContainerImpl.class);
|
||||||
protected Marshaller marshaller=new ObjectMarshaller();
|
protected Marshaller marshaller=new ObjectMarshaller();
|
||||||
|
|
||||||
protected ListContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager)
|
protected ListContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager)
|
||||||
throws IOException{
|
throws IOException{
|
||||||
super(id,root,indexManager,dataManager);
|
super(id,root,indexManager,dataManager);
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,7 +39,7 @@ final class MapContainerImpl extends BaseContainerImpl implements MapContainer{
|
||||||
protected Marshaller keyMarshaller=new ObjectMarshaller();
|
protected Marshaller keyMarshaller=new ObjectMarshaller();
|
||||||
protected Marshaller valueMarshaller=new ObjectMarshaller();
|
protected Marshaller valueMarshaller=new ObjectMarshaller();
|
||||||
|
|
||||||
protected MapContainerImpl(Object id,IndexItem root,IndexManager indexManager,DataManager dataManager){
|
protected MapContainerImpl(ContainerId id,IndexItem root,IndexManager indexManager,DataManager dataManager){
|
||||||
super(id,root,indexManager,dataManager);
|
super(id,root,indexManager,dataManager);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
|
||||||
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
|
public synchronized MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException{
|
||||||
MessageStore rc=(MessageStore) queues.get(destination);
|
MessageStore rc=(MessageStore) queues.get(destination);
|
||||||
if(rc==null){
|
if(rc==null){
|
||||||
rc=new KahaMessageStore(getMapContainer(destination),destination);
|
rc=new KahaMessageStore(getMapContainer(destination,"queue-data"),destination);
|
||||||
messageStores.put(destination, rc);
|
messageStores.put(destination, rc);
|
||||||
if(transactionStore!=null){
|
if(transactionStore!=null){
|
||||||
rc=transactionStore.proxy(rc);
|
rc=transactionStore.proxy(rc);
|
||||||
|
@ -91,9 +91,9 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
|
||||||
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
|
public synchronized TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException{
|
||||||
TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
|
TopicMessageStore rc=(TopicMessageStore) topics.get(destination);
|
||||||
if(rc==null){
|
if(rc==null){
|
||||||
MapContainer messageContainer=getMapContainer(destination);
|
MapContainer messageContainer=getMapContainer(destination,"topic-data");
|
||||||
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions");
|
MapContainer subsContainer=getMapContainer(destination.toString()+"-Subscriptions","topic-subs");
|
||||||
MapContainer ackContainer=store.getMapContainer(destination.toString()+"-Acks");
|
MapContainer ackContainer=store.getMapContainer(destination.toString(),"topic-acks");
|
||||||
ackContainer.setKeyMarshaller(new StringMarshaller());
|
ackContainer.setKeyMarshaller(new StringMarshaller());
|
||||||
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
|
ackContainer.setValueMarshaller(new AtomicIntegerMarshaller());
|
||||||
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
|
rc=new KahaTopicMessageStore(store,messageContainer,ackContainer,subsContainer,destination);
|
||||||
|
@ -114,7 +114,7 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
|
||||||
|
|
||||||
public TransactionStore createTransactionStore() throws IOException{
|
public TransactionStore createTransactionStore() throws IOException{
|
||||||
if(transactionStore==null){
|
if(transactionStore==null){
|
||||||
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME);
|
MapContainer container=store.getMapContainer(PREPARED_TRANSACTIONS_NAME,"transactions");
|
||||||
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
|
container.setKeyMarshaller(new CommandMarshaller(wireFormat));
|
||||||
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
|
container.setValueMarshaller(new TransactionMarshaller(wireFormat));
|
||||||
container.load();
|
container.load();
|
||||||
|
@ -155,8 +155,8 @@ public class KahaPersistentAdaptor implements PersistenceAdapter{
|
||||||
this.useExternalMessageReferences=useExternalMessageReferences;
|
this.useExternalMessageReferences=useExternalMessageReferences;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected MapContainer getMapContainer(Object id) throws IOException{
|
protected MapContainer getMapContainer(Object id,String containerName) throws IOException{
|
||||||
MapContainer container=store.getMapContainer(id);
|
MapContainer container=store.getMapContainer(id,containerName);
|
||||||
container.setKeyMarshaller(new StringMarshaller());
|
container.setKeyMarshaller(new StringMarshaller());
|
||||||
if(useExternalMessageReferences){
|
if(useExternalMessageReferences){
|
||||||
container.setValueMarshaller(new StringMarshaller());
|
container.setValueMarshaller(new StringMarshaller());
|
||||||
|
|
|
@ -59,7 +59,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
|
||||||
ackContainer.put(id,new AtomicInteger(subscriberCount));
|
ackContainer.put(id,new AtomicInteger(subscriberCount));
|
||||||
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
|
for(Iterator i=subscriberAcks.keySet().iterator();i.hasNext();){
|
||||||
Object key=i.next();
|
Object key=i.next();
|
||||||
ListContainer container=store.getListContainer(key);
|
ListContainer container=store.getListContainer(key,"durable-subs");
|
||||||
container.add(id);
|
container.add(id);
|
||||||
}
|
}
|
||||||
super.addMessage(context,message);
|
super.addMessage(context,message);
|
||||||
|
@ -163,7 +163,7 @@ public class KahaTopicMessageStore extends KahaMessageStore implements TopicMess
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void addSubscriberAckContainer(Object key) throws IOException{
|
protected void addSubscriberAckContainer(Object key) throws IOException{
|
||||||
ListContainer container=store.getListContainer(key);
|
ListContainer container=store.getListContainer(key,"topic-subs");
|
||||||
Marshaller marshaller=new StringMarshaller();
|
Marshaller marshaller=new StringMarshaller();
|
||||||
container.setMarshaller(marshaller);
|
container.setMarshaller(marshaller);
|
||||||
subscriberAcks.put(key,container);
|
subscriberAcks.put(key,container);
|
||||||
|
|
Loading…
Reference in New Issue