HBASE-20723 Custom hbase.wal.dir results in data loss because we write recovered edits into a different place than where the recovering region server looks for them
This commit is contained in:
parent
0825462436
commit
ac5bb8155b
|
@ -73,7 +73,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> {
|
||||||
Writer writer = null;
|
Writer writer = null;
|
||||||
try {
|
try {
|
||||||
writer = logWriterClass.getDeclaredConstructor().newInstance();
|
writer = logWriterClass.getDeclaredConstructor().newInstance();
|
||||||
writer.init(fs, path, conf, overwritable, blocksize);
|
FileSystem rootFs = FileSystem.get(path.toUri(), conf);
|
||||||
|
writer.init(rootFs, path, conf, overwritable, blocksize);
|
||||||
return writer;
|
return writer;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
if (e instanceof CommonFSUtils.StreamLacksCapabilityException) {
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class WALSplitter {
|
||||||
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
|
public static final boolean SPLIT_SKIP_ERRORS_DEFAULT = false;
|
||||||
|
|
||||||
// Parameters for split process
|
// Parameters for split process
|
||||||
protected final Path rootDir;
|
protected final Path walDir;
|
||||||
protected final FileSystem fs;
|
protected final FileSystem fs;
|
||||||
protected final Configuration conf;
|
protected final Configuration conf;
|
||||||
|
|
||||||
|
@ -148,14 +148,14 @@ public class WALSplitter {
|
||||||
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
|
WALSplitter(final WALFactory factory, Configuration conf, Path walDir,
|
||||||
FileSystem fs, LastSequenceId idChecker,
|
FileSystem fs, LastSequenceId idChecker,
|
||||||
SplitLogWorkerCoordination splitLogWorkerCoordination) {
|
SplitLogWorkerCoordination splitLogWorkerCoordination) {
|
||||||
this.conf = HBaseConfiguration.create(conf);
|
this.conf = HBaseConfiguration.create(conf);
|
||||||
String codecClassName = conf
|
String codecClassName = conf
|
||||||
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName());
|
||||||
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
|
this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName);
|
||||||
this.rootDir = rootDir;
|
this.walDir = walDir;
|
||||||
this.fs = fs;
|
this.fs = fs;
|
||||||
this.sequenceIdChecker = idChecker;
|
this.sequenceIdChecker = idChecker;
|
||||||
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
|
this.splitLogWorkerCoordination = splitLogWorkerCoordination;
|
||||||
|
@ -186,11 +186,11 @@ public class WALSplitter {
|
||||||
* <p>
|
* <p>
|
||||||
* @return false if it is interrupted by the progress-able.
|
* @return false if it is interrupted by the progress-able.
|
||||||
*/
|
*/
|
||||||
public static boolean splitLogFile(Path rootDir, FileStatus logfile, FileSystem fs,
|
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem fs,
|
||||||
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
|
Configuration conf, CancelableProgressable reporter, LastSequenceId idChecker,
|
||||||
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
|
SplitLogWorkerCoordination splitLogWorkerCoordination, final WALFactory factory)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, idChecker,
|
WALSplitter s = new WALSplitter(factory, conf, walDir, fs, idChecker,
|
||||||
splitLogWorkerCoordination);
|
splitLogWorkerCoordination);
|
||||||
return s.splitLogFile(logfile, reporter);
|
return s.splitLogFile(logfile, reporter);
|
||||||
}
|
}
|
||||||
|
@ -322,10 +322,10 @@ public class WALSplitter {
|
||||||
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
|
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
|
||||||
if (splitLogWorkerCoordination != null) {
|
if (splitLogWorkerCoordination != null) {
|
||||||
// Some tests pass in a csm of null.
|
// Some tests pass in a csm of null.
|
||||||
splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
|
splitLogWorkerCoordination.markCorrupted(walDir, logfile.getPath().getName(), fs);
|
||||||
} else {
|
} else {
|
||||||
// for tests only
|
// for tests only
|
||||||
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
|
ZKSplitLog.markCorrupted(walDir, logfile.getPath().getName(), fs);
|
||||||
}
|
}
|
||||||
isCorrupted = true;
|
isCorrupted = true;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -457,18 +457,19 @@ public class WALSplitter {
|
||||||
* <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
|
* <code>logEntry</code>: e.g. /hbase/some_table/2323432434/recovered.edits/2332.
|
||||||
* This method also ensures existence of RECOVERED_EDITS_DIR under the region
|
* This method also ensures existence of RECOVERED_EDITS_DIR under the region
|
||||||
* creating it if necessary.
|
* creating it if necessary.
|
||||||
* @param fs
|
|
||||||
* @param logEntry
|
* @param logEntry
|
||||||
* @param rootDir HBase root dir.
|
|
||||||
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
|
* @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
|
||||||
|
* @param conf
|
||||||
* @return Path to file into which to dump split log edits.
|
* @return Path to file into which to dump split log edits.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static Path getRegionSplitEditsPath(final FileSystem fs,
|
static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit,
|
||||||
final Entry logEntry, final Path rootDir, String fileNameBeingSplit)
|
Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
FileSystem fs = FileSystem.get(conf);
|
||||||
|
Path rootDir = FSUtils.getRootDir(conf);
|
||||||
Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName());
|
Path tableDir = FSUtils.getTableDir(rootDir, logEntry.getKey().getTableName());
|
||||||
String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
|
String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName());
|
||||||
Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
|
Path regiondir = HRegion.getRegionDir(tableDir, encodedRegionName);
|
||||||
|
@ -1260,7 +1261,8 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete the one with fewer wal entries
|
// delete the one with fewer wal entries
|
||||||
private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
|
private void deleteOneWithFewerEntries(FileSystem rootFs, WriterAndPath wap, Path dst)
|
||||||
|
throws IOException {
|
||||||
long dstMinLogSeqNum = -1L;
|
long dstMinLogSeqNum = -1L;
|
||||||
try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
|
try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
|
||||||
WAL.Entry entry = reader.next();
|
WAL.Entry entry = reader.next();
|
||||||
|
@ -1281,8 +1283,8 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
|
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
|
||||||
+ ", length=" + fs.getFileStatus(wap.p).getLen());
|
+ ", length=" + rootFs.getFileStatus(wap.p).getLen());
|
||||||
if (!fs.delete(wap.p, false)) {
|
if (!rootFs.delete(wap.p, false)) {
|
||||||
LOG.warn("Failed deleting of {}", wap.p);
|
LOG.warn("Failed deleting of {}", wap.p);
|
||||||
throw new IOException("Failed deleting of " + wap.p);
|
throw new IOException("Failed deleting of " + wap.p);
|
||||||
}
|
}
|
||||||
|
@ -1367,6 +1369,7 @@ public class WALSplitter {
|
||||||
Path closeWriter(String encodedRegionName, WriterAndPath wap,
|
Path closeWriter(String encodedRegionName, WriterAndPath wap,
|
||||||
List<IOException> thrown) throws IOException{
|
List<IOException> thrown) throws IOException{
|
||||||
LOG.trace("Closing {}", wap.p);
|
LOG.trace("Closing {}", wap.p);
|
||||||
|
FileSystem rootFs = FileSystem.get(conf);
|
||||||
try {
|
try {
|
||||||
wap.w.close();
|
wap.w.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
|
@ -1381,7 +1384,7 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
if (wap.editsWritten == 0) {
|
if (wap.editsWritten == 0) {
|
||||||
// just remove the empty recovered.edits file
|
// just remove the empty recovered.edits file
|
||||||
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
if (rootFs.exists(wap.p) && !rootFs.delete(wap.p, false)) {
|
||||||
LOG.warn("Failed deleting empty {}", wap.p);
|
LOG.warn("Failed deleting empty {}", wap.p);
|
||||||
throw new IOException("Failed deleting empty " + wap.p);
|
throw new IOException("Failed deleting empty " + wap.p);
|
||||||
}
|
}
|
||||||
|
@ -1391,14 +1394,14 @@ public class WALSplitter {
|
||||||
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
|
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
|
||||||
regionMaximumEditLogSeqNum.get(encodedRegionName));
|
regionMaximumEditLogSeqNum.get(encodedRegionName));
|
||||||
try {
|
try {
|
||||||
if (!dst.equals(wap.p) && fs.exists(dst)) {
|
if (!dst.equals(wap.p) && rootFs.exists(dst)) {
|
||||||
deleteOneWithFewerEntries(wap, dst);
|
deleteOneWithFewerEntries(rootFs, wap, dst);
|
||||||
}
|
}
|
||||||
// Skip the unit tests which create a splitter that reads and
|
// Skip the unit tests which create a splitter that reads and
|
||||||
// writes the data without touching disk.
|
// writes the data without touching disk.
|
||||||
// TestHLogSplit#testThreading is an example.
|
// TestHLogSplit#testThreading is an example.
|
||||||
if (fs.exists(wap.p)) {
|
if (rootFs.exists(wap.p)) {
|
||||||
if (!fs.rename(wap.p, dst)) {
|
if (!rootFs.rename(wap.p, dst)) {
|
||||||
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
||||||
}
|
}
|
||||||
LOG.info("Rename {} to {}", wap.p, dst);
|
LOG.info("Rename {} to {}", wap.p, dst);
|
||||||
|
@ -1470,7 +1473,7 @@ public class WALSplitter {
|
||||||
if (blacklistedRegions.contains(region)) {
|
if (blacklistedRegions.contains(region)) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
ret = createWAP(region, entry, rootDir);
|
ret = createWAP(region, entry);
|
||||||
if (ret == null) {
|
if (ret == null) {
|
||||||
blacklistedRegions.add(region);
|
blacklistedRegions.add(region);
|
||||||
return null;
|
return null;
|
||||||
|
@ -1484,16 +1487,18 @@ public class WALSplitter {
|
||||||
/**
|
/**
|
||||||
* @return a path with a write for that path. caller should close.
|
* @return a path with a write for that path. caller should close.
|
||||||
*/
|
*/
|
||||||
WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
|
WriterAndPath createWAP(byte[] region, Entry entry) throws IOException {
|
||||||
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
|
Path regionedits = getRegionSplitEditsPath(entry,
|
||||||
|
fileBeingSplit.getPath().getName(), conf);
|
||||||
if (regionedits == null) {
|
if (regionedits == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (fs.exists(regionedits)) {
|
FileSystem rootFs = FileSystem.get(conf);
|
||||||
|
if (rootFs.exists(regionedits)) {
|
||||||
LOG.warn("Found old edits file. It could be the "
|
LOG.warn("Found old edits file. It could be the "
|
||||||
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
|
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
|
||||||
+ fs.getFileStatus(regionedits).getLen());
|
+ rootFs.getFileStatus(regionedits).getLen());
|
||||||
if (!fs.delete(regionedits, false)) {
|
if (!rootFs.delete(regionedits, false)) {
|
||||||
LOG.warn("Failed delete of old {}", regionedits);
|
LOG.warn("Failed delete of old {}", regionedits);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.CommonFSUtils;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
@ -126,6 +127,7 @@ public class TestWALFactory {
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUpBeforeClass() throws Exception {
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
CommonFSUtils.setWALRootDir(TEST_UTIL.getConfiguration(), new Path("file:///tmp/wal"));
|
||||||
// Make block sizes small.
|
// Make block sizes small.
|
||||||
TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
|
TEST_UTIL.getConfiguration().setInt("dfs.blocksize", 1024 * 1024);
|
||||||
// needed for testAppendClose()
|
// needed for testAppendClose()
|
||||||
|
@ -176,7 +178,7 @@ public class TestWALFactory {
|
||||||
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
|
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
|
||||||
final int howmany = 3;
|
final int howmany = 3;
|
||||||
RegionInfo[] infos = new RegionInfo[3];
|
RegionInfo[] infos = new RegionInfo[3];
|
||||||
Path tabledir = FSUtils.getTableDir(hbaseWALDir, tableName);
|
Path tabledir = FSUtils.getTableDir(hbaseDir, tableName);
|
||||||
fs.mkdirs(tabledir);
|
fs.mkdirs(tabledir);
|
||||||
for (int i = 0; i < howmany; i++) {
|
for (int i = 0; i < howmany; i++) {
|
||||||
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
|
infos[i] = RegionInfoBuilder.newBuilder(tableName).setStartKey(Bytes.toBytes("" + i))
|
||||||
|
|
|
@ -390,8 +390,8 @@ public class TestWALSplit {
|
||||||
new Entry(new WALKeyImpl(encoded,
|
new Entry(new WALKeyImpl(encoded,
|
||||||
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
|
TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
|
||||||
new WALEdit());
|
new WALEdit());
|
||||||
Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
|
Path p = WALSplitter.getRegionSplitEditsPath(entry,
|
||||||
FILENAME_BEING_SPLIT);
|
FILENAME_BEING_SPLIT, conf);
|
||||||
String parentOfParent = p.getParent().getParent().getName();
|
String parentOfParent = p.getParent().getParent().getName();
|
||||||
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
|
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
|
||||||
}
|
}
|
||||||
|
@ -416,8 +416,8 @@ public class TestWALSplit {
|
||||||
assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
|
assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName());
|
||||||
fs.createNewFile(parent); // create a recovered.edits file
|
fs.createNewFile(parent); // create a recovered.edits file
|
||||||
|
|
||||||
Path p = WALSplitter.getRegionSplitEditsPath(fs, entry, HBASEDIR,
|
Path p = WALSplitter.getRegionSplitEditsPath(entry,
|
||||||
FILENAME_BEING_SPLIT);
|
FILENAME_BEING_SPLIT, conf);
|
||||||
String parentOfParent = p.getParent().getParent().getName();
|
String parentOfParent = p.getParent().getParent().getName();
|
||||||
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
|
assertEquals(parentOfParent, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName());
|
||||||
WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
|
WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
|
||||||
|
|
Loading…
Reference in New Issue