HBASE-5078 DistributedLogSplitter failing to split file because it has edits for lots of regions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1221596 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2011-12-21 04:00:47 +00:00
parent cd508de041
commit f000c951c6
1 changed files with 25 additions and 13 deletions

View File

@ -264,7 +264,7 @@ public class HLogSplitter {
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors", true);
long totalBytesToSplit = countTotalBytes(logfiles);
countTotalBytes(logfiles);
splitSize = 0;
outputSink.startWriterThreads(entryBuffers);
@ -369,10 +369,12 @@ public class HLogSplitter {
boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
HLog.SPLIT_SKIP_ERRORS_DEFAULT);
int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
// How often to send a progress report (default 1/2 master timeout)
// How often to send a progress report (default 1/2 the zookeeper session
// timeout of if that not set, the split log DEFAULT_TIMEOUT)
int period = conf.getInt("hbase.splitlog.report.period",
conf.getInt("hbase.splitlog.manager.timeout",
ZKSplitLog.DEFAULT_TIMEOUT) / 2);
conf.getInt("hbase.splitlog.manager.timeout", ZKSplitLog.DEFAULT_TIMEOUT) / 2);
int numOpenedFilesBeforeReporting =
conf.getInt("hbase.splitlog.report.openedfiles", 3);
Path logPath = logfile.getPath();
long logLength = logfile.getLen();
LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
@ -396,7 +398,10 @@ public class HLogSplitter {
status.markComplete("Failed: reporter.progress asked us to terminate");
return false;
}
// Report progress every so many edits and/or files opened (opening a file
// takes a bit of time).
int editsCount = 0;
int numNewlyOpenedFiles = 0;
Entry entry;
try {
while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
@ -408,9 +413,10 @@ public class HLogSplitter {
WriterAndPath wap = (WriterAndPath)o;
if (wap == null) {
wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
numNewlyOpenedFiles++;
if (wap == null) {
// ignore edits from this region. It doesn't ezist anymore.
// It was probably already split.
// ignore edits from this region. It doesn't exist anymore.
// It was probably already split.
logWriters.put(region, BAD_WRITER);
continue;
} else {
@ -419,13 +425,19 @@ public class HLogSplitter {
}
wap.w.append(entry);
editsCount++;
if (editsCount % interval == 0) {
status.setStatus("Split " + editsCount + " edits");
// If sufficient edits have passed OR we've opened a few files, check if
// we should report progress.
if (editsCount % interval == 0 ||
(numNewlyOpenedFiles > numOpenedFilesBeforeReporting)) {
// Zero out files counter each time we fall in here.
numNewlyOpenedFiles = 0;
String countsStr = "edits=" + editsCount + ", files=" + logWriters.size();
status.setStatus("Split " + countsStr);
long t1 = EnvironmentEdgeManager.currentTimeMillis();
if ((t1 - last_report_at) > period) {
last_report_at = t;
if (reporter != null && reporter.progress() == false) {
status.markComplete("Failed: reporter.progress asked us to terminate");
status.markComplete("Failed: reporter.progress asked us to terminate; " + countsStr);
progress_failed = true;
return false;
}
@ -476,10 +488,10 @@ public class HLogSplitter {
}
}
}
String msg = ("processed " + editsCount + " edits across " + n + " regions" +
" threw away edits for " + (logWriters.size() - n) + " regions" +
" log file = " + logPath +
" is corrupted = " + isCorrupted);
String msg = "Processed " + editsCount + " edits across " + n + " regions" +
" threw away edits for " + (logWriters.size() - n) + " regions" +
"; log file=" + logPath +
", corrupted=" + isCorrupted;
LOG.info(msg);
status.markComplete(msg);
}