From b2afba580b4b9285973b57812741857cf480be4c Mon Sep 17 00:00:00 2001 From: tedyu Date: Sat, 16 Jun 2018 01:34:53 -0700 Subject: [PATCH] HBASE-20723 Custom hbase.wal.dir results in data loss because we write recovered edits into a different place than where the recovering region server looks for them --- .../hadoop/hbase/wal/FSHLogProvider.java | 3 +- .../apache/hadoop/hbase/wal/WALSplitter.java | 55 ++++++++++--------- .../hadoop/hbase/wal/TestWALFactory.java | 4 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 8 +-- 4 files changed, 39 insertions(+), 31 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index efcd377a844..44f692d620f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -73,7 +73,8 @@ public class FSHLogProvider extends AbstractFSWALProvider { Writer writer = null; try { writer = logWriterClass.getDeclaredConstructor().newInstance(); - writer.init(fs, path, conf, overwritable, blocksize); + FileSystem rootFs = FileSystem.get(path.toUri(), conf); + writer.init(rootFs, path, conf, overwritable, blocksize); return writer; } catch (Exception e) { if (e instanceof CommonFSUtils.StreamLacksCapabilityException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 254916e67ce..a49b96b95cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -114,7 +114,7 @@ public class WALSplitter { public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false; // Parameters for split process - protected final Path rootDir; + protected final Path walDir; protected final FileSystem fs; protected final Configuration conf; @@ -147,14 +147,14 @@ public class WALSplitter { @VisibleForTesting - WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, + WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem fs, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { this.conf = HBaseConfiguration.create(conf); String codecClassName = conf .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); - this.rootDir = rootDir; + this.walDir = walDir; this.fs = fs; this.sequenceIdChecker = idChecker; this.splitLogWorkerCoordination = splitLogWorkerCoordination; @@ -185,11 +185,11 @@ public class WALSplitter { *

* @return false if it is interrupted by the progress-able. */ - public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs, + public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs, Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory) throws IOException { - WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker, + WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker, splitLogWorkerCoordination); return s.splitLogFile(logfile, reporter); } @@ -321,10 +321,10 @@ public class WALSplitter { LOG.warn("Could not parse, corrupted WAL={}", logPath, e); if (splitLogWorkerCoordination != null) { // Some tests pass in a csm of null. - splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs); + splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), fs); } else { // for tests only - ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs); + ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs); } isCorrupted = true; } catch (IOException e) { @@ -456,18 +456,19 @@ public class WALSplitter { * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332. * This method also ensures existence of RECOVERED_EDITS_DIR under the region * creating it if necessary. - * @param fs * @param logEntry - * @param rootDir HBase root dir. * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. + * @param conf * @return Path to file into which to dump split log edits. * @throws IOException */ @SuppressWarnings("deprecation") @VisibleForTesting - static Path getRegionSplitEditsPath(final FileSystem fs, - final Entry logEntry, final Path rootDir, String fileNameBeingSplit) + static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit, + Configuration conf) throws IOException { + FileSystem fs = FileSystem.get(conf); + Path rootDir = FSUtils.getRootDir(conf); Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName()); String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName); @@ -1259,7 +1260,8 @@ public class WALSplitter { } // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException { + private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst) + throws IOException { long dstMinLogSeqNum = -1L; try (WAL.Reader reader = walFactory.createReader(fs, dst)) { WAL.Entry entry = reader.next(); @@ -1280,8 +1282,8 @@ public class WALSplitter { } } else { LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p - + ", length=" + fs.getFileStatus(wap.p).getLen()); - if (!fs.delete(wap.p, false)) { + + ", length=" + rootFs.getFileStatus(wap.p).getLen()); + if (!rootFs.delete(wap.p, false)) { LOG.warn("Failed deleting of {}", wap.p); throw new IOException("Failed deleting of " + wap.p); } @@ -1368,6 +1370,7 @@ public class WALSplitter { if (LOG.isTraceEnabled()) { LOG.trace("Closing " + wap.p); } + FileSystem rootFs = FileSystem.get(conf); try { wap.w.close(); } catch (IOException ioe) { @@ -1382,7 +1385,7 @@ public class WALSplitter { } if (wap.editsWritten == 0) { // just remove the empty recovered.edits file - if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { + if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) { LOG.warn("Failed deleting empty " + wap.p); throw new IOException("Failed deleting empty " + wap.p); } @@ -1392,14 +1395,14 @@ public class WALSplitter { Path dst = getCompletedRecoveredEditsFilePath(wap.p, regionMaximumEditLogSeqNum.get(encodedRegionName)); try { - if (!dst.equals(wap.p) && fs.exists(dst)) { - deleteOneWithFewerEntries(wap, dst); + if (!dst.equals(wap.p) && rootFs.exists(dst)) { + deleteOneWithFewerEntries(rootFs, wap, dst); } // Skip the unit tests which create a splitter that reads and // writes the data without touching disk. // TestHLogSplit#testThreading is an example. - if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { + if (rootFs.exists(wap.p)) { + if (!rootFs.rename(wap.p, dst)) { throw new IOException("Failed renaming " + wap.p + " to " + dst); } LOG.info("Rename " + wap.p + " to " + dst); @@ -1471,7 +1474,7 @@ public class WALSplitter { if (blacklistedRegions.contains(region)) { return null; } - ret = createWAP(region, entry, rootDir); + ret = createWAP(region, entry); if (ret == null) { blacklistedRegions.add(region); return null; @@ -1485,16 +1488,18 @@ public class WALSplitter { /** * @return a path with a write for that path. caller should close. */ - WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { - Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName()); + WriterAndPath createWAP(byte[] region, Entry entry) throws IOException { + Path regionedits = getRegionSplitEditsPath(entry, + fileBeingSplit.getPath().getName(), conf); if (regionedits == null) { return null; } - if (fs.exists(regionedits)) { + FileSystem rootFs = FileSystem.get(conf); + if (rootFs.exists(regionedits)) { LOG.warn("Found old edits file. It could be the " + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" - + fs.getFileStatus(regionedits).getLen()); - if (!fs.delete(regionedits, false)) { + + rootFs.getFileStatus(regionedits).getLen()); + if (!rootFs.delete(regionedits, false)) { LOG.warn("Failed delete of old {}", regionedits); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index b1fe67b7793..fd2b3c49856 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -126,6 +127,7 @@ public class TestWALFactory { @BeforeClass public static void setUpBeforeClass() throws Exception { + CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal")); // Make block sizes small. TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024); // needed for testAppendClose() @@ -176,7 +178,7 @@ public class TestWALFactory { final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1); final int howmany = 3; RegionInfo[] infos = new RegionInfo[3]; - Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName); + Path tabledir = FSUtils.getTableDir(hbaseDir, tableName); fs.mkdirs(tabledir); for (int i = 0; i < howmany; i++) { infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i)) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index 030c99f41e8..0d5aa0d2ff5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -390,8 +390,8 @@ public class TestWALSplit { new Entry(new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); - Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, - FILENAME_BEING_SPLIT); + Path p = WALSplitter.getRegionSplitEditsPath(entry, + FILENAME_BEING_SPLIT, conf); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); } @@ -416,8 +416,8 @@ public class TestWALSplit { assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); fs.createNewFile(parent); // create a recovered.edits file - Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR, - FILENAME_BEING_SPLIT); + Path p = WALSplitter.getRegionSplitEditsPath(entry, + FILENAME_BEING_SPLIT, conf); String parentOfParent = p.getParent().getParent().getName(); assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); WALFactory.createRecoveredEditsWriter(fs, p, conf).close();