HBASE-1008 [performance] The replay of logs on server crash takes way too long

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@775132 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2009-05-15 13:45:11 +00:00
parent 02a969d3e1
commit ac347387ab
3 changed files with 157 additions and 63 deletions


@@ -248,6 +248,7 @@ Release 0.20.0 - Unreleased
    HBASE-1417  Cleanup disorientating RPC message
    HBASE-1424  have shell print regioninfo and location on first load if
                DEBUG enabled
+   HBASE-1008  [performance] The replay of logs on server crash takes way too long
  OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue


@@ -133,6 +133,10 @@ public interface HConstants {
   /** Default size of a reservation block */
   static final int DEFAULT_SIZE_RESERVATION_BLOCK = 1024 * 1024 * 5;
+  /** Default number of threads to use when log splitting
+   * to rewrite the logs. More means faster but bigger mem consumption */
+  static final int DEFAULT_NUMBER_LOG_WRITER_THREAD = 10;
   // Always store the location of the root table's HRegion.
   // This HRegion is never split.
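
The new constant caps the pool of writer threads used while splitting a dead server's logs; the comment's trade-off is that more threads finish the split sooner but hold more edits in memory at once. As a rough illustration of that pattern (a minimal, self-contained sketch in the style of the patch, not HBase code; the class and task bodies here are invented for illustration):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WriterPoolSketch {
  // Mirrors DEFAULT_NUMBER_LOG_WRITER_THREAD from the hunk above.
  static final int WRITER_THREADS = 10;

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(WRITER_THREADS);
    for (int i = 0; i < 25; i++) {
      final int region = i;
      pool.execute(new Runnable() {
        public void run() {
          // A real writer would replay one region's queued edits here.
          System.out.println("rewriting log for region " + region);
        }
      });
    }
    pool.shutdown();  // accept no new tasks; let queued writers drain
    while (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
      System.out.println("waiting for writers...");
    }
  }
}

A fixed pool means at most WRITER_THREADS regions are being rewritten concurrently, which bounds memory while still parallelizing the split.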


@@ -24,12 +24,16 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.Collections;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -715,6 +719,7 @@ public class HLog implements HConstants, Syncable {
   public static void splitLog(final Path rootDir, final Path srcDir,
     final FileSystem fs, final Configuration conf)
   throws IOException {
+    long millis = System.currentTimeMillis();
     if (!fs.exists(srcDir)) {
       // Nothing to do
       return;
@@ -735,7 +740,9 @@ public class HLog implements HConstants, Syncable {
       io.initCause(e);
       throw io;
     }
-    LOG.info("log file splitting completed for " + srcDir.toString());
+    long endMillis = System.currentTimeMillis();
+    LOG.info("log file splitting completed in " + (endMillis - millis) +
+        " millis for " + srcDir.toString());
   }

   /*
@@ -748,8 +755,10 @@ public class HLog implements HConstants, Syncable {
   private static void splitLog(final Path rootDir, final FileStatus [] logfiles,
     final FileSystem fs, final Configuration conf)
   throws IOException {
-    Map<byte [], SequenceFile.Writer> logWriters =
+    final Map<byte [], SequenceFile.Writer> logWriters =
       new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
+    final Map<byte[], LinkedList<HLogEntry>> logEntries =
+      new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
     try {
       for (int i = 0; i < logfiles.length; i++) {
         if (LOG.isDebugEnabled()) {
@@ -762,78 +771,29 @@ public class HLog implements HConstants, Syncable {
         long length = logfiles[i].getLen();
         HLogKey key = new HLogKey();
         KeyValue val = new KeyValue();
+        SequenceFile.Reader in = null;
         try {
-          SequenceFile.Reader in =
-            new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
+          in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
           try {
             int count = 0;
-            for (; in.next(key, val); count++) {
-              byte [] tableName = key.getTablename();
-              byte [] regionName = key.getRegionName();
-              SequenceFile.Writer w = logWriters.get(regionName);
-              if (w == null) {
-                Path logfile = new Path(
-                  HRegion.getRegionDir(
-                    HTableDescriptor.getTableDir(rootDir, tableName),
-                    HRegionInfo.encodeRegionName(regionName)),
-                  HREGION_OLDLOGFILE_NAME);
-                Path oldlogfile = null;
-                SequenceFile.Reader old = null;
-                if (fs.exists(logfile)) {
-                  LOG.warn("Old log file " + logfile +
-                    " already exists. Copying existing file to new file");
-                  oldlogfile = new Path(logfile.toString() + ".old");
-                  fs.rename(logfile, oldlogfile);
-                  old = new SequenceFile.Reader(fs, oldlogfile, conf);
-                }
-                w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class,
-                  KeyValue.class, getCompressionType(conf));
-                // Use copy of regionName; regionName object is reused inside in
-                // HStoreKey.getRegionName so its content changes as we iterate.
-                logWriters.put(regionName, w);
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Creating new log file writer for path " + logfile +
-                    " and region " + Bytes.toString(regionName));
-                }
-                if (old != null) {
-                  // Copy from existing log file
-                  HLogKey oldkey = new HLogKey();
-                  KeyValue oldval = new KeyValue();
-                  for (; old.next(oldkey, oldval); count++) {
-                    if (LOG.isDebugEnabled() && count > 0 && count % 10000 == 0) {
-                      LOG.debug("Copied " + count + " edits");
-                    }
-                    w.append(oldkey, oldval);
-                  }
-                  old.close();
-                  fs.delete(oldlogfile, true);
-                }
+            while (in.next(key, val)) {
+              byte[] regionName = key.getRegionName();
+              LinkedList<HLogEntry> queue = logEntries.get(regionName);
+              if (queue == null) {
+                queue = new LinkedList<HLogEntry>();
+                LOG.debug("Adding queue for " + Bytes.toString(regionName));
+                logEntries.put(regionName, queue);
               }
-              w.append(key, val);
+              queue.push(new HLogEntry(val, key));
+              count++;
             }
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Applied " + count + " total edits from " +
-                logfiles[i].getPath().toString());
-            }
+            LOG.debug("Pushed " + count + " entries");
           } catch (IOException e) {
             e = RemoteExceptionHandler.checkIOException(e);
             if (!(e instanceof EOFException)) {
               LOG.warn("Exception processing " + logfiles[i].getPath() +
                 " -- continuing. Possible DATA LOSS!", e);
             }
-          } finally {
-            try {
-              in.close();
-            } catch (IOException e) {
-              LOG.warn("Close in finally threw exception -- continuing", e);
-            }
-            // Delete the input file now so we do not replay edits. We could
-            // have gotten here because of an exception. If so, probably
-            // nothing we can do about it. Replaying it, it could work but we
-            // could be stuck replaying for ever. Just continue though we
-            // could have lost some edits.
-            fs.delete(logfiles[i].getPath(), true);
-          }
         } catch (IOException e) {
           if (length <= 0) {
@@ -841,14 +801,143 @@ public class HLog implements HConstants, Syncable {
             continue;
           }
           throw e;
+        } finally {
+          try {
+            if (in != null) {
+              in.close();
+            }
+          } catch (IOException e) {
+            LOG.warn("Close in finally threw exception -- continuing", e);
+          }
+          // Delete the input file now so we do not replay edits. We could
+          // have gotten here because of an exception. If so, probably
+          // nothing we can do about it. Replaying it, it could work but we
+          // could be stuck replaying for ever. Just continue though we
+          // could have lost some edits.
+          fs.delete(logfiles[i].getPath(), true);
         }
       }
+      ExecutorService threadPool =
+        Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
+      for (final byte[] key : logEntries.keySet()) {
+        Thread thread = new Thread(Bytes.toString(key)) {
+          public void run() {
+            LinkedList<HLogEntry> entries = logEntries.get(key);
+            LOG.debug("Thread got " + entries.size() + " to process");
+            long threadTime = System.currentTimeMillis();
+            try {
+              int count = 0;
+              for (HLogEntry logEntry : entries) {
+                SequenceFile.Writer w = logWriters.get(key);
+                if (w == null) {
+                  Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
+                      .getTableDir(rootDir, logEntry.getKey().getTablename()),
+                      HRegionInfo.encodeRegionName(key)),
+                      HREGION_OLDLOGFILE_NAME);
+                  Path oldlogfile = null;
+                  SequenceFile.Reader old = null;
+                  if (fs.exists(logfile)) {
+                    LOG.warn("Old log file " + logfile
+                        + " already exists. Copying existing file to new file");
+                    oldlogfile = new Path(logfile.toString() + ".old");
+                    fs.rename(logfile, oldlogfile);
+                    old = new SequenceFile.Reader(fs, oldlogfile, conf);
+                  }
+                  w = SequenceFile.createWriter(fs, conf, logfile,
+                      HLogKey.class, KeyValue.class, getCompressionType(conf));
+                  // Use copy of regionName; regionName object is reused inside
+                  // HStoreKey.getRegionName so its content changes as we iterate.
+                  logWriters.put(key, w);
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Creating new log file writer for path "
+                        + logfile + " and region " + Bytes.toString(key));
+                  }
+                  if (old != null) {
+                    // Copy from existing log file
+                    HLogKey oldkey = new HLogKey();
+                    KeyValue oldval = new KeyValue();
+                    for (; old.next(oldkey, oldval); count++) {
+                      if (LOG.isDebugEnabled() && count > 0
+                          && count % 10000 == 0) {
+                        LOG.debug("Copied " + count + " edits");
+                      }
+                      w.append(oldkey, oldval);
+                    }
+                    old.close();
+                    fs.delete(oldlogfile, true);
+                  }
+                }
+                w.append(logEntry.getKey(), logEntry.getEdit());
+                count++;
+              }
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Applied " + count + " total edits to "
+                    + Bytes.toString(key) + " in "
+                    + (System.currentTimeMillis() - threadTime) + "ms");
+              }
+            } catch (IOException e) {
+              e = RemoteExceptionHandler.checkIOException(e);
+              LOG.warn("Got while writing region " + Bytes.toString(key)
+                  + " log " + e);
+              e.printStackTrace();
+            }
+          }
+        };
+        threadPool.execute(thread);
+      }
+      threadPool.shutdown();
+      // Wait for all threads to terminate
+      try {
+        for (int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
+          LOG.debug("Waiting for log writers to terminate, iteration #" + i);
+        }
+      } catch (InterruptedException ex) {
+        LOG.warn("Log writers were interrupted, possible data loss!");
+      }
     } finally {
       for (SequenceFile.Writer w : logWriters.values()) {
         w.close();
       }
     }
   }
+  /**
+   * Utility class that lets us keep track of the edit with its key.
+   * Only used when splitting logs.
+   */
+  public static class HLogEntry {
+    private KeyValue edit;
+    private HLogKey key;
+    /**
+     * Constructor for both params
+     * @param edit log's edit
+     * @param key log's key
+     */
+    public HLogEntry(KeyValue edit, HLogKey key) {
+      super();
+      this.edit = edit;
+      this.key = key;
+    }
+    /**
+     * Gets the edit
+     * @return edit
+     */
+    public KeyValue getEdit() {
+      return edit;
+    }
+    /**
+     * Gets the key
+     * @return key
+     */
+    public HLogKey getKey() {
+      return key;
+    }
+  }
   /**
    * Construct the HLog directory name
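
Taken together, the rewritten splitLog is a two-phase split: a single reader drains each dead server's log file into per-region queues of HLogEntry, then a fixed pool of writer threads flushes one queue per region. The following sketch reproduces that shape with plain strings standing in for HLogKey/KeyValue; every name in it (SplitSketch, Entry, byRegion, the sample records) is illustrative, not HBase API:

import java.util.LinkedList;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SplitSketch {
  // Stand-in for HLogEntry: one edit destined for one region.
  static class Entry {
    final String region;
    final String edit;
    Entry(String region, String edit) { this.region = region; this.edit = edit; }
  }

  public static void main(String[] args) throws InterruptedException {
    // Phase 1: a single reader groups every edit into a per-region queue,
    // much as the patch builds logEntries while scanning the log files.
    final Map<String, LinkedList<Entry>> byRegion =
        new TreeMap<String, LinkedList<Entry>>();
    String[][] records = { { "r1", "put a" }, { "r2", "put b" }, { "r1", "put c" } };
    for (String[] rec : records) {
      LinkedList<Entry> queue = byRegion.get(rec[0]);
      if (queue == null) {
        queue = new LinkedList<Entry>();
        byRegion.put(rec[0], queue);
      }
      queue.add(new Entry(rec[0], rec[1]));
    }
    // Phase 2: one writer task per region, bounded by a fixed pool so only
    // a limited number of regions are rewritten at once.
    ExecutorService pool = Executors.newFixedThreadPool(10);
    for (final Map.Entry<String, LinkedList<Entry>> e : byRegion.entrySet()) {
      pool.execute(new Runnable() {
        public void run() {
          for (Entry entry : e.getValue()) {
            System.out.println(e.getKey() + " <- " + entry.edit);
          }
        }
      });
    }
    pool.shutdown();
    // Poll until every region's writer has finished, as the patch's
    // awaitTermination loop does before the writers are closed.
    while (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
      System.out.println("still writing...");
    }
  }
}

Because each region's queue is owned by exactly one writer task, the writers never contend on a queue, and the outer finally in splitLog can safely close every SequenceFile.Writer once the pool has terminated.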