HBASE-4797 [availability] Skip recovered.edits files with edits we know older than what region currently has (Jimmy Jiang)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1205290 13f79535-47bb-0310-9956-ffa450edef68
Zhihong Yu 2011-11-23 05:23:45 +00:00
parent ef52263373
commit 351e75629f
2 changed files with 328 additions and 181 deletions
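The heart of this change is the early-skip check in replayRecoveredEditsIfAny: recovered.edits file names under a region directory are sequence ids (HLog.EDITFILES_NAME_PATTERN "-?[0-9]+"), so the name of the next file in sorted order bounds the highest sequence id the current file can possibly contain, and any file whose bound is at or below the region's already-flushed sequence id can be dropped without opening it. Below is a minimal standalone sketch of that heuristic, not code from this commit; the class and method names (SkipHeuristicSketch, canSkip) are made up for illustration, and it assumes zero-padded, non-negative file names so lexicographic order matches numeric order.

import java.util.NavigableSet;
import java.util.TreeSet;

public class SkipHeuristicSketch {
  /**
   * Returns true if the recovered.edits file named fileName can be skipped
   * outright, i.e. the name of the next file in sorted order (an upper bound
   * on the sequence ids in fileName) is not newer than what the region has.
   */
  static boolean canSkip(NavigableSet<String> sortedEditFiles, String fileName,
      long regionMinSeqId) {
    String higher = sortedEditFiles.higher(fileName);
    // With no following file there is no usable bound, so the file must be replayed.
    long maxPossibleSeqId = (higher == null)
        ? Long.MAX_VALUE
        : Math.abs(Long.parseLong(higher));
    return maxPossibleSeqId <= regionMinSeqId;
  }

  public static void main(String[] args) {
    NavigableSet<String> files = new TreeSet<String>();
    files.add(String.format("%019d", 1000L));
    files.add(String.format("%019d", 1010L));
    files.add(String.format("%019d", 1020L));
    // The region has already flushed everything up to seqid 1015, so the file
    // starting at 1000 (bounded above by 1010) is safe to skip...
    System.out.println(canSkip(files, String.format("%019d", 1000L), 1015L)); // true
    // ...but the file starting at 1010 (bounded above by 1020) is not.
    System.out.println(canSkip(files, String.format("%019d", 1010L), 1015L)); // false
  }
}

The actual change below applies the same bound to the Path objects returned by HLog.getSplitEditFilesSorted, falling back to Long.MAX_VALUE when there is no following file.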

HRegion.java

@@ -2502,16 +2502,36 @@ public class HRegion implements HeapSize { // , Writable{
  protected long replayRecoveredEditsIfAny(final Path regiondir,
      final long minSeqId, final CancelableProgressable reporter,
      final MonitoredTask status)
      throws UnsupportedEncodingException, IOException {
    long seqid = minSeqId;
    NavigableSet<Path> files = HLog.getSplitEditFilesSorted(this.fs, regiondir);
    if (files == null || files.isEmpty()) return seqid;
    boolean checkSafeToSkip = true;
    for (Path edits: files) {
      if (edits == null || !this.fs.exists(edits)) {
        LOG.warn("Null or non-existent edits file: " + edits);
        continue;
      }
      if (isZeroLengthThenDelete(this.fs, edits)) continue;

      if (checkSafeToSkip) {
        Path higher = files.higher(edits);
        long maxSeqId = Long.MAX_VALUE;
        if (higher != null) {
          // Edit file name pattern, HLog.EDITFILES_NAME_PATTERN: "-?[0-9]+"
          String fileName = higher.getName();
          maxSeqId = Math.abs(Long.parseLong(fileName));
        }
        if (maxSeqId <= minSeqId) {
          String msg = "Maximum possible sequenceid for this log is " + maxSeqId
              + ", skipped the whole file, path=" + edits;
          LOG.debug(msg);
          continue;
        } else {
          checkSafeToSkip = false;
        }
      }

      try {
        seqid = replayRecoveredEdits(edits, seqid, reporter);
      } catch (IOException e) {
@@ -2560,135 +2580,135 @@ public class HRegion implements HeapSize { // , Writable{
    status.setStatus("Opening logs");
    HLog.Reader reader = HLog.getReader(this.fs, edits, conf);
    try {
      long currentEditSeqId = minSeqId;
      long firstSeqIdInLog = -1;
      long skippedEdits = 0;
      long editsCount = 0;
      long intervalEdits = 0;
      HLog.Entry entry;
      Store store = null;
      boolean reported_once = false;

      try {
        // How many edits seen before we check elapsed time
        int interval = this.conf.getInt("hbase.hstore.report.interval.edits",
            2000);
        // How often to send a progress report (default 1/2 master timeout)
        int period = this.conf.getInt("hbase.hstore.report.period",
            this.conf.getInt("hbase.master.assignment.timeoutmonitor.timeout",
                180000) / 2);
        long lastReport = EnvironmentEdgeManager.currentTimeMillis();

        while ((entry = reader.next()) != null) {
          HLogKey key = entry.getKey();
          WALEdit val = entry.getEdit();

          if (reporter != null) {
            intervalEdits += val.size();
            if (intervalEdits >= interval) {
              // Number of edits interval reached
              intervalEdits = 0;
              long cur = EnvironmentEdgeManager.currentTimeMillis();
              if (lastReport + period <= cur) {
                status.setStatus("Replaying edits..." +
                    " skipped=" + skippedEdits +
                    " edits=" + editsCount);
                // Timeout reached
                if(!reporter.progress()) {
                  msg = "Progressable reporter failed, stopping replay";
                  LOG.warn(msg);
                  status.abort(msg);
                  throw new IOException(msg);
                }
                reported_once = true;
                lastReport = cur;
              }
            }
          }

          // Start coprocessor replay here. The coprocessor is for each WALEdit
          // instead of a KeyValue.
          if (coprocessorHost != null) {
            status.setStatus("Running pre-WAL-restore hook in coprocessors");
            if (coprocessorHost.preWALRestore(this.getRegionInfo(), key, val)) {
              // if bypass this log entry, ignore it ...
              continue;
            }
          }

          if (firstSeqIdInLog == -1) {
            firstSeqIdInLog = key.getLogSeqNum();
          }
          // Now, figure if we should skip this edit.
          if (key.getLogSeqNum() <= currentEditSeqId) {
            skippedEdits++;
            continue;
          }
          currentEditSeqId = key.getLogSeqNum();
          boolean flush = false;
          for (KeyValue kv: val.getKeyValues()) {
            // Check this edit is for me. Also, guard against writing the special
            // METACOLUMN info such as HBASE::CACHEFLUSH entries
            if (kv.matchingFamily(HLog.METAFAMILY) ||
                !Bytes.equals(key.getEncodedRegionName(), this.regionInfo.getEncodedNameAsBytes())) {
              skippedEdits++;
              continue;
            }
            // Figure which store the edit is meant for.
            if (store == null || !kv.matchingFamily(store.getFamily().getName())) {
              store = this.stores.get(kv.getFamily());
            }
            if (store == null) {
              // This should never happen. Perhaps schema was changed between
              // crash and redeploy?
              LOG.warn("No family for " + kv);
              skippedEdits++;
              continue;
            }
            // Once we are over the limit, restoreEdit will keep returning true to
            // flush -- but don't flush until we've played all the kvs that make up
            // the WALEdit.
            flush = restoreEdit(store, kv);
            editsCount++;
          }
          if (flush) internalFlushcache(null, currentEditSeqId, status);

          if (coprocessorHost != null) {
            coprocessorHost.postWALRestore(this.getRegionInfo(), key, val);
          }
        }
      } catch (EOFException eof) {
        Path p = HLog.moveAsideBadEditsFile(fs, edits);
        msg = "Encountered EOF. Most likely due to Master failure during " +
            "log spliting, so we have this data in another edit. " +
            "Continuing, but renaming " + edits + " as " + p;
        LOG.warn(msg, eof);
        status.abort(msg);
      } catch (IOException ioe) {
        // If the IOE resulted from bad file format,
        // then this problem is idempotent and retrying won't help
        if (ioe.getCause() instanceof ParseException) {
          Path p = HLog.moveAsideBadEditsFile(fs, edits);
          msg = "File corruption encountered! " +
              "Continuing, but renaming " + edits + " as " + p;
          LOG.warn(msg, ioe);
          status.setStatus(msg);
        } else {
          status.abort(StringUtils.stringifyException(ioe));
          // other IO errors may be transient (bad network connection,
          // checksum exception on one datanode, etc). throw & retry
          throw ioe;
        }
      }
      if (reporter != null && !reported_once) {
        reporter.progress();
      }
      msg = "Applied " + editsCount + ", skipped " + skippedEdits +
          ", firstSequenceidInLog=" + firstSeqIdInLog +
          ", maxSequenceidInLog=" + currentEditSeqId + ", path=" + edits;
      status.markComplete(msg);
      LOG.debug(msg);
      return currentEditSeqId;
    } finally {
      reader.close();
      status.cleanup();

@@ -2712,7 +2732,7 @@ public class HRegion implements HeapSize { // , Writable{
   * @throws IOException
   */
  private static boolean isZeroLengthThenDelete(final FileSystem fs, final Path p)
      throws IOException {
    FileStatus stat = fs.getFileStatus(p);
    if (stat.getLen() > 0) return false;
    LOG.warn("File " + p + " is zero-length, deleting.");

@@ -2721,7 +2741,7 @@ public class HRegion implements HeapSize { // , Writable{
  }

  protected Store instantiateHStore(Path tableDir, HColumnDescriptor c)
      throws IOException {
    return new Store(tableDir, this, c, this.fs, this.conf);
  }

TestHRegion.java

@@ -34,16 +34,30 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MultithreadedTestUtil;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

@@ -52,9 +66,13 @@ import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;

@@ -64,9 +82,9 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PairOfSameType;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
@@ -2809,6 +2827,115 @@ public class TestHRegion extends HBaseTestCase {
    region.get(g, null);
  }

  public void testSkipRecoveredEditsReplay() throws Exception {
    String method = "testSkipRecoveredEditsReplay";
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");
    Configuration conf = HBaseConfiguration.create();
    initHRegion(tableName, method, conf, family);
    Path regiondir = region.getRegionDir();
    FileSystem fs = region.getFilesystem();
    byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();

    Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);

    long maxSeqId = 1050;
    long minSeqId = 1000;

    for (long i = minSeqId; i <= maxSeqId; i += 10) {
      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
      HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf);

      long time = System.nanoTime();
      WALEdit edit = new WALEdit();
      edit.add(new KeyValue(row, family, Bytes.toBytes(i),
          time, KeyValue.Type.Put, Bytes.toBytes(i)));
      writer.append(new HLog.Entry(new HLogKey(regionName, tableName,
          i, time, HConstants.DEFAULT_CLUSTER_ID), edit));

      writer.close();
    }
    MonitoredTask status = TaskMonitor.get().createStatus(method);
    long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId-1, null, status);
    assertEquals(maxSeqId, seqId);
    Get get = new Get(row);
    Result result = region.get(get, null);
    for (long i = minSeqId; i <= maxSeqId; i += 10) {
      List<KeyValue> kvs = result.getColumn(family, Bytes.toBytes(i));
      assertEquals(1, kvs.size());
      assertEquals(Bytes.toBytes(i), kvs.get(0).getValue());
    }
  }

  public void testSkipRecoveredEditsReplaySomeIgnored() throws Exception {
    String method = "testSkipRecoveredEditsReplaySomeIgnored";
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");
    initHRegion(tableName, method, HBaseConfiguration.create(), family);
    Path regiondir = region.getRegionDir();
    FileSystem fs = region.getFilesystem();
    byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();

    Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);

    long maxSeqId = 1050;
    long minSeqId = 1000;

    for (long i = minSeqId; i <= maxSeqId; i += 10) {
      Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
      HLog.Writer writer = HLog.createWriter(fs, recoveredEdits, conf);

      long time = System.nanoTime();
      WALEdit edit = new WALEdit();
      edit.add(new KeyValue(row, family, Bytes.toBytes(i),
          time, KeyValue.Type.Put, Bytes.toBytes(i)));
      writer.append(new HLog.Entry(new HLogKey(regionName, tableName,
          i, time, HConstants.DEFAULT_CLUSTER_ID), edit));

      writer.close();
    }
    long recoverSeqId = 1030;
    MonitoredTask status = TaskMonitor.get().createStatus(method);
    long seqId = region.replayRecoveredEditsIfAny(regiondir, recoverSeqId-1, null, status);
    assertEquals(maxSeqId, seqId);
    Get get = new Get(row);
    Result result = region.get(get, null);
    for (long i = minSeqId; i <= maxSeqId; i += 10) {
      List<KeyValue> kvs = result.getColumn(family, Bytes.toBytes(i));
      if (i < recoverSeqId) {
        assertEquals(0, kvs.size());
      } else {
        assertEquals(1, kvs.size());
        assertEquals(Bytes.toBytes(i), kvs.get(0).getValue());
      }
    }
  }

  public void testSkipRecoveredEditsReplayAllIgnored() throws Exception {
    String method = "testSkipRecoveredEditsReplayAllIgnored";
    byte[] tableName = Bytes.toBytes(method);
    byte[] family = Bytes.toBytes("family");
    initHRegion(tableName, method, HBaseConfiguration.create(), family);
    Path regiondir = region.getRegionDir();
    FileSystem fs = region.getFilesystem();

    Path recoveredEditsDir = HLog.getRegionDirRecoveredEditsDir(regiondir);
    for (int i = 1000; i < 1050; i += 10) {
      Path recoveredEdits = new Path(
          recoveredEditsDir, String.format("%019d", i));
      FSDataOutputStream dos = fs.create(recoveredEdits);
      dos.writeInt(i);
      dos.close();
    }
    long minSeqId = 2000;
    Path recoveredEdits = new Path(
        recoveredEditsDir, String.format("%019d", minSeqId-1));
    FSDataOutputStream dos = fs.create(recoveredEdits);
    dos.close();
    long seqId = region.replayRecoveredEditsIfAny(regiondir, minSeqId, null, null);
    assertEquals(minSeqId, seqId);
  }

  public void testIndexesScanWithOneDeletedRow() throws IOException {
    byte[] tableName = Bytes.toBytes("testIndexesScanWithOneDeletedRow");
    byte[] family = Bytes.toBytes("family");