resolve PerDestinationStoreLimitTest - have store size reflect stored messages rather than disk usage (so it now reports used rather than allocated disk space, excluding the index), such that 100MB of messages gives a store size of ~100MB while disk utilisation may be bigger. getDiskSize still exists to get total allocated space on disk. This allows configured limits to be more accurately enforced and fixes this test regression. Also, the calculation is cumulative rather than on demand, so it is much faster.

git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@962468 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Gary Tully 2010-07-09 09:52:43 +00:00
parent 40ae8b1d4c
commit 5763561d60
3 changed files with 13 additions and 9 deletions

View File

@ -914,14 +914,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
} }
public long size() { public long size() {
if (!isStarted()) { return storeSize.get();
return 0;
}
try {
return journal.getDiskSize() + pageFile.getDiskSize();
} catch (IOException e) {
throw new RuntimeException(e);
}
} }
public void beginTransaction(ConnectionContext context) throws IOException { public void beginTransaction(ConnectionContext context) throws IOException {

View File

@ -39,6 +39,7 @@ import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.ActiveMQMessageAuditNoSync; import org.apache.activemq.ActiveMQMessageAuditNoSync;
@ -190,6 +191,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
protected boolean enableJournalDiskSyncs=true; protected boolean enableJournalDiskSyncs=true;
protected boolean archiveDataLogs; protected boolean archiveDataLogs;
protected File directoryArchive; protected File directoryArchive;
protected AtomicLong storeSize = new AtomicLong(0);
long checkpointInterval = 5*1000; long checkpointInterval = 5*1000;
long cleanupInterval = 30*1000; long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
@ -1594,6 +1596,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar
manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles);
manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); manager.setWriteBatchSize(getJournalMaxWriteBatchSize());
manager.setArchiveDataLogs(isArchiveDataLogs()); manager.setArchiveDataLogs(isArchiveDataLogs());
manager.setSizeAccumulator(storeSize);
if (getDirectoryArchive() != null) { if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive()); IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive()); manager.setDirectoryArchive(getDirectoryArchive());

View File

@ -112,7 +112,7 @@ public class Journal {
protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>(); protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
protected Runnable cleanupTask; protected Runnable cleanupTask;
protected final AtomicLong totalLength = new AtomicLong(); protected AtomicLong totalLength = new AtomicLong();
protected boolean archiveDataLogs; protected boolean archiveDataLogs;
private ReplicationTarget replicationTarget; private ReplicationTarget replicationTarget;
protected boolean checksum; protected boolean checksum;
@ -226,7 +226,11 @@ public class Journal {
accessorPool.closeDataFileAccessor(reader); accessorPool.closeDataFileAccessor(reader);
} }
int existingLen = dataFile.getLength();
dataFile.setLength(location.getOffset()); dataFile.setLength(location.getOffset());
if (existingLen > dataFile.getLength()) {
totalLength.addAndGet(dataFile.getLength() - existingLen);
}
if( !dataFile.corruptedBlocks.isEmpty() ) { if( !dataFile.corruptedBlocks.isEmpty() ) {
// Is the end of the data file corrupted? // Is the end of the data file corrupted?
@ -735,4 +739,8 @@ public class Journal {
public int getWriteBatchSize() { public int getWriteBatchSize() {
return writeBatchSize; return writeBatchSize;
} }
public void setSizeAccumulator(AtomicLong storeSizeAccumulator) {
this.totalLength = storeSizeAccumulator;
}
} }