git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@591148 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Robert Davies 2007-11-01 21:49:13 +00:00
parent 9a491bbc76
commit b2544046c5
3 changed files with 63 additions and 8 deletions

View File

@ -39,6 +39,7 @@ import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteCommand;
import org.apache.activemq.kaha.impl.async.DataFileAppender.WriteKey;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.IOHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -63,8 +64,8 @@ public final class AsyncDataManager {
public static final byte DATA_ITEM_TYPE = 1;
public static final byte REDO_ITEM_TYPE = 2;
public static final String DEFAULT_DIRECTORY = "data";
public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
public static final String DEFAULT_FILE_PREFIX = "data-";
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
@ -73,6 +74,7 @@ public final class AsyncDataManager {
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
File directory = new File(DEFAULT_DIRECTORY);
File directoryArchive = new File (DEFAULT_ARCHIVE_DIRECTORY);
String filePrefix = DEFAULT_FILE_PREFIX;
ControlFile controlFile;
boolean started;
@ -91,6 +93,7 @@ public final class AsyncDataManager {
private final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
private Runnable cleanupTask;
private final AtomicLong storeSize;
private boolean archiveDataLogs;
public AsyncDataManager(AtomicLong storeSize) {
this.storeSize=storeSize;
@ -409,15 +412,21 @@ public final class AsyncDataManager {
forceRemoveDataFile(dataFile);
}
private synchronized void forceRemoveDataFile(DataFile dataFile) throws IOException {
private synchronized void forceRemoveDataFile(DataFile dataFile)
throws IOException {
accessorPool.disposeDataFileAccessors(dataFile);
DataFile removed = fileMap.remove(dataFile.getDataFileId());
storeSize.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
LOG.debug("moced data file " + dataFile + " to "
+ getDirectoryArchive());
} else {
boolean result = dataFile.delete();
LOG.debug("discarding data file " + dataFile
+ (result ? "successful " : "failed"));
}
}
/**
@ -581,6 +590,21 @@ public final class AsyncDataManager {
this.useNio = useNio;
}
public File getDirectoryArchive() {
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
}
public boolean isArchiveDataLogs() {
return archiveDataLogs;
}
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
}
synchronized public Integer getCurrentDataFileId() {
if( currentWriteFile==null )

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LinkedNode;
/**
@ -104,6 +105,10 @@ class DataFile extends LinkedNode implements Comparable<DataFile> {
return file.delete();
}
public synchronized void move(File targetDirectory) throws IOException{
IOHelper.moveFile(file,targetDirectory);
}
public int compareTo(DataFile df) {
return dataFileId - df.dataFileId;
}

View File

@ -101,10 +101,12 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
private boolean syncOnWrite;
private String brokerName = "";
private File directory;
private File directoryArchive;
private BrokerService brokerService;
private AtomicLong storeSize = new AtomicLong();
private boolean persistentIndex=true;
private boolean useNio = true;
private boolean archiveDataLogs=false;
private int maxFileLength = AsyncDataManager.DEFAULT_MAX_FILE_LENGTH;
private int indexBinSize = HashIndex.DEFAULT_BIN_SIZE;
private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
@ -143,8 +145,14 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.directory = new File(directory, "amqstore");
}
}
if (this.directoryArchive == null) {
this.directoryArchive = new File(this.directory,"archive");
}
LOG.info("AMQStore starting using directory: " + directory);
this.directory.mkdirs();
if (archiveDataLogs) {
this.directoryArchive.mkdirs();
}
if (this.usageManager != null) {
this.usageManager.getMemoryUsage().addUsageListener(this);
@ -624,6 +632,8 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
protected AsyncDataManager createAsyncDataManager() {
AsyncDataManager manager = new AsyncDataManager(storeSize);
manager.setDirectory(new File(directory, "journal"));
manager.setDirectoryArchive(getDirectoryArchive());
manager.setArchiveDataLogs(isArchiveDataLogs());
manager.setMaxFileLength(maxFileLength);
manager.setUseNio(useNio);
return manager;
@ -794,6 +804,22 @@ public class AMQPersistenceAdapter implements PersistenceAdapter, UsageListener,
this.indexPageSize = indexPageSize;
}
public File getDirectoryArchive() {
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
this.directoryArchive = directoryArchive;
}
public boolean isArchiveDataLogs() {
return archiveDataLogs;
}
public void setArchiveDataLogs(boolean archiveDataLogs) {
this.archiveDataLogs = archiveDataLogs;
}
protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
Set<Integer>set = dataFilesInProgress.get(store);