git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@881221 13f79535-47bb-0310-9956-ffa450edef68
Bosanac Dejan 2009-11-17 09:53:49 +00:00
parent a15b39569b
commit 5cf4d83556
1 changed file with 76 additions and 68 deletions


@@ -177,6 +177,10 @@ public class PageFile {
             diskBound=null;
             return current == null;
         }
+
+        boolean isDone() {
+            return diskBound == null && current == null;
+        }
     }
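Reviewer note: the added isDone() is read-only, unlike done(), which clears the diskBound buffer as a side effect. Below is a minimal, self-contained sketch of the double-buffered write holder this hunk assumes (the class name, setCurrent() and begin() are illustrative; only the current/diskBound fields and the done()/isDone() bodies come from the diff):

// Hedged sketch of a double-buffered page write holder; not the real PageWrite.
class SketchPageWrite {
    byte[] current;    // latest page data, may still be replaced by callers
    byte[] diskBound;  // snapshot currently being flushed to disk

    // Caller side: record a newer version of the page without waiting for the flush.
    synchronized void setCurrent(byte[] data) {
        current = data;
    }

    // Writer thread: promote the pending buffer to disk-bound before flushing it.
    synchronized void begin() {
        diskBound = current;
        current = null;
    }

    // Writer thread: the disk-bound snapshot has been written.
    // Returns true when no newer version arrived while the flush was in progress.
    synchronized boolean done() {
        diskBound = null;
        return current == null;
    }

    // True only when nothing is in flight and nothing new is pending,
    // i.e. the entry can safely be dropped from the write cache.
    synchronized boolean isDone() {
        return diskBound == null && current == null;
    }
}

Splitting the state change (done()) from the completion test (isDone()) is what lets the cleanup in the last hunk move into a finally block: the cache removal can run even if the flush threw, without racing a concurrent update of the page.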
@@ -939,10 +943,7 @@ public class PageFile {
             batch = new ArrayList<PageWrite>(writes.size());
             // build a write batch from the current write cache.
-            Iterator<Long> it = writes.keySet().iterator();
-            while (it.hasNext()) {
-                Long key = it.next();
-                PageWrite write = writes.get(key);
+            for (PageWrite write : writes.values()) {
                 batch.add(write);
                 // Move the current write to the diskBound write, this lets folks update the
                 // page again without blocking for this write.
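Reviewer note: the loop rewrite above is purely mechanical; iterating writes.values() does in one pass what the old keySet() iterator did with an extra map lookup per key. A standalone illustration under assumed placeholder types (the real cache maps page ids to PageWrite entries):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WriteBatchSketch {
    // Collect the pending writes in one pass, without a get() per key.
    static List<String> buildBatch(Map<Long, String> writes) {
        List<String> batch = new ArrayList<String>(writes.size());
        for (String write : writes.values()) {
            batch.add(write);
        }
        return batch;
    }

    public static void main(String[] args) {
        Map<Long, String> writes = new LinkedHashMap<Long, String>();
        writes.put(1L, "page-1");
        writes.put(2L, "page-2");
        System.out.println(buildBatch(writes)); // prints [page-1, page-2]
    }
}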
@@ -958,75 +959,82 @@ public class PageFile {
             this.checkpointLatch=null;
         }
-        if (enableRecoveryFile) {
-            // Using Adler-32 instead of CRC-32 because it's much faster and its
-            // weakness for short messages with a few hundred bytes is not a factor in this case since we know
-            // our write batches are going to be much larger.
-            Checksum checksum = new Adler32();
-            for (PageWrite w : batch) {
-                try {
-                    checksum.update(w.diskBound, 0, pageSize);
-                } catch (Throwable t) {
-                    throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
-                }
-            }
-            // Can we shrink the recovery buffer??
-            if( recoveryPageCount > recoveryFileMaxPageCount ) {
-                int t = Math.max(recoveryFileMinPageCount, batch.size());
-                recoveryFile.setLength(recoveryFileSizeForPages(t));
-            }
-            // Record the page writes in the recovery buffer.
-            recoveryFile.seek(0);
-            // Store the next tx id...
-            recoveryFile.writeLong(nextTxid.get());
-            // Store the checksum for the write batch so that on recovery we know if we have a consistent
-            // write batch on disk.
-            recoveryFile.writeLong(checksum.getValue());
-            // Write the # of pages that will follow
-            recoveryFile.writeInt(batch.size());
-            // Write the pages.
-            recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
-            for (PageWrite w : batch) {
-                recoveryFile.writeLong(w.page.getPageId());
-                recoveryFile.write(w.diskBound, 0, pageSize);
-            }
-            if (enableDiskSyncs) {
-                // Sync to make sure recovery buffer writes land on disk..
-                recoveryFile.getFD().sync();
-            }
-            recoveryPageCount = batch.size();
-        }
-        for (PageWrite w : batch) {
-            writeFile.seek(toOffset(w.page.getPageId()));
-            writeFile.write(w.diskBound, 0, pageSize);
-        }
-        // Sync again
-        if( enableDiskSyncs ) {
-            writeFile.getFD().sync();
-        }
-        synchronized( writes ) {
-            for (PageWrite w : batch) {
-                // If there are no more pending writes, then remove it from the write cache.
-                if( w.done() ) {
-                    writes.remove(w.page.getPageId());
-                }
-            }
-        }
+        try {
+            if (enableRecoveryFile) {
+                // Using Adler-32 instead of CRC-32 because it's much faster and
+                // its weakness for short messages with a few hundred bytes is not
+                // a factor in this case since we know our write batches are going
+                // to be much larger.
+                Checksum checksum = new Adler32();
+                for (PageWrite w : batch) {
+                    try {
+                        checksum.update(w.diskBound, 0, pageSize);
+                    } catch (Throwable t) {
+                        throw IOExceptionSupport.create(
+                                "Cannot create recovery file. Reason: " + t, t);
+                    }
+                }
+                // Can we shrink the recovery buffer??
+                if (recoveryPageCount > recoveryFileMaxPageCount) {
+                    int t = Math.max(recoveryFileMinPageCount, batch.size());
+                    recoveryFile.setLength(recoveryFileSizeForPages(t));
+                }
+                // Record the page writes in the recovery buffer.
+                recoveryFile.seek(0);
+                // Store the next tx id...
+                recoveryFile.writeLong(nextTxid.get());
+                // Store the checksum for the write batch so that on recovery we
+                // know if we have a consistent write batch on disk.
+                recoveryFile.writeLong(checksum.getValue());
+                // Write the # of pages that will follow
+                recoveryFile.writeInt(batch.size());
+                // Write the pages.
+                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+                for (PageWrite w : batch) {
+                    recoveryFile.writeLong(w.page.getPageId());
+                    recoveryFile.write(w.diskBound, 0, pageSize);
+                }
+                if (enableDiskSyncs) {
+                    // Sync to make sure recovery buffer writes land on disk..
+                    recoveryFile.getFD().sync();
+                }
+                recoveryPageCount = batch.size();
+            }
+            for (PageWrite w : batch) {
+                writeFile.seek(toOffset(w.page.getPageId()));
+                writeFile.write(w.diskBound, 0, pageSize);
+                w.done();
+            }
+            // Sync again
+            if (enableDiskSyncs) {
+                writeFile.getFD().sync();
+            }
+        } finally {
+            synchronized (writes) {
+                for (PageWrite w : batch) {
+                    // If there are no more pending writes, then remove it from
+                    // the write cache.
+                    if (w.isDone()) {
+                        writes.remove(w.page.getPageId());
+                    }
+                }
+            }
+        }
         if( checkpointLatch!=null ) {
             checkpointLatch.countDown();
         }
     }
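Reviewer note: most of this hunk is re-indentation from wrapping the body in try { ... } finally { ... }. The substantive changes are that w.done() is now called as each page lands in the main page file, and the finally block removes only fully completed entries (w.isDone()) from the write cache, so the cache is cleaned up even when one of the writes throws.

The recovery-buffer layout the code relies on is worth spelling out: a fixed header holding the next transaction id, an Adler-32 checksum over the page payloads, and the page count, followed by one record per page (page id plus pageSize bytes). On restart, recomputing the checksum tells recovery whether the batch on disk is complete before it is replayed. A hedged, self-contained sketch under those assumptions follows; the class, method and constant names (and the header-size value) are illustrative, not PageFile internals:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.List;
import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Hedged sketch of a recovery buffer: header (txid, checksum, page count)
// followed by one record per page. Not the real PageFile implementation.
class RecoveryBufferSketch {

    // Sketch value only; the real RECOVERY_FILE_HEADER_SIZE may differ.
    static final int HEADER_SIZE = 8 + 8 + 4; // long txid + long checksum + int page count

    static class PageRecord {
        final long pageId;
        final byte[] data;
        PageRecord(long pageId, byte[] data) {
            this.pageId = pageId;
            this.data = data;
        }
    }

    // Write the batch to the recovery file before touching the main page file.
    static void writeBatch(RandomAccessFile recoveryFile, long nextTxid,
                           List<PageRecord> batch, int pageSize, boolean sync) throws IOException {
        // Checksum the page payloads so a torn batch can be detected on recovery.
        Checksum checksum = new Adler32();
        for (PageRecord r : batch) {
            checksum.update(r.data, 0, pageSize);
        }
        recoveryFile.seek(0);
        recoveryFile.writeLong(nextTxid);
        recoveryFile.writeLong(checksum.getValue());
        recoveryFile.writeInt(batch.size());
        recoveryFile.seek(HEADER_SIZE);
        for (PageRecord r : batch) {
            recoveryFile.writeLong(r.pageId);
            recoveryFile.write(r.data, 0, pageSize);
        }
        if (sync) {
            recoveryFile.getFD().sync(); // the batch must be durable before the page file is updated
        }
    }

    // On restart: recompute the checksum over the stored pages and compare it with
    // the header. A mismatch means the batch was only partially written and must
    // be ignored rather than replayed.
    static boolean isBatchConsistent(RandomAccessFile recoveryFile, int pageSize) throws IOException {
        recoveryFile.seek(8); // skip the txid
        long expected = recoveryFile.readLong();
        int pages = recoveryFile.readInt();
        Checksum checksum = new Adler32();
        byte[] page = new byte[pageSize];
        recoveryFile.seek(HEADER_SIZE);
        for (int i = 0; i < pages; i++) {
            recoveryFile.readLong(); // page id, not covered by the checksum
            recoveryFile.readFully(page, 0, pageSize);
            checksum.update(page, 0, pageSize);
        }
        return checksum.getValue() == expected;
    }
}

As the original comment notes, Adler-32 is used instead of CRC-32 purely for speed; its weakness on very short inputs does not matter here because each batch covers many pageSize-sized buffers.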