git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@597581 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-11-23 07:40:51 +00:00
parent 1aab7d3597
commit ba8b248788
4 changed files with 100 additions and 15 deletions

View File

@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -39,10 +40,11 @@ import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* Manages DataFiles
*
@ -87,6 +89,7 @@ public final class AsyncDataManager {
private DataFileAccessorPool accessorPool = new DataFileAccessorPool(this);
private Map<Integer, DataFile> fileMap = new HashMap<Integer, DataFile>();
private Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
private DataFile currentWriteFile;
private Location mark;
@ -131,7 +134,7 @@ public final class AsyncDataManager {
return dir.equals(directory) && n.startsWith(filePrefix);
}
});
if (files != null) {
for (int i = 0; i < files.length; i++) {
try {
@ -157,6 +160,7 @@ public final class AsyncDataManager {
currentWriteFile.linkAfter(df);
}
currentWriteFile = df;
fileByFileMap.put(df.getFile(), df);
}
}
@ -254,8 +258,10 @@ public final class AsyncDataManager {
int nextNum = currentWriteFile != null ? currentWriteFile.getDataFileId().intValue() + 1 : 1;
String fileName = filePrefix + nextNum;
DataFile nextWriteFile = new DataFile(new File(directory, fileName), nextNum, preferedFileLength);
File file = new File(directory, fileName);
DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
if (currentWriteFile != null) {
currentWriteFile.linkAfter(nextWriteFile);
if (currentWriteFile.isUnused()) {
@ -289,6 +295,16 @@ public final class AsyncDataManager {
}
return dataFile;
}
File getFile(Location item) throws IOException {
Integer key = Integer.valueOf(item.getDataFileId());
DataFile dataFile = fileMap.get(key);
if (dataFile == null) {
LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
throw new IOException("Could not locate data file " + filePrefix + "-" + item.getDataFileId());
}
return dataFile.getFile();
}
private DataFile getNextDataFile(DataFile dataFile) {
return (DataFile)dataFile.getNext();
@ -303,6 +319,7 @@ public final class AsyncDataManager {
storeState(false);
appender.close();
fileMap.clear();
fileByFileMap.clear();
controlFile.unlock();
controlFile.dispose();
started = false;
@ -327,6 +344,7 @@ public final class AsyncDataManager {
result &= dataFile.delete();
}
fileMap.clear();
fileByFileMap.clear();
lastAppendLocation.set(null);
mark = null;
currentWriteFile = null;
@ -415,6 +433,7 @@ public final class AsyncDataManager {
private synchronized void forceRemoveDataFile(DataFile dataFile)
throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
fileByFileMap.remove(dataFile.getFile());
DataFile removed = fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
@ -461,16 +480,6 @@ public final class AsyncDataManager {
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
// DataFileAccessor reader =
// accessorPool.openDataFileAccessor(head);
// try {
// if( !reader.readLocationDetailsAndValidate(cur) ) {
// return null;
// }
// } finally {
// accessorPool.closeDataFileAccessor(reader);
// }
} else {
// Set to the next offset..
cur = new Location(location);
@ -509,6 +518,64 @@ public final class AsyncDataManager {
}
}
}
public synchronized Location getNextLocation(File file, Location lastLocation,boolean thisFileOnly) throws IllegalStateException, IOException{
DataFile df = fileByFileMap.get(file);
return getNextLocation(df, lastLocation,thisFileOnly);
}
public synchronized Location getNextLocation(DataFile dataFile,
Location lastLocation,boolean thisFileOnly) throws IOException, IllegalStateException {
Location cur = null;
while (true) {
if (cur == null) {
if (lastLocation == null) {
DataFile head = (DataFile)dataFile.getHeadNode();
cur = new Location();
cur.setDataFileId(head.getDataFileId());
cur.setOffset(0);
} else {
// Set to the next offset..
cur = new Location(lastLocation);
cur.setOffset(cur.getOffset() + cur.getSize());
}
} else {
cur.setOffset(cur.getOffset() + cur.getSize());
}
// Did it go into the next file??
if (dataFile.getLength() <= cur.getOffset()) {
if (thisFileOnly) {
return null;
}else {
dataFile = getNextDataFile(dataFile);
if (dataFile == null) {
return null;
} else {
cur.setDataFileId(dataFile.getDataFileId().intValue());
cur.setOffset(0);
}
}
}
// Load in location size and type.
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
try {
reader.readLocationDetails(cur);
} finally {
accessorPool.closeDataFileAccessor(reader);
}
if (cur.getType() == 0) {
return null;
} else if (cur.getType() > 0) {
// Only return user records.
return cur;
}
}
}
public ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
@ -611,4 +678,12 @@ public final class AsyncDataManager {
return null;
return currentWriteFile.getDataFileId();
}
/**
* Get a set of files - only valid after start()
* @return files currently being used
*/
public Set<File> getFiles(){
return fileByFileMap.keySet();
}
}

View File

@ -43,6 +43,10 @@ class DataFile extends LinkedNode implements Comparable<DataFile> {
this.dataFileId = Integer.valueOf(number);
length = (int)(file.exists() ? file.length() : 0);
}
File getFile() {
return file;
}
public Integer getDataFileId() {
return dataFileId;

View File

@ -91,11 +91,13 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
boolean empty = store.getMapContainerIds().isEmpty();
stateMap = store.getMapContainer("state", STORE_STATE);
stateMap.load();
storeValid=true;
if (!empty) {
AtomicBoolean status = (AtomicBoolean)stateMap.get(STORE_STATE);
if (status != null) {
storeValid = status.get();
}
if (storeValid) {
//check what version the indexes are at
Integer indexVersion = (Integer) stateMap.get(INDEX_VERSION_NAME);
@ -236,7 +238,9 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
* @see org.apache.activemq.store.ReferenceStoreAdapter#clearMessages()
*/
public void clearMessages() throws IOException {
deleteAllMessages();
//don't delete messages as it will clear state - call base
//class method to clear out the data instead
super.deleteAllMessages();
}
/**
@ -247,6 +251,7 @@ public class KahaReferenceStoreAdapter extends KahaPersistenceAdapter implements
public void recoverState() throws IOException {
for (Iterator<SubscriptionInfo> i = durableSubscribers.iterator(); i.hasNext();) {
SubscriptionInfo info = i.next();
LOG.info("Recovering subscriber state for durable subscriber: " + info);
TopicReferenceStore ts = createTopicReferenceStore((ActiveMQTopic)info.getDestination());
ts.addSubsciption(info, false);
}

View File

@ -215,8 +215,9 @@ public class KahaTopicReferenceStore extends KahaReferenceStore implements Topic
}
public SubscriptionInfo[] getAllSubscriptions() throws IOException {
return subscriberContainer.values()
SubscriptionInfo[] result = subscriberContainer.values()
.toArray(new SubscriptionInfo[subscriberContainer.size()]);
return result;
}
public int getMessageCount(String clientId, String subscriberName) throws IOException {