HBASE-6979 recovered.edits file should not break distributed log splitting

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1399352 13f79535-47bb-0310-9956-ffa450edef68
jxiang 2012-10-17 17:21:25 +00:00
parent 6d731a10e1
commit 9c88d9d9ea
2 changed files with 44 additions and 5 deletions

HLogSplitter.java

@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
-import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.HLog.Entry;
@@ -596,8 +595,8 @@ public class HLogSplitter {
       final Entry logEntry, final Path rootDir, boolean isCreate)
   throws IOException {
     Path tableDir = HTableDescriptor.getTableDir(rootDir, logEntry.getKey().getTablename());
-    Path regiondir = HRegion.getRegionDir(tableDir,
-        Bytes.toString(logEntry.getKey().getEncodedRegionName()));
+    String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
+    Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
     Path dir = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
     if (!fs.exists(regiondir)) {
@@ -606,6 +605,21 @@ public class HLogSplitter {
           " already split so it's safe to discard those edits.");
       return null;
     }
+
+    if (fs.exists(dir) && fs.isFile(dir)) {
+      Path tmp = new Path("/tmp");
+      if (!fs.exists(tmp)) {
+        fs.mkdirs(tmp);
+      }
+      tmp = new Path(tmp,
+        HLog.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+      LOG.warn("Found existing old file: " + dir + ". It could be some "
+        + "leftover of an old installation. It should be a folder instead. "
+        + "So moving it to " + tmp);
+      if (!fs.rename(dir, tmp)) {
+        LOG.warn("Failed to sideline old file " + dir);
+      }
+    }
     if (isCreate && !fs.exists(dir)) {
       if (!fs.mkdirs(dir)) LOG.warn("mkdir failed on " + dir);
     }
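
The sideline step above is the core of the fix: when recovered.edits exists as a plain file (a leftover of an old installation) it is renamed out of the way so the recovered.edits directory can be created. Below is a minimal, self-contained sketch of that step against a local Hadoop FileSystem; the paths and the region name in it are hypothetical, and it is an illustration of the technique rather than the HBase code itself.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Standalone sketch of the sideline step from the hunk above: if
// "recovered.edits" exists as a plain file (the old layout), move it aside
// so a directory of the same name can be created. All paths are hypothetical.
public class RecoveredEditsSidelineSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path regiondir = new Path("/tmp/regiondir-demo");
    Path dir = new Path(regiondir, "recovered.edits");

    fs.mkdirs(regiondir);
    fs.createNewFile(dir);          // simulate the old, file-based layout

    if (fs.exists(dir) && fs.isFile(dir)) {
      Path tmp = new Path("/tmp", "recovered.edits_demo-region");
      if (!fs.rename(dir, tmp)) {   // sideline the old file, as the patch does
        System.err.println("Failed to sideline old file " + dir);
      }
    }
    fs.mkdirs(dir);                 // the directory can now be created as usual
  }
}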

TestHLogSplit.java

@@ -178,6 +178,33 @@ public class TestHLogSplit {
     assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
   }
 
+  /**
+   * Test old recovered edits file doesn't break HLogSplitter.
+   * This is useful in upgrading old instances.
+   */
+  @Test
+  public void testOldRecoveredEditsFileSidelined() throws IOException {
+    FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
+    byte [] encoded = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
+    Path tdir = new Path(hbaseDir, Bytes.toString(HConstants.META_TABLE_NAME));
+    Path regiondir = new Path(tdir,
+        HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+    fs.mkdirs(regiondir);
+    long now = System.currentTimeMillis();
+    HLog.Entry entry =
+        new HLog.Entry(new HLogKey(encoded,
+            HConstants.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+        new WALEdit());
+    Path parent = HLogUtil.getRegionDirRecoveredEditsDir(regiondir);
+    assertEquals(parent.getName(), HLog.RECOVERED_EDITS_DIR);
+    fs.createNewFile(parent); // create a recovered.edits file
+    Path p = HLogSplitter.getRegionSplitEditsPath(fs, entry, hbaseDir, true);
+    String parentOfParent = p.getParent().getParent().getName();
+    assertEquals(parentOfParent, HRegionInfo.FIRST_META_REGIONINFO.getEncodedName());
+    HLogFactory.createWriter(fs, p, conf);
+  }
+
   @Test(expected = OrphanHLogAfterSplitException.class)
   public void testSplitFailsIfNewHLogGetsCreatedAfterSplitStarted()
   throws IOException {
@@ -1256,8 +1283,6 @@ public class TestHLogSplit {
       break;
     }
   }
   private void closeOrFlush(boolean close, FSDataOutputStream out)