HBASE-10829 Flush is skipped after log replay if the last recovered edits file is skipped

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1581947 13f79535-47bb-0310-9956-ffa450edef68
Enis Soztutar 2014-03-26 17:21:36 +00:00
parent af8d406d14
commit 28efa14cc3
2 changed files with 69 additions and 4 deletions
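
To make the failure mode concrete, here is a minimal, self-contained Java sketch of the pattern this commit fixes; the class, method names, and sequence ids are hypothetical, not HBase's actual code. Recovered edits files are replayed in order, and a file whose edits are all already persisted reports -1. Before the fix, that -1 overwrote the running sequence id, so when the last file was skipped the post-replay flush guard never fired and edits replayed from earlier files stayed only in the memstore.

    import java.util.Arrays;
    import java.util.List;

    /** Hypothetical sketch of the replay-and-flush decision; not HBase code. */
    public class SkippedLastFileSketch {

      /** Pretend per-file replay: returns the file's max sequence id, or -1
       *  when every edit is at or below what the stores already contain. */
      static long replayOneFile(long maxSeqIdInFile, long maxSeqIdInStores) {
        return maxSeqIdInFile <= maxSeqIdInStores ? -1 : maxSeqIdInFile;
      }

      public static void main(String[] args) {
        long maxSeqIdInStores = 1029;        // stores already hold edits up to here
        // The last file is entirely skippable, like the compaction-marker
        // file written by the new test in this commit.
        List<Long> fileMaxSeqIds = Arrays.asList(1040L, 1029L);

        long buggySeqId = -1;
        long fixedSeqId = -1;
        for (long fileMax : fileMaxSeqIds) {
          long replayed = replayOneFile(fileMax, maxSeqIdInStores);
          buggySeqId = replayed;                        // old code: the last file wins
          fixedSeqId = Math.max(fixedSeqId, replayed);  // this commit: keep the maximum
        }

        // The post-replay flush only runs when the replayed sequence id exceeds
        // what the stores already hold, so the buggy path skips the flush and
        // the edits replayed from the first file are never persisted.
        System.out.println("buggy: seqid=" + buggySeqId
            + " flush=" + (buggySeqId > maxSeqIdInStores)); // seqid=-1 flush=false
        System.out.println("fixed: seqid=" + fixedSeqId
            + " flush=" + (fixedSeqId > maxSeqIdInStores)); // seqid=1040 flush=true
      }
    }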

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@@ -3081,7 +3081,8 @@ public class HRegion implements HeapSize { // , Writable{
       }
 
       try {
-        seqid = replayRecoveredEdits(edits, maxSeqIdInStores, reporter);
+        // replay the edits. Replay can return -1 if everything is skipped, only update if seqId is greater
+        seqid = Math.max(seqid, replayRecoveredEdits(edits, maxSeqIdInStores, reporter));
       } catch (IOException e) {
         boolean skipErrors = conf.getBoolean(
             HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
@@ -3208,6 +3209,7 @@ public class HRegion implements HeapSize { // , Writable{
         if (firstSeqIdInLog == -1) {
           firstSeqIdInLog = key.getLogSeqNum();
         }
+        currentEditSeqId = key.getLogSeqNum();
         boolean flush = false;
         for (KeyValue kv: val.getKeyValues()) {
           // Check this edit is for me. Also, guard against writing the special
@@ -3242,7 +3244,6 @@ public class HRegion implements HeapSize { // , Writable{
             skippedEdits++;
             continue;
           }
-          currentEditSeqId = key.getLogSeqNum();
           // Once we are over the limit, restoreEdit will keep returning true to
           // flush -- but don't flush until we've played all the kvs that make up
           // the WALEdit.
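
The second and third hunks work together: they move the currentEditSeqId assignment out of the per-KeyValue loop, ahead of the skip checks, so an entry whose edits are all skipped still advances the sequence id that replayRecoveredEdits reports to its caller. Below is a hedged, self-contained sketch of that reordering, with a hypothetical Entry type standing in for HLogKey plus WALEdit; it is not the real method.

    import java.util.Arrays;
    import java.util.List;

    /** Hypothetical skeleton of the replay loop restructured above; not HBase code. */
    public class ReplayLoopSketch {

      static class Entry {
        final long logSeqNum;
        final long[] kvSeqIds;
        Entry(long logSeqNum, long[] kvSeqIds) {
          this.logSeqNum = logSeqNum;
          this.kvSeqIds = kvSeqIds;
        }
      }

      /** Replays entries, skipping KeyValues already persisted in the stores. */
      static long replay(List<Entry> entries, long maxSeqIdInStores) {
        long currentEditSeqId = -1;
        for (Entry entry : entries) {
          // After the fix: record progress for every entry read, even if
          // every KeyValue below is skipped.
          currentEditSeqId = entry.logSeqNum;
          for (long kv : entry.kvSeqIds) {
            if (kv <= maxSeqIdInStores) {
              continue; // already persisted, skip this KeyValue
            }
            // Before the fix, the assignment sat here, after the skip checks,
            // so a trailing, fully skipped entry left currentEditSeqId stale.
            // restoreEdit(kv) would re-apply the edit to the memstore here.
          }
        }
        return currentEditSeqId;
      }

      public static void main(String[] args) {
        // The last entry (seq id 1050) is skipped entirely, like the
        // compaction-marker file written by the new test below.
        List<Entry> entries = Arrays.asList(
            new Entry(1040, new long[] { 1040 }),
            new Entry(1050, new long[] { 1029 }));
        System.out.println(replay(entries, 1029)); // prints 1050, not 1040
      }
    }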

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

@@ -140,10 +140,11 @@ import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
 
 /**
  * Basic stand-alone testing of HRegion. No clusters!
- * 
+ *
  * A lot of the meta information for an HRegion now lives inside other HRegions
  * or in the HBaseMaster, so only basic testing is possible.
  */
@@ -209,7 +210,7 @@ public class TestHRegion {
    * case previous flush fails and leaves some data in snapshot. The bug could cause loss of data
    * in current memstore. The fix is removing all conditions except abort check so we ensure 2
    * flushes for region close."
-   * @throws IOException 
+   * @throws IOException
    */
   @Test (timeout=60000)
   public void testCloseCarryingSnapshot() throws IOException {
@@ -258,6 +259,7 @@ public class TestHRegion {
     // Inject our faulty LocalFileSystem
     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
     user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
@@ -318,6 +320,7 @@ public class TestHRegion {
     // Inject our faulty LocalFileSystem
     conf.setClass("fs.file.impl", FaultyFileSystem.class, FileSystem.class);
     user.runAs(new PrivilegedExceptionAction<Object>() {
+      @Override
       public Object run() throws Exception {
         // Make sure it worked (above is sensitive to caching details in hadoop core)
         FileSystem fs = FileSystem.get(conf);
@@ -574,6 +577,67 @@ public class TestHRegion {
     }
   }
 
+  @Test
+  public void testSkipRecoveredEditsReplayTheLastFileIgnored() throws Exception {
+    String method = "testSkipRecoveredEditsReplayTheLastFileIgnored";
+    TableName tableName = TableName.valueOf(method);
+    byte[] family = Bytes.toBytes("family");
+    this.region = initHRegion(tableName, method, CONF, family);
+    try {
+      Path regiondir = region.getRegionFileSystem().getRegionDir();
+      FileSystem fs = region.getRegionFileSystem().getFileSystem();
+      byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes();
+
+      assertEquals(0, region.getStoreFileList(
+          region.getStores().keySet().toArray(new byte[0][])).size());
+
+      Path recoveredEditsDir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+
+      long maxSeqId = 1050;
+      long minSeqId = 1000;
+
+      for (long i = minSeqId; i <= maxSeqId; i += 10) {
+        Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i));
+        fs.create(recoveredEdits);
+        HLog.Writer writer = HLogFactory.createRecoveredEditsWriter(fs, recoveredEdits, CONF);
+
+        long time = System.nanoTime();
+        WALEdit edit = null;
+        if (i == maxSeqId) {
+          edit = WALEdit.createCompaction(CompactionDescriptor.newBuilder()
+              .setTableName(ByteString.copyFrom(tableName.getName()))
+              .setFamilyName(ByteString.copyFrom(regionName))
+              .setEncodedRegionName(ByteString.copyFrom(regionName))
+              .setStoreHomeDirBytes(ByteString.copyFrom(Bytes.toBytes(regiondir.toString())))
+              .build());
+        } else {
+          edit = new WALEdit();
+          edit.add(new KeyValue(row, family, Bytes.toBytes(i), time, KeyValue.Type.Put, Bytes
+              .toBytes(i)));
+        }
+        writer.append(new HLog.Entry(new HLogKey(regionName, tableName, i, time,
+            HConstants.DEFAULT_CLUSTER_ID), edit));
+        writer.close();
+      }
+
+      long recoverSeqId = 1030;
+      Map<byte[], Long> maxSeqIdInStores = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
+      MonitoredTask status = TaskMonitor.get().createStatus(method);
+      for (Store store : region.getStores().values()) {
+        maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(), recoverSeqId - 1);
+      }
+      long seqId = region.replayRecoveredEditsIfAny(regiondir, maxSeqIdInStores, null, status);
+      assertEquals(maxSeqId, seqId);
+
+      // assert that the files are flushed
+      assertEquals(1, region.getStoreFileList(
+          region.getStores().keySet().toArray(new byte[0][])).size());
+    } finally {
+      HRegion.closeHRegion(this.region);
+      this.region = null;
+    }
+  }
+
   @Test
   public void testRecoveredEditsReplayCompaction() throws Exception {
     String method = name.getMethodName();
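
The new test exercises exactly the scenario in the commit title: it writes six recovered-edits files with sequence ids 1000 through 1050, marks every store as already recovered up to 1029, and makes the last file (1050) contain only a compaction marker that replay skips. It then asserts that replayRecoveredEditsIfAny still reports 1050 and that one store file was flushed, neither of which would hold without the Math.max and currentEditSeqId changes in HRegion above.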