diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index 34bcccc0fb..75e02ebf28 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -914,14 +914,7 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter { } public long size() { - if (!isStarted()) { - return 0; - } - try { - return journal.getDiskSize() + pageFile.getDiskSize(); - } catch (IOException e) { - throw new RuntimeException(e); - } + return storeSize.get(); } public void beginTransaction(ConnectionContext context) throws IOException { diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4a40c0e779..e9b17043c5 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -39,6 +39,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.activemq.ActiveMQMessageAuditNoSync; @@ -190,6 +191,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar protected boolean enableJournalDiskSyncs=true; protected boolean archiveDataLogs; protected File directoryArchive; + protected AtomicLong storeSize = new AtomicLong(0); long checkpointInterval = 5*1000; long cleanupInterval = 30*1000; int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH; @@ -1594,6 +1596,7 @@ public class MessageDatabase extends ServiceSupport implements BrokerServiceAwar manager.setChecksum(checksumJournalFiles || checkForCorruptJournalFiles); manager.setWriteBatchSize(getJournalMaxWriteBatchSize()); manager.setArchiveDataLogs(isArchiveDataLogs()); + manager.setSizeAccumulator(storeSize); if (getDirectoryArchive() != null) { IOHelper.mkdirs(getDirectoryArchive()); manager.setDirectoryArchive(getDirectoryArchive()); diff --git a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java index cd84290e5b..b44e1198a4 100644 --- a/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java +++ b/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java @@ -112,7 +112,7 @@ public class Journal { protected final AtomicReference lastAppendLocation = new AtomicReference(); protected Runnable cleanupTask; - protected final AtomicLong totalLength = new AtomicLong(); + protected AtomicLong totalLength = new AtomicLong(); protected boolean archiveDataLogs; private ReplicationTarget replicationTarget; protected boolean checksum; @@ -226,7 +226,11 @@ public class Journal { accessorPool.closeDataFileAccessor(reader); } + int existingLen = dataFile.getLength(); dataFile.setLength(location.getOffset()); + if (existingLen > dataFile.getLength()) { + totalLength.addAndGet(dataFile.getLength() - existingLen); + } if( !dataFile.corruptedBlocks.isEmpty() ) { // Is the end of the data file corrupted? @@ -735,4 +739,8 @@ public class Journal { public int getWriteBatchSize() { return writeBatchSize; } + + public void setSizeAccumulator(AtomicLong storeSizeAccumulator) { + this.totalLength = storeSizeAccumulator; + } }