From 63ba21eca557ac66937a5cb739b8b64ac1057a36 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Sat, 16 Oct 2010 05:27:57 +0000 Subject: [PATCH] HBASE-2933 Skip EOF Errors during Log Recovery git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1023183 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../hadoop/hbase/regionserver/HRegion.java | 131 +++++++++--------- 2 files changed, 69 insertions(+), 64 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 0f7e222f20e..45f7a50cbdc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -586,6 +586,8 @@ Release 0.21.0 - Unreleased HBASE-3044 [replication] ReplicationSource won't cleanup logs if there's nothing to replicate HBASE-3113 Don't reassign regions if cluster is being shutdown + HBASE-2933 Skip EOF Errors during Log Recovery + (Nicolas Spiegelberg via Stack) IMPROVEMENTS diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5f829e4f176..977784cbeae 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.EOFException; import java.io.IOException; import java.io.InterruptedIOException; import java.io.UnsupportedEncodingException; @@ -1807,83 +1808,85 @@ public class HRegion implements HeapSize { // , Writable{ LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId); HLog.Reader reader = HLog.getReader(this.fs, edits, conf); try { - return replayRecoveredEdits(reader, minSeqId, reporter); - } finally { - reader.close(); - } - } - - /* @param reader Reader against file of recovered edits. - * @param minSeqId Any edit found in split editlogs needs to be in excess of - * this minSeqId to be applied, else its skipped. 
- * @param reporter - * @return the sequence id of the last edit added to this region out of the - * recovered edits log or minSeqId if nothing added from editlogs. - * @throws IOException - */ - private long replayRecoveredEdits(final HLog.Reader reader, - final long minSeqId, final Progressable reporter) - throws IOException { long currentEditSeqId = minSeqId; long firstSeqIdInLog = -1; long skippedEdits = 0; long editsCount = 0; HLog.Entry entry; Store store = null; - // How many edits to apply before we send a progress report. - int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); - while ((entry = reader.next()) != null) { - HLogKey key = entry.getKey(); - WALEdit val = entry.getEdit(); - if (firstSeqIdInLog == -1) { - firstSeqIdInLog = key.getLogSeqNum(); - } - // Now, figure if we should skip this edit. - if (key.getLogSeqNum() <= currentEditSeqId) { - skippedEdits++; - continue; - } - currentEditSeqId = key.getLogSeqNum(); - boolean flush = false; - for (KeyValue kv: val.getKeyValues()) { - // Check this edit is for me. Also, guard against writing the special - // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (kv.matchingFamily(HLog.METAFAMILY) || - !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { - skippedEdits++; - continue; - } - // Figure which store the edit is meant for. - if (store == null || !kv.matchingFamily(store.getFamily().getName())) { - store = this.stores.get(kv.getFamily()); - } - if (store == null) { - // This should never happen. Perhaps schema was changed between - // crash and redeploy? - LOG.warn("No family for " + kv); - skippedEdits++; - continue; - } - // Once we are over the limit, restoreEdit will keep returning true to - // flush -- but don't flush until we've played all the kvs that make up - // the WALEdit. 
- flush = restoreEdit(store, kv); - editsCount++; - } - if (flush) internalFlushcache(null, currentEditSeqId); - // Every 'interval' edits, tell the reporter we're making progress. - // Have seen 60k edits taking 3minutes to complete. - if (reporter != null && (editsCount % interval) == 0) { - reporter.progress(); + try { + // How many edits to apply before we send a progress report. + int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); + while ((entry = reader.next()) != null) { + HLogKey key = entry.getKey(); + WALEdit val = entry.getEdit(); + if (firstSeqIdInLog == -1) { + firstSeqIdInLog = key.getLogSeqNum(); + } + // Now, figure if we should skip this edit. + if (key.getLogSeqNum() <= currentEditSeqId) { + skippedEdits++; + continue; + } + currentEditSeqId = key.getLogSeqNum(); + boolean flush = false; + for (KeyValue kv: val.getKeyValues()) { + // Check this edit is for me. Also, guard against writing the special + // METACOLUMN info such as HBASE::CACHEFLUSH entries + if (kv.matchingFamily(HLog.METAFAMILY) || + !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) { + skippedEdits++; + continue; + } + // Figure which store the edit is meant for. + if (store == null || !kv.matchingFamily(store.getFamily().getName())) { + store = this.stores.get(kv.getFamily()); + } + if (store == null) { + // This should never happen. Perhaps schema was changed between + // crash and redeploy? + LOG.warn("No family for " + kv); + skippedEdits++; + continue; + } + // Once we are over the limit, restoreEdit will keep returning true to + // flush -- but don't flush until we've played all the kvs that make up + // the WALEdit. + flush = restoreEdit(store, kv); + editsCount++; + } + if (flush) internalFlushcache(null, currentEditSeqId); + + // Every 'interval' edits, tell the reporter we're making progress. + // Have seen 60k edits taking 3minutes to complete. 
+ if (reporter != null && (editsCount % interval) == 0) { + reporter.progress(); + } + } + } catch (EOFException eof) { + Path p = HLog.moveAsideBadEditsFile(fs, edits); + LOG.warn("Encountered EOF. Most likely due to Master failure during " + + "log spliting, so we have this data in another edit. " + + "Continuing, but renaming " + edits + " as " + p, eof); + } catch (IOException ioe) { + if (ioe.getMessage() != null && ioe.getMessage().startsWith("File is corrupt")) { // message may be null; then rethrow below + Path p = HLog.moveAsideBadEditsFile(fs, edits); + LOG.warn("File corruption encountered! " + + "Continuing, but renaming " + edits + " as " + p, ioe); + } else { + throw ioe; } } if (LOG.isDebugEnabled()) { LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits + - ", firstSequenceidInLog=" + firstSeqIdInLog + - ", maxSequenceidInLog=" + currentEditSeqId); + ", firstSequenceidInLog=" + firstSeqIdInLog + + ", maxSequenceidInLog=" + currentEditSeqId); } return currentEditSeqId; + } finally { + reader.close(); + } } /**