HBASE-6134 Improvement for split-worker to speed up distributed log splitting (Chunhui)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1349632 13f79535-47bb-0310-9956-ffa450edef68
commit 0fdf8126c5 (parent 5e3b1e4b5d)
Author: Zhihong Yu
Date:   2012-06-13 04:34:03 +0000
2 changed files with 212 additions and 170 deletions
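In outline: before this change the split worker's splitLogFileToTemp wrote every edit synchronously through its own private map of writers and closed each recovered.edits file serially; after it, the worker appends entries to the shared EntryBuffers/OutputSink pipeline (a pool of writer threads) and closes the output files in parallel. The sketch below shows the reader-feeds-writer-pool shape the worker now relies on. It is a self-contained illustration with hypothetical names, not HBase code; the real EntryBuffers does per-region grouping and backpressure that a plain queue does not.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Minimal sketch of the reader/writer-pool pipeline shape; names are illustrative.
public class PipelineSketch {
  public static void main(String[] args) throws InterruptedException {
    final BlockingQueue<String> buffer = new ArrayBlockingQueue<String>(1024);
    final String POISON = "";  // sentinel telling writers to shut down
    int numWriters = 3;        // cf. hbase.regionserver.hlog.splitlog.writer.threads

    Runnable writer = new Runnable() {
      public void run() {
        try {
          String entry;
          while ((entry = buffer.take()) != POISON) {
            // a real writer thread would append the edit to its region's file here
          }
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
      }
    };
    Thread[] writers = new Thread[numWriters];
    for (int i = 0; i < numWriters; i++) {
      (writers[i] = new Thread(writer, "writer-" + i)).start();
    }
    for (int i = 0; i < 10000; i++) {
      buffer.put("entry-" + i);  // reader side: cf. entryBuffers.appendEntry(entry)
    }
    for (int i = 0; i < numWriters; i++) {
      buffer.put(POISON);
    }
    for (Thread t : writers) {
      t.join();
    }
  }
}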

HLogSplitter.java

@@ -33,6 +33,14 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
@@ -58,6 +66,7 @@ import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.io.MultipleIOException;
@@ -106,9 +115,12 @@ public class HLogSplitter {
   // Wait/notify for when data has been produced by the reader thread,
   // consumed by the reader thread, or an exception occurred
   Object dataAvailable = new Object();
 
   private MonitoredTask status;
 
+  // Used in distributed log splitting
+  private DistributedLogSplittingHelper distributedLogSplittingHelper = null;
+
   /**
    * Create a new HLogSplitter using the given {@link Configuration} and the
@@ -238,6 +250,10 @@ public class HLogSplitter {
     return outputSink.getOutputCounts();
   }
 
+  void setDistributedLogSplittingHelper(DistributedLogSplittingHelper helper) {
+    this.distributedLogSplittingHelper = helper;
+  }
+
   /**
    * Splits the HLog edits in the given list of logfiles (that are a mix of edits
    * on multiple regions) by region and then splits them per region directories,
@@ -270,7 +286,7 @@ public class HLogSplitter {
     countTotalBytes(logfiles);
     splitSize = 0;
-    outputSink.startWriterThreads(entryBuffers);
+    outputSink.startWriterThreads();
 
     try {
       int i = 0;
@@ -335,8 +351,7 @@ public class HLogSplitter {
    * out by region and stored.
    * <p>
    * If the log file has N regions then N recovered.edits files will be
-   * produced. There is no buffering in this code. Instead it relies on the
-   * buffering in the SequenceFileWriter.
+   * produced.
    * <p>
    * @param rootDir
    * @param tmpname
@@ -357,27 +372,14 @@ public class HLogSplitter {
   public boolean splitLogFileToTemp(FileStatus logfile, String tmpname,
       CancelableProgressable reporter)
       throws IOException {
-    final Map<byte[], Object> logWriters = Collections.
-        synchronizedMap(new TreeMap<byte[], Object>(Bytes.BYTES_COMPARATOR));
     boolean isCorrupted = false;
     Preconditions.checkState(status == null);
     status = TaskMonitor.get().createStatus(
         "Splitting log file " + logfile.getPath() +
         "into a temporary staging area.");
-    Object BAD_WRITER = new Object();
-    boolean progress_failed = false;
     boolean skipErrors = conf.getBoolean("hbase.hlog.split.skip.errors",
         HLog.SPLIT_SKIP_ERRORS_DEFAULT);
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
-    // How often to send a progress report (default 1/2 the zookeeper session
-    // timeout of if that not set, the split log DEFAULT_TIMEOUT)
-    int period = conf.getInt("hbase.splitlog.report.period",
-        conf.getInt("hbase.splitlog.manager.timeout", SplitLogManager.DEFAULT_TIMEOUT) / 2);
-    int numOpenedFilesBeforeReporting =
-        conf.getInt("hbase.splitlog.report.openedfiles", 3);
     Path logPath = logfile.getPath();
     long logLength = logfile.getLen();
     LOG.info("Splitting hlog: " + logPath + ", length=" + logLength);
@@ -395,59 +397,38 @@ public class HLogSplitter {
       LOG.warn("Nothing to split in log file " + logPath);
       return true;
     }
-    long t = EnvironmentEdgeManager.currentTimeMillis();
-    long last_report_at = t;
-    if (reporter != null && reporter.progress() == false) {
-      status.markComplete("Failed: reporter.progress asked us to terminate");
+    this.setDistributedLogSplittingHelper(new DistributedLogSplittingHelper(
+        reporter, tmpname));
+    if (!reportProgressIfIsDistributedLogSplitting()) {
       return false;
     }
+    boolean progress_failed = false;
+    int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
+    int numOpenedFilesLastCheck = 0;
+    outputSink.startWriterThreads();
     // Report progress every so many edits and/or files opened (opening a file
     // takes a bit of time).
     int editsCount = 0;
-    int numNewlyOpenedFiles = 0;
     Entry entry;
     try {
      while ((entry = getNextLogLine(in,logPath, skipErrors)) != null) {
-        byte[] region = entry.getKey().getEncodedRegionName();
-        Object o = logWriters.get(region);
-        if (o == BAD_WRITER) {
-          continue;
-        }
-        WriterAndPath wap = (WriterAndPath)o;
-        if (wap == null) {
-          wap = createWAP(region, entry, rootDir, tmpname, fs, conf);
-          numNewlyOpenedFiles++;
-          if (wap == null) {
-            // ignore edits from this region. It doesn't exist anymore.
-            // It was probably already split.
-            logWriters.put(region, BAD_WRITER);
-            continue;
-          } else {
-            logWriters.put(region, wap);
-          }
-        }
-        wap.w.append(entry);
-        outputSink.updateRegionMaximumEditLogSeqNum(entry);
+        entryBuffers.appendEntry(entry);
         editsCount++;
-        // If sufficient edits have passed OR we've opened a few files, check if
-        // we should report progress.
-        if (editsCount % interval == 0 ||
-            (numNewlyOpenedFiles > numOpenedFilesBeforeReporting)) {
-          // Zero out files counter each time we fall in here.
-          numNewlyOpenedFiles = 0;
-          String countsStr = "edits=" + editsCount + ", files=" + logWriters.size();
+        // If sufficient edits have passed, check if we should report progress.
+        if (editsCount % interval == 0
+            || (outputSink.logWriters.size() - numOpenedFilesLastCheck) > numOpenedFilesBeforeReporting) {
+          numOpenedFilesLastCheck = outputSink.logWriters.size();
+          String countsStr = "edits=" + editsCount;
           status.setStatus("Split " + countsStr);
-          long t1 = EnvironmentEdgeManager.currentTimeMillis();
-          if ((t1 - last_report_at) > period) {
-            last_report_at = t;
-            if (reporter != null && reporter.progress() == false) {
-              status.markComplete("Failed: reporter.progress asked us to terminate; " + countsStr);
-              progress_failed = true;
-              return false;
-            }
+          if (!reportProgressIfIsDistributedLogSplitting()) {
+            return false;
           }
         }
       }
+    } catch (InterruptedException ie) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(ie);
+      throw iie;
     } catch (CorruptedLogFileException e) {
       LOG.warn("Could not parse, corrupted log file " + logPath, e);
       ZKSplitLog.markCorrupted(rootDir, tmpname, fs);
@@ -456,79 +437,15 @@ public class HLogSplitter {
       e = RemoteExceptionHandler.checkIOException(e);
       throw e;
     } finally {
-      boolean allWritersClosed = false;
-      try {
-        int n = 0;
-        for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
-          Object o = logWritersEntry.getValue();
-          long t1 = EnvironmentEdgeManager.currentTimeMillis();
-          if ((t1 - last_report_at) > period) {
-            last_report_at = t;
-            if ((progress_failed == false) && (reporter != null)
-                && (reporter.progress() == false)) {
-              progress_failed = true;
-            }
-          }
-          if (o == BAD_WRITER) {
-            continue;
-          }
-          n++;
-          WriterAndPath wap = (WriterAndPath) o;
-          try {
-            wap.writerClosed = true;
-            wap.w.close();
-            LOG.debug("Closed " + wap.p);
-          } catch (IOException e) {
-            LOG.debug("Exception while closing the writer :", e);
-          }
-          Path dst = getCompletedRecoveredEditsFilePath(wap.p, outputSink
-              .getRegionMaximumEditLogSeqNum(logWritersEntry.getKey()));
-          if (!dst.equals(wap.p) && fs.exists(dst)) {
-            LOG.warn("Found existing old edits file. It could be the "
-                + "result of a previous failed split attempt. Deleting " + dst
-                + ", length=" + fs.getFileStatus(dst).getLen());
-            if (!fs.delete(dst, false)) {
-              LOG.warn("Failed deleting of old " + dst);
-              throw new IOException("Failed deleting of old " + dst);
-            }
-          }
-          // Skip the unit tests which create a splitter that reads and writes
-          // the
-          // data without touching disk. TestHLogSplit#testThreading is an
-          // example.
-          if (fs.exists(wap.p)) {
-            if (!fs.rename(wap.p, dst)) {
-              throw new IOException("Failed renaming " + wap.p + " to " + dst);
-            }
-            LOG.debug("Rename " + wap.p + " to " + dst);
-          }
-        }
-        allWritersClosed = true;
-        String msg = "Processed " + editsCount + " edits across " + n
-            + " regions" + " threw away edits for " + (logWriters.size() - n)
-            + " regions" + "; log file=" + logPath + " is corrupted = "
-            + isCorrupted + " progress failed = " + progress_failed;
-        LOG.info(msg);
-        status.markComplete(msg);
-      } finally {
-        if (!allWritersClosed) {
-          for (Map.Entry<byte[], Object> logWritersEntry : logWriters.entrySet()) {
-            Object o = logWritersEntry.getValue();
-            if (o != BAD_WRITER) {
-              WriterAndPath wap = (WriterAndPath) o;
-              try {
-                if (!wap.writerClosed) {
-                  wap.writerClosed = true;
-                  wap.w.close();
-                }
-              } catch (IOException e) {
-                LOG.debug("Exception while closing the writer :", e);
-              }
-            }
-          }
-        }
-        in.close();
-      }
+      LOG.info("Finishing writing output logs and closing down.");
+      progress_failed = outputSink.finishWritingAndClose() == null;
+      String msg = "Processed " + editsCount + " edits across "
+          + outputSink.getOutputCounts().size() + " regions; log file="
+          + logPath + " is corrupted = " + isCorrupted + " progress failed = "
+          + progress_failed;
+          ;
+      LOG.info(msg);
+      status.markComplete(msg);
     }
     return !progress_failed;
   }
@@ -1171,6 +1088,63 @@ public class HLogSplitter {
     return ret;
   }
 
+  /***
+   * @return false if it is a distributed log splitting and it failed to report
+   *         progress
+   */
+  private boolean reportProgressIfIsDistributedLogSplitting() {
+    if (this.distributedLogSplittingHelper != null) {
+      return distributedLogSplittingHelper.reportProgress();
+    } else {
+      return true;
+    }
+  }
+
+  /**
+   * A class used in distributed log splitting
+   *
+   */
+  class DistributedLogSplittingHelper {
+    // Report progress, only used in distributed log splitting
+    private final CancelableProgressable splitReporter;
+    // How often to send a progress report (default 1/2 master timeout)
+    private final int report_period;
+    private long last_report_at = 0;
+    private final String tmpDirName;
+
+    public DistributedLogSplittingHelper(CancelableProgressable reporter,
+        String tmpName) {
+      this.splitReporter = reporter;
+      this.tmpDirName = tmpName;
+      report_period = conf.getInt("hbase.splitlog.report.period",
+          conf.getInt("hbase.splitlog.manager.timeout",
+              SplitLogManager.DEFAULT_TIMEOUT) / 2);
+    }
+
+    /***
+     * @return false if reporter failed progressing
+     */
+    private boolean reportProgress() {
+      if (splitReporter == null) {
+        return true;
+      } else {
+        long t = EnvironmentEdgeManager.currentTimeMillis();
+        if ((t - last_report_at) > report_period) {
+          last_report_at = t;
+          if (this.splitReporter.progress() == false) {
+            LOG.warn("Failed: reporter.progress asked us to terminate");
+            return false;
+          }
+        }
+        return true;
+      }
+    }
+
+    String getTmpDirName() {
+      return this.tmpDirName;
+    }
+  }
+
   /**
    * Class that manages the output streams from the log splitting process.
    */
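The helper's reportProgress is a simple rate limiter: the edit loop can call it on every check, but it only calls through to the CancelableProgressable once report_period has elapsed, so ZooKeeper is not flooded with heartbeats. Note it also quietly fixes a bug in the code it replaces, which reset last_report_at to the stale loop-entry timestamp t rather than the current time t1. The throttling pattern is easy to show standalone; the sketch below uses hypothetical names and a plain interface in place of CancelableProgressable.

// Sketch of the time-throttled progress callback; names are illustrative.
class ThrottledReporter {
  interface Progressable { boolean progress(); }  // stand-in for CancelableProgressable

  private final Progressable reporter;
  private final long periodMillis;  // e.g. half of hbase.splitlog.manager.timeout
  private long lastReportAt = 0;

  ThrottledReporter(Progressable reporter, long periodMillis) {
    this.reporter = reporter;
    this.periodMillis = periodMillis;
  }

  /** @return false if the reporter asked us to terminate */
  boolean reportProgress() {
    long now = System.currentTimeMillis();
    if (now - lastReportAt > periodMillis) {
      lastReportAt = now;  // advance to the current time, not the loop-entry time
      return reporter == null || reporter.progress();
    }
    return true;  // within the period: skip the callback, assume all is well
  }
}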
@@ -1189,20 +1163,23 @@ public class HLogSplitter {
     private boolean logWritersClosed = false;
 
-    /**
-     * Start the threads that will pump data from the entryBuffers
-     * to the output files.
-     * @return the list of started threads
-     */
-    synchronized void startWriterThreads(EntryBuffers entryBuffers) {
+    private final int numThreads;
+
+    public OutputSink() {
       // More threads could potentially write faster at the expense
       // of causing more disk seeks as the logs are split.
       // 3. After a certain setting (probably around 3) the
       // process will be bound on the reader in the current
       // implementation anyway.
-      int numThreads = conf.getInt(
+      numThreads = conf.getInt(
           "hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    }
+
+    /**
+     * Start the threads that will pump data from the entryBuffers
+     * to the output files.
+     */
+    synchronized void startWriterThreads() {
       for (int i = 0; i < numThreads; i++) {
         WriterThread t = new WriterThread(i);
         t.start();
@@ -1210,22 +1187,35 @@ public class HLogSplitter {
       }
     }
 
+    /**
+     *
+     * @return null if failed to report progress
+     * @throws IOException
+     */
     List<Path> finishWritingAndClose() throws IOException {
       LOG.info("Waiting for split writer threads to finish");
+      boolean progress_failed = false;
       try {
         for (WriterThread t : writerThreads) {
           t.finish();
         }
         for (WriterThread t : writerThreads) {
+          if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+            progress_failed = true;
+          }
           try {
             t.join();
           } catch (InterruptedException ie) {
-            throw new IOException(ie);
+            IOException iie = new InterruptedIOException();
+            iie.initCause(ie);
+            throw iie;
           }
           checkForErrors();
         }
         LOG.info("Split writers finished");
+        if (progress_failed) {
+          return null;
+        }
         return closeStreams();
       } finally {
         List<IOException> thrown = closeLogWriters(null);
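Both here and in splitLogFileToTemp above, the patch rethrows InterruptedException as InterruptedIOException rather than wrapping it in a plain IOException, so callers bound to an IOException signature can still recognize interruption by type. The idiom, shown standalone below, needs initCause because InterruptedIOException has no constructor that takes a cause.

import java.io.IOException;
import java.io.InterruptedIOException;

// The conversion idiom used throughout the patch: keep the IOException
// signature but preserve the interrupt as the exception's cause.
public final class InterruptConversion {
  static void joinQuietly(Thread t) throws IOException {
    try {
      t.join();
    } catch (InterruptedException ie) {
      IOException iie = new InterruptedIOException();
      iie.initCause(ie);  // InterruptedIOException has no (Throwable) constructor
      throw iie;
    }
  }
}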
@@ -1242,45 +1232,92 @@ public class HLogSplitter {
     private List<Path> closeStreams() throws IOException {
       Preconditions.checkState(!closeAndCleanCompleted);
-      List<Path> paths = new ArrayList<Path>();
-      List<IOException> thrown = Lists.newArrayList();
-      closeLogWriters(thrown);
-      for (Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
+      final List<Path> paths = new ArrayList<Path>();
+      final List<IOException> thrown = Lists.newArrayList();
+      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(
+          numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
+            private int count = 1;
+
+            public Thread newThread(Runnable r) {
+              Thread t = new Thread(r, "split-log-closeStream-" + count++);
+              return t;
+            }
+          });
+      CompletionService<Void> completionService = new ExecutorCompletionService<Void>(
+          closeThreadPool);
+      for (final Map.Entry<byte[], WriterAndPath> logWritersEntry : logWriters
           .entrySet()) {
-        WriterAndPath wap = logWritersEntry.getValue();
-        Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-            regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
-        try {
-          if (!dst.equals(wap.p) && fs.exists(dst)) {
-            LOG.warn("Found existing old edits file. It could be the "
-                + "result of a previous failed split attempt. Deleting " + dst
-                + ", length=" + fs.getFileStatus(dst).getLen());
-            if (!fs.delete(dst, false)) {
-              LOG.warn("Failed deleting of old " + dst);
-              throw new IOException("Failed deleting of old " + dst);
-            }
-          }
-          // Skip the unit tests which create a splitter that reads and writes
-          // the data without touching disk. TestHLogSplit#testThreading is an
-          // example.
-          if (fs.exists(wap.p)) {
-            if (!fs.rename(wap.p, dst)) {
-              throw new IOException("Failed renaming " + wap.p + " to " + dst);
-            }
-            LOG.debug("Rename " + wap.p + " to " + dst);
-          }
-        } catch (IOException ioe) {
-          LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
-          thrown.add(ioe);
-          continue;
-        }
-        paths.add(dst);
+        completionService.submit(new Callable<Void>() {
+          public Void call() throws Exception {
+            WriterAndPath wap = logWritersEntry.getValue();
+            try {
+              wap.w.close();
+            } catch (IOException ioe) {
+              LOG.error("Couldn't close log at " + wap.p, ioe);
+              thrown.add(ioe);
+              return null;
+            }
+            LOG.info("Closed path " + wap.p + " (wrote " + wap.editsWritten
+                + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms)");
+            Path dst = getCompletedRecoveredEditsFilePath(wap.p,
+                regionMaximumEditLogSeqNum.get(logWritersEntry.getKey()));
+            try {
+              if (!dst.equals(wap.p) && fs.exists(dst)) {
+                LOG.warn("Found existing old edits file. It could be the "
+                    + "result of a previous failed split attempt. Deleting "
+                    + dst + ", length=" + fs.getFileStatus(dst).getLen());
+                if (!fs.delete(dst, false)) {
+                  LOG.warn("Failed deleting of old " + dst);
+                  throw new IOException("Failed deleting of old " + dst);
+                }
+              }
+              // Skip the unit tests which create a splitter that reads and
+              // writes the data without touching disk.
+              // TestHLogSplit#testThreading is an example.
+              if (fs.exists(wap.p)) {
+                if (!fs.rename(wap.p, dst)) {
+                  throw new IOException("Failed renaming " + wap.p + " to "
+                      + dst);
+                }
+                LOG.debug("Rename " + wap.p + " to " + dst);
+              }
+            } catch (IOException ioe) {
+              LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
+              thrown.add(ioe);
+              return null;
+            }
+            paths.add(dst);
+            return null;
+          }
+        });
       }
+      boolean progress_failed = false;
+      try {
+        for (int i = 0; i < logWriters.size(); i++) {
+          Future<Void> future = completionService.take();
+          future.get();
+          if (!progress_failed && !reportProgressIfIsDistributedLogSplitting()) {
+            progress_failed = true;
+          }
+        }
+      } catch (InterruptedException e) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(e);
+        throw iie;
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        closeThreadPool.shutdownNow();
+      }
       if (!thrown.isEmpty()) {
         throw MultipleIOException.createIOException(thrown);
       }
+      logWritersClosed = true;
       closeAndCleanCompleted = true;
+      if (progress_failed) {
+        return null;
+      }
       return paths;
     }
@@ -1325,7 +1362,9 @@ public class HLogSplitter {
       if (blacklistedRegions.contains(region)) {
         return null;
       }
-      ret = createWAP(region, entry, rootDir, null, fs, conf);
+      String tmpName = distributedLogSplittingHelper == null ? null
+          : distributedLogSplittingHelper.getTmpDirName();
+      ret = createWAP(region, entry, rootDir, tmpName, fs, conf);
       if (ret == null) {
         blacklistedRegions.add(region);
         return null;
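The closeStreams rewrite above is where most of the speed-up comes from: each recovered.edits writer is closed and renamed by a task on a bounded thread pool, and an ExecutorCompletionService lets the worker consume completions (and report progress between them) as they finish rather than in submission order. A self-contained sketch of that fan-out/fan-in shape follows; it uses a plain fixed pool where the patch uses the Threads.getBoundedCachedThreadPool helper, and all names are illustrative.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch of the parallel-close pattern: submit one close task per writer,
// then drain the results in completion order.
public class ParallelCloseSketch {
  public static void main(String[] args) throws Exception {
    List<String> writers = new ArrayList<String>();
    for (int i = 0; i < 10; i++) writers.add("writer-" + i);

    ExecutorService pool = Executors.newFixedThreadPool(3);
    CompletionService<String> done = new ExecutorCompletionService<String>(pool);
    try {
      for (final String w : writers) {
        done.submit(new Callable<String>() {
          public String call() {
            // a real task would close the writer and rename its file here
            return w;
          }
        });
      }
      for (int i = 0; i < writers.size(); i++) {
        String closed = done.take().get();  // fan-in: blocks for the next completion
        System.out.println("closed " + closed);
        // this is also where the worker reports progress between completions
      }
    } finally {
      pool.shutdownNow();
    }
  }
}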

TestDistributedLogSplitting.java

@@ -397,6 +397,9 @@ public class TestDistributedLogSplitting {
       List<HRegionInfo> hris, String tname,
       int num_edits, int edit_size) throws IOException {
+    // remove root and meta region
+    hris.remove(HRegionInfo.ROOT_REGIONINFO);
+    hris.remove(HRegionInfo.FIRST_META_REGIONINFO);
     byte[] table = Bytes.toBytes(tname);
     HTableDescriptor htd = new HTableDescriptor(tname);
     byte[] value = new byte[edit_size];