From 5cf4d83556831672aff36e3c06dd0b9cf9e81174 Mon Sep 17 00:00:00 2001 From: Bosanac Dejan Date: Tue, 17 Nov 2009 09:53:49 +0000 Subject: [PATCH] https://issues.apache.org/activemq/browse/AMQ-2042 - kahadb cleaning up after io exception git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@881221 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/kahadb/page/PageFile.java | 144 +++++++++--------- 1 file changed, 76 insertions(+), 68 deletions(-) diff --git a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java index 065f355e77..0202fa2bcd 100644 --- a/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java +++ b/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java @@ -177,6 +177,10 @@ public class PageFile { diskBound=null; return current == null; } + + boolean isDone() { + return diskBound == null && current == null; + } } @@ -939,10 +943,7 @@ public class PageFile { batch = new ArrayList(writes.size()); // build a write batch from the current write cache. - Iterator it = writes.keySet().iterator(); - while (it.hasNext()) { - Long key = it.next(); - PageWrite write = writes.get(key); + for (PageWrite write : writes.values()) { batch.add(write); // Move the current write to the diskBound write, this lets folks update the // page again without blocking for this write. @@ -958,75 +959,82 @@ public class PageFile { this.checkpointLatch=null; } - - if (enableRecoveryFile) { - - // Using Adler-32 instead of CRC-32 because it's much faster and it's - // weakness for short messages with few hundred bytes is not a factor in this case since we know - // our write batches are going to much larger. - Checksum checksum = new Adler32(); - for (PageWrite w : batch) { - try { - checksum.update(w.diskBound, 0, pageSize); - } catch (Throwable t) { - throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t); - } - } - - // Can we shrink the recovery buffer?? - if( recoveryPageCount > recoveryFileMaxPageCount ) { - int t = Math.max(recoveryFileMinPageCount, batch.size()); - recoveryFile.setLength(recoveryFileSizeForPages(t)); - } - - // Record the page writes in the recovery buffer. - recoveryFile.seek(0); - // Store the next tx id... - recoveryFile.writeLong(nextTxid.get()); - // Store the checksum for thw write batch so that on recovery we know if we have a consistent - // write batch on disk. - recoveryFile.writeLong(checksum.getValue()); - // Write the # of pages that will follow - recoveryFile.writeInt(batch.size()); - - - // Write the pages. - recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE); - for (PageWrite w : batch) { - recoveryFile.writeLong(w.page.getPageId()); - recoveryFile.write(w.diskBound, 0, pageSize); + try { + if (enableRecoveryFile) { + + // Using Adler-32 instead of CRC-32 because it's much faster and + // it's + // weakness for short messages with few hundred bytes is not a + // factor in this case since we know + // our write batches are going to much larger. + Checksum checksum = new Adler32(); + for (PageWrite w : batch) { + try { + checksum.update(w.diskBound, 0, pageSize); + } catch (Throwable t) { + throw IOExceptionSupport.create( + "Cannot create recovery file. Reason: " + t, t); + } + } + + // Can we shrink the recovery buffer?? + if (recoveryPageCount > recoveryFileMaxPageCount) { + int t = Math.max(recoveryFileMinPageCount, batch.size()); + recoveryFile.setLength(recoveryFileSizeForPages(t)); + } + + // Record the page writes in the recovery buffer. + recoveryFile.seek(0); + // Store the next tx id... + recoveryFile.writeLong(nextTxid.get()); + // Store the checksum for thw write batch so that on recovery we + // know if we have a consistent + // write batch on disk. + recoveryFile.writeLong(checksum.getValue()); + // Write the # of pages that will follow + recoveryFile.writeInt(batch.size()); + + // Write the pages. + recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE); + + for (PageWrite w : batch) { + recoveryFile.writeLong(w.page.getPageId()); + recoveryFile.write(w.diskBound, 0, pageSize); + } + + if (enableDiskSyncs) { + // Sync to make sure recovery buffer writes land on disk.. + recoveryFile.getFD().sync(); + } + + recoveryPageCount = batch.size(); } - + + for (PageWrite w : batch) { + writeFile.seek(toOffset(w.page.getPageId())); + writeFile.write(w.diskBound, 0, pageSize); + w.done(); + } + + // Sync again if (enableDiskSyncs) { - // Sync to make sure recovery buffer writes land on disk.. - recoveryFile.getFD().sync(); + writeFile.getFD().sync(); } - - recoveryPageCount = batch.size(); - } - - - for (PageWrite w : batch) { - writeFile.seek(toOffset(w.page.getPageId())); - writeFile.write(w.diskBound, 0, pageSize); - } - - // Sync again - if( enableDiskSyncs ) { - writeFile.getFD().sync(); - } - - synchronized( writes ) { - for (PageWrite w : batch) { - // If there are no more pending writes, then remove it from the write cache. - if( w.done() ) { - writes.remove(w.page.getPageId()); + + } finally { + synchronized (writes) { + for (PageWrite w : batch) { + // If there are no more pending writes, then remove it from + // the write cache. + if (w.isDone()) { + writes.remove(w.page.getPageId()); + } } } - } - - if( checkpointLatch!=null ) { - checkpointLatch.countDown(); + + if( checkpointLatch!=null ) { + checkpointLatch.countDown(); + } } }