HBASE-2933 Skip EOF Errors during Log Recovery

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1023183 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2010-10-16 05:27:57 +00:00
parent 0f27ee880c
commit 63ba21eca5
2 changed files with 69 additions and 64 deletions
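A minimal sketch (not the HRegion code itself) of the recovery pattern this commit introduces: while replaying recovered edits, treat an EOFException as a truncated edits file left behind by a master failure mid-split, rename the file out of the way, and continue opening the region rather than aborting. The names `EofSkipSketch`, `replayEdits`, `applyAllEdits`, and `moveAside` are hypothetical stand-ins; the real code drives an HLog.Reader loop and calls HLog.moveAsideBadEditsFile.

```java
import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative only; simplified stand-in for the HRegion replay path. */
public class EofSkipSketch {

  /** Replay recovered edits, tolerating a truncated (EOF) edits file. */
  void replayEdits(FileSystem fs, Path edits) throws IOException {
    try {
      applyAllEdits(fs, edits);   // hypothetical: read and apply each entry
    } catch (EOFException eof) {
      // A partially written file usually means the master died mid-split;
      // the same data exists in another edits file, so move this one aside
      // and keep going instead of failing the region open.
      Path moved = moveAside(fs, edits);
      System.err.println("EOF in " + edits + "; renamed to " + moved);
    }
  }

  void applyAllEdits(FileSystem fs, Path edits) throws IOException {
    // Placeholder for the real reader loop over recovered edits.
  }

  /** Stand-in for HLog.moveAsideBadEditsFile(fs, edits). */
  Path moveAside(FileSystem fs, Path edits) throws IOException {
    Path moved = new Path(edits.getParent(),
        edits.getName() + "." + System.currentTimeMillis());
    if (!fs.rename(edits, moved)) {
      throw new IOException("Failed to rename " + edits + " to " + moved);
    }
    return moved;
  }
}
```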


@@ -586,6 +586,8 @@ Release 0.21.0 - Unreleased
HBASE-3044 [replication] ReplicationSource won't cleanup logs if there's
nothing to replicate
HBASE-3113 Don't reassign regions if cluster is being shutdown
HBASE-2933 Skip EOF Errors during Log Recovery
(Nicolas Spiegelberg via Stack)
IMPROVEMENTS


@@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UnsupportedEncodingException;
@@ -1807,83 +1808,85 @@ public class HRegion implements HeapSize { // , Writable{
LOG.info("Replaying edits from " + edits + "; minSequenceid=" + minSeqId);
HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
try {
return replayRecoveredEdits(reader, minSeqId, reporter);
} finally {
reader.close();
}
}
/* @param reader Reader against file of recovered edits.
* @param minSeqId Any edit found in split editlogs needs to be in excess of
* this minSeqId to be applied, else it is skipped.
* @param reporter
* @return the sequence id of the last edit added to this region out of the
* recovered edits log or <code>minSeqId</code> if nothing added from editlogs.
* @throws IOException
*/
private long replayRecoveredEdits(final HLog.Reader reader,
final long minSeqId, final Progressable reporter)
throws IOException {
long currentEditSeqId = minSeqId;
long firstSeqIdInLog = -1;
long skippedEdits = 0;
long editsCount = 0;
HLog.Entry entry;
Store store = null;
// How many edits to apply before we send a progress report.
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
skippedEdits++;
continue;
}
// Figure which store the edit is meant for.
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
}
if (store == null) {
// This should never happen. Perhaps schema was changed between
// crash and redeploy?
LOG.warn("No family for " + kv);
skippedEdits++;
continue;
}
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
flush = restoreEdit(store, kv);
editsCount++;
}
if (flush) internalFlushcache(null, currentEditSeqId);
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3 minutes to complete.
if (reporter != null && (editsCount % interval) == 0) {
reporter.progress();
try {
// How many edits to apply before we send a progress report.
int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000);
while ((entry = reader.next()) != null) {
HLogKey key = entry.getKey();
WALEdit val = entry.getEdit();
if (firstSeqIdInLog == -1) {
firstSeqIdInLog = key.getLogSeqNum();
}
// Now, figure if we should skip this edit.
if (key.getLogSeqNum() <= currentEditSeqId) {
skippedEdits++;
continue;
}
currentEditSeqId = key.getLogSeqNum();
boolean flush = false;
for (KeyValue kv: val.getKeyValues()) {
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (kv.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
skippedEdits++;
continue;
}
// Figure which store the edit is meant for.
if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
store = this.stores.get(kv.getFamily());
}
if (store == null) {
// This should never happen. Perhaps schema was changed between
// crash and redeploy?
LOG.warn("No family for " + kv);
skippedEdits++;
continue;
}
// Once we are over the limit, restoreEdit will keep returning true to
// flush -- but don't flush until we've played all the kvs that make up
// the WALEdit.
flush = restoreEdit(store, kv);
editsCount++;
}
if (flush) internalFlushcache(null, currentEditSeqId);
// Every 'interval' edits, tell the reporter we're making progress.
// Have seen 60k edits taking 3 minutes to complete.
if (reporter != null && (editsCount % interval) == 0) {
reporter.progress();
}
}
} catch (EOFException eof) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
LOG.warn("Encountered EOF. Most likely due to Master failure during " +
"log spliting, so we have this data in another edit. " +
"Continuing, but renaming " + edits + " as " + p, eof);
} catch (IOException ioe) {
if (ioe.getMessage().startsWith("File is corrupt")) {
Path p = HLog.moveAsideBadEditsFile(fs, edits);
LOG.warn("File corruption encountered! " +
"Continuing, but renaming " + edits + " as " + p, ioe);
} else {
throw ioe;
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Applied " + editsCount + ", skipped " + skippedEdits +
", firstSequenceidInLog=" + firstSeqIdInLog +
", maxSequenceidInLog=" + currentEditSeqId);
", firstSequenceidInLog=" + firstSeqIdInLog +
", maxSequenceidInLog=" + currentEditSeqId);
}
return currentEditSeqId;
} finally {
reader.close();
}
}
/**