HBASE-6337 [MTTR] Remove renaming tmp log file in SplitLogManager (Chunhui)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1359957 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-07-11 00:07:53 +00:00
parent 05ad4a91f4
commit 30012934b5
6 changed files with 58 additions and 159 deletions

View File

@ -139,10 +139,8 @@ public class SplitLogManager extends ZooKeeperListener {
this(zkw, conf, stopper, serverName, new TaskFinisher() {
@Override
public Status finish(ServerName workerName, String logfile) {
String tmpname =
ZKSplitLog.getSplitLogDirTmpComponent(workerName.toString(), logfile);
try {
HLogSplitter.moveRecoveredEditsFromTemp(tmpname, logfile, conf);
HLogSplitter.finishSplitLogFile(logfile, conf);
} catch (IOException e) {
LOG.warn("Could not finish splitting of log file " + logfile);
return Status.ERR;

View File

@ -108,9 +108,7 @@ public class SplitLogWorker extends ZooKeeperListener implements Runnable {
// interrupted or has encountered a transient error and when it has
// encountered a bad non-retry-able persistent error.
try {
String tmpname =
ZKSplitLog.getSplitLogDirTmpComponent(serverName.toString(), filename);
if (HLogSplitter.splitLogFileToTemp(rootdir, tmpname,
if (HLogSplitter.splitLogFile(rootdir,
fs.getFileStatus(new Path(filename)), fs, conf, p) == false) {
return Status.PREEMPTED;
}

View File

@ -346,15 +346,12 @@ public class HLogSplitter {
}
/**
* Splits a HLog file into a temporary staging area. tmpname is used to build
* the name of the staging area where the recovered-edits will be separated
* out by region and stored.
* Splits a HLog file into each region's recovered-edits directory.
* <p>
* If the log file has N regions then N recovered.edits files will be
* produced.
* <p>
* @param rootDir
* @param tmpname
* @param logfile
* @param fs
* @param conf
@ -362,16 +359,15 @@ public class HLogSplitter {
* @return false if it is interrupted by the progress-able.
* @throws IOException
*/
static public boolean splitLogFileToTemp(Path rootDir, String tmpname,
FileStatus logfile, FileSystem fs,
Configuration conf, CancelableProgressable reporter) throws IOException {
static public boolean splitLogFile(Path rootDir, FileStatus logfile,
FileSystem fs, Configuration conf, CancelableProgressable reporter)
throws IOException {
HLogSplitter s = new HLogSplitter(conf, rootDir, null, null /* oldLogDir */, fs);
return s.splitLogFileToTemp(logfile, tmpname, reporter);
return s.splitLogFile(logfile, reporter);
}
public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
CancelableProgressable reporter)
throws IOException {
public boolean splitLogFile(FileStatus logfile,
CancelableProgressable reporter) throws IOException {
boolean isCorrupted = false;
Preconditions.checkState(status == null);
status = TaskMonitor.get().createStatus(
@ -389,7 +385,7 @@ public class HLogSplitter {
in = getReader(fs, logfile, conf, skipErrors);
} catch (CorruptedLogFileException e) {
LOG.warn("Could not get reader, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
}
if (in == null) {
@ -397,8 +393,7 @@ public class HLogSplitter {
LOG.warn("Nothing to split in log file " + logPath);
return true;
}
this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(
reporter, tmpname));
this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(reporter));
if (!reportProgressIfIsDistributedLogSplitting()) {
return false;
}
@ -431,7 +426,7 @@ public class HLogSplitter {
throw iie;
} catch (CorruptedLogFileException e) {
LOG.warn("Could not parse, corrupted log file " + logPath, e);
ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
ZKSplitLog.markCorrupted(rootDir, logfile.getPath().getName(), fs);
isCorrupted = true;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@ -451,94 +446,40 @@ public class HLogSplitter {
}
/**
* Completes the work done by splitLogFileToTemp by moving the
* recovered.edits from the staging area to the respective region server's
* directories.
* Completes the work done by splitLogFile by archiving logs
* <p>
* It is invoked by SplitLogManager once it knows that one of the
* SplitLogWorkers have completed the splitLogFileToTemp() part. If the
* master crashes then this function might get called multiple times.
* SplitLogWorkers has completed the splitLogFile() part. If the master
* crashes then this function might get called multiple times.
* <p>
* @param tmpname
* @param logfile
* @param conf
* @throws IOException
*/
public static void moveRecoveredEditsFromTemp(String tmpname,
String logfile, Configuration conf)
throws IOException{
public static void finishSplitLogFile(String logfile, Configuration conf)
throws IOException {
Path rootdir = FSUtils.getRootDir(conf);
Path oldLogDir = new Path(rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
moveRecoveredEditsFromTemp(tmpname, rootdir, oldLogDir, logfile, conf);
finishSplitLogFile(rootdir, oldLogDir, logfile, conf);
}
public static void moveRecoveredEditsFromTemp(String tmpname,
Path rootdir, Path oldLogDir,
String logfile, Configuration conf)
throws IOException{
public static void finishSplitLogFile(Path rootdir, Path oldLogDir,
String logfile, Configuration conf) throws IOException {
List<Path> processedLogs = new ArrayList<Path>();
List<Path> corruptedLogs = new ArrayList<Path>();
FileSystem fs;
fs = rootdir.getFileSystem(conf);
Path logPath = new Path(logfile);
if (ZKSplitLog.isCorrupted(rootdir, tmpname, fs)) {
if (ZKSplitLog.isCorrupted(rootdir, logPath.getName(), fs)) {
corruptedLogs.add(logPath);
} else {
processedLogs.add(logPath);
}
Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, tmpname);
List<FileStatus> files = listAll(fs, stagingDir);
for (FileStatus f : files) {
Path src = f.getPath();
Path dst = ZKSplitLog.stripSplitLogTempDir(rootdir, src);
if (ZKSplitLog.isCorruptFlagFile(dst)) {
continue;
}
if (fs.exists(src)) {
if (fs.exists(dst)) {
fs.delete(dst, false);
} else {
Path regionDir = dst.getParent().getParent();
if (!fs.exists(regionDir)) {
// See HBASE-6050.
LOG.warn("Could not move recovered edits from " + src +
" to destination " + regionDir + " as it doesn't exist.");
continue;
}
Path dstdir = dst.getParent();
if (!fs.exists(dstdir)) {
if (!fs.mkdirs(dstdir)) LOG.warn("mkdir failed on " + dstdir);
}
}
fs.rename(src, dst);
LOG.debug(" moved " + src + " => " + dst);
} else {
LOG.debug("Could not move recovered edits from " + src +
" as it doesn't exist");
}
}
archiveLogs(null, corruptedLogs, processedLogs,
oldLogDir, fs, conf);
archiveLogs(null, corruptedLogs, processedLogs, oldLogDir, fs, conf);
Path stagingDir = ZKSplitLog.getSplitLogDir(rootdir, logPath.getName());
fs.delete(stagingDir, true);
return;
}
/**
 * Recursively collects every non-directory entry found under {@code dir}.
 *
 * @param fs file system used for the existence check and the listing
 * @param dir directory to walk; if it does not exist an empty list is returned
 * @return the files found below {@code dir}, descending into sub-directories
 * @throws IOException if the existence check or a listing fails
 */
private static List<FileStatus> listAll(FileSystem fs, Path dir)
    throws IOException {
  List<FileStatus> collected = new ArrayList<FileStatus>(100);
  if (!fs.exists(dir)) {
    return collected;
  }
  FileStatus[] children = fs.listStatus(dir);
  if (children == null) {
    // Nothing was listed for this directory; treat it as empty.
    return collected;
  }
  for (FileStatus child : children) {
    if (!child.isDir()) {
      collected.add(child);
      continue;
    }
    // Directories contribute their descendants, never themselves.
    collected.addAll(listAll(fs, child.getPath()));
  }
  return collected;
}
/**
* Moves processed logs to oldLogDir after successful processing. Moves
* corrupted logs (any log that couldn't be successfully parsed to corruptDir
@ -1027,14 +968,14 @@ public class HLogSplitter {
}
}
private WriterAndPath createWAP(byte[] region, Entry entry,
Path rootdir, String tmpname, FileSystem fs, Configuration conf)
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir,
FileSystem fs, Configuration conf)
throws IOException {
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, tmpname==null);
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, true);
if (regionedits == null) {
return null;
}
if ((tmpname == null) && fs.exists(regionedits)) {
if (fs.exists(regionedits)) {
LOG.warn("Found existing old edits file. It could be the "
+ "result of a previous failed split attempt. Deleting "
+ regionedits + ", length="
@ -1043,18 +984,10 @@ public class HLogSplitter {
LOG.warn("Failed delete of old " + regionedits);
}
}
Path editsfile;
if (tmpname != null) {
// During distributed log splitting the output by each
// SplitLogWorker is written to a temporary area.
editsfile = convertRegionEditsToTemp(rootdir, regionedits, tmpname);
} else {
editsfile = regionedits;
}
Writer w = createWriter(fs, editsfile, conf);
LOG.debug("Creating writer path=" + editsfile + " region="
Writer w = createWriter(fs, regionedits, conf);
LOG.debug("Creating writer path=" + regionedits + " region="
+ Bytes.toStringBinary(region));
return (new WriterAndPath(editsfile, w));
return (new WriterAndPath(regionedits, w));
}
Path convertRegionEditsToTemp(Path rootdir, Path edits, String tmpname) {
@ -1110,12 +1043,9 @@ public class HLogSplitter {
// How often to send a progress report (default 1/2 master timeout)
private final int report_period;
private long last_report_at = 0;
private final String tmpDirName;
public DistributedLogSplittingHelper(CancelableProgressable reporter,
String tmpName) {
public DistributedLogSplittingHelper(CancelableProgressable reporter) {
this.splitReporter = reporter;
this.tmpDirName = tmpName;
report_period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout",
SplitLogManager.DEFAULT_TIMEOUT) / 2);
@ -1139,10 +1069,6 @@ public class HLogSplitter {
return true;
}
}
String getTmpDirName() {
return this.tmpDirName;
}
}
/**
@ -1380,9 +1306,7 @@ public class HLogSplitter {
if (blacklistedRegions.contains(region)) {
return null;
}
String tmpName = distributedLogSplittingHelper == null ? null
: distributedLogSplittingHelper.getTmpDirName();
ret = createWAP(region, entry, rootDir, tmpName, fs, conf);
ret = createWAP(region, entry, rootDir, fs, conf);
if (ret == null) {
blacklistedRegions.add(region);
return null;

View File

@ -98,27 +98,14 @@ public class ZKSplitLog {
return new Path(new Path(rootdir, HConstants.SPLIT_LOGDIR_NAME), tmpname);
}
/**
 * Re-roots {@code file} directly under {@code rootdir} by dropping its
 * leading staging-directory components.
 * <p>
 * Trailing name components of {@code file} are collected while walking up
 * through its parents, stopping once the parent's depth is no greater than
 * {@code rootdir.depth() + 2}. The collected names are then appended to
 * {@code rootdir} in their original order. At least one component (the file's
 * own name) is always kept.
 *
 * @param rootdir directory the returned path is built under
 * @param file path located below the staging area
 * @return {@code rootdir} followed by the retained components of {@code file}
 */
public static Path stripSplitLogTempDir(Path rootdir, Path file) {
  // The two extra levels presumably correspond to the split-log directory
  // and the per-task tmp directory beneath it -- confirm against getSplitLogDir.
  final int stagingDepth = rootdir.depth() + 2;
  List<String> names = new ArrayList<String>(10);
  Path cursor = file;
  while (true) {
    names.add(cursor.getName());
    cursor = cursor.getParent();
    if (cursor.depth() <= stagingDepth) {
      break;
    }
  }
  // Names were gathered leaf-first, so replay them from the end.
  Path rebuilt = rootdir;
  int idx = names.size();
  while (idx-- > 0) {
    rebuilt = new Path(rebuilt, names.get(idx));
  }
  return rebuilt;
}
/**
 * Builds the name of a worker's temporary split-log directory component.
 *
 * @param worker name of the worker; used verbatim as the prefix
 * @param file log file name; passed through {@link ZKSplitLog#encode}
 * @return {@code worker}, an underscore, then the encoded {@code file}
 */
public static String getSplitLogDirTmpComponent(final String worker, String file) {
  String encodedFile = ZKSplitLog.encode(file);
  return worker + "_" + encodedFile;
}
public static void markCorrupted(Path rootdir, String tmpname,
public static void markCorrupted(Path rootdir, String logFileName,
FileSystem fs) {
Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
try {
fs.createNewFile(file);
} catch (IOException e) {
@ -127,15 +114,12 @@ public class ZKSplitLog {
}
}
public static boolean isCorrupted(Path rootdir, String tmpname,
public static boolean isCorrupted(Path rootdir, String logFileName,
FileSystem fs) throws IOException {
Path file = new Path(getSplitLogDir(rootdir, tmpname), "corrupt");
Path file = new Path(getSplitLogDir(rootdir, logFileName), "corrupt");
boolean isCorrupt;
isCorrupt = fs.exists(file);
return isCorrupt;
}
/**
 * Tells whether {@code file} is a corruption marker, judged by name only.
 *
 * @param file path to inspect
 * @return true if the final path component is exactly {@code "corrupt"}
 */
public static boolean isCorruptFlagFile(Path file) {
  String leafName = file.getName();
  return leafName.equals("corrupt");
}
}

View File

@ -1016,10 +1016,9 @@ public class TestHLogSplit {
generateHLogs(1, 10, -1);
FileStatus logfile = fs.listStatus(hlogDir)[0];
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
conf, reporter);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
.toString(), conf);
Path originalLog = (fs.listStatus(oldLogDir))[0].getPath();
@ -1046,10 +1045,9 @@ public class TestHLogSplit {
LOG.info("Region directory is" + regiondir);
fs.delete(regiondir, true);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
conf, reporter);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
.toString(), conf);
assertTrue(!fs.exists(regiondir));
assertTrue(true);
@ -1065,10 +1063,9 @@ public class TestHLogSplit {
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
conf, reporter);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
.toString(), conf);
Path tdir = HTableDescriptor.getTableDir(hbaseDir, TABLE_NAME);
assertFalse(fs.exists(tdir));
@ -1082,10 +1079,9 @@ public class TestHLogSplit {
FileStatus logfile = fs.listStatus(hlogDir)[0];
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
conf, reporter);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
.toString(), conf);
for (String region : regions) {
Path recovered = getLogForRegion(hbaseDir, TABLE_NAME, region);
assertEquals(10, countHLog(recovered, fs, conf));
@ -1103,10 +1099,9 @@ public class TestHLogSplit {
Corruptions.INSERT_GARBAGE_ON_FIRST_LINE, true, fs);
fs.initialize(fs.getUri(), conf);
HLogSplitter.splitLogFileToTemp(hbaseDir, "tmpdir", logfile, fs,
conf, reporter);
HLogSplitter.moveRecoveredEditsFromTemp("tmpdir", hbaseDir, oldLogDir,
logfile.getPath().toString(), conf);
HLogSplitter.splitLogFile(hbaseDir, logfile, fs, conf, reporter);
HLogSplitter.finishSplitLogFile(hbaseDir, oldLogDir, logfile.getPath()
.toString(), conf);
final Path corruptDir = new Path(conf.get(HConstants.HBASE_DIR), conf.get(
"hbase.regionserver.hlog.splitlog.corrupt.dir", ".corrupt"));

View File

@ -660,10 +660,10 @@ public class TestWALReplay {
wal.completeCacheFlush(hri.getEncodedNameAsBytes(), hri.getTableName(), sequenceNumber, false);
wal.close();
FileStatus[] listStatus = this.fs.listStatus(wal.getDir());
HLogSplitter.splitLogFileToTemp(hbaseRootDir, hbaseRootDir + "/temp", listStatus[0], this.fs,
this.conf, null);
FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/temp/" + tableNameStr
+ "/" + hri.getEncodedName() + "/recovered.edits"));
HLogSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf,
null);
FileStatus[] listStatus1 = this.fs.listStatus(new Path(hbaseRootDir + "/"
+ tableNameStr + "/" + hri.getEncodedName() + "/recovered.edits"));
int editCount = 0;
for (FileStatus fileStatus : listStatus1) {
editCount = Integer.parseInt(fileStatus.getPath().getName());