HBASE-728 Support for HLog appends

- Passes all unit tests.
- Runs the PerformanceEvaluation random-write test in 8 min 43 sec on a 4-node cluster. I believe this is a new speed record.
- Eliminates time-based log rolling and cache flushing (made unnecessary by append support).


git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@707247 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman 2008-10-23 02:30:35 +00:00
parent d0ce40d1ec
commit d5e81859e1
6 changed files with 98 additions and 114 deletions

View File

@ -39,6 +39,7 @@ Release 0.19.0 - Unreleased
HBASE-945 Be consistent in use of qualified/unqualified mapfile paths
HBASE-946 Row with 55k deletes timesout scanner lease
HBASE-950 HTable.commit no longer works with existing RowLocks though it's still in API
HBASE-728 Support for HLog appends
IMPROVEMENTS
HBASE-901 Add a limit to key length, check key and value length on client side

View File

@ -156,30 +156,25 @@
</property>
<property>
<name>hbase.regionserver.maxlogentries</name>
<value>30000</value>
<value>100000</value>
<description>Rotate the HRegion HLogs when count of entries exceeds this
value. Default: 30,000. Value is checked by a thread that runs every
value. Default: 100,000. Value is checked by a thread that runs every
hbase.server.thread.wakefrequency.
</description>
</property>
<property>
<name>hbase.regionserver.optionalcacheflushinterval</name>
<value>1800000</value>
<description>
Amount of time to wait since the last time a region was flushed before
invoking an optional cache flush (An optional cache flush is a
flush even though memcache is not at the memcache.flush.size).
Default: 30 minutes (in miliseconds)
<name>hbase.regionserver.flushlogentries</name>
<value>100</value>
<description>Sync the HLog to the HDFS when it has accumulated this many
entries. Default 100. Value is checked on every HLog.append
</description>
</property>
<property>
<name>hbase.regionserver.optionallogrollinterval</name>
<value>1800000</value>
<description>
Amount of time to wait since the last time a the region server's log was
rolled before invoking an optional log roll (An optional log roll is a
one in which the log does not contain hbase.regionserver.maxlogentries).
Default: 30 minutes (in miliseconds)
<name>hbase.regionserver.optionallogflushinterval</name>
<value>10000</value>
<description>Sync the HLog to the HDFS after this interval if it has not
accumulated enough entries to trigger a sync. Default 10 seconds. Units:
milliseconds.
</description>
</property>
<property>
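
The two new properties above (hbase.regionserver.flushlogentries and hbase.regionserver.optionallogflushinterval) replace the old time-based cache-flush and log-roll intervals with an entry-count / elapsed-time sync policy for the write-ahead log. Purely as an illustration (not part of this commit; the class name and values here are made up), the settings can also be read or overridden programmatically through HBaseConfiguration, which behaves like a plain Hadoop Configuration:

    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class HLogSyncSettings {
      public static void main(String[] args) {
        HBaseConfiguration conf = new HBaseConfiguration();
        // Sync the HLog to HDFS once this many edits have accumulated.
        conf.setInt("hbase.regionserver.flushlogentries", 100);
        // Also sync after this many milliseconds if unflushed edits remain.
        conf.setLong("hbase.regionserver.optionallogflushinterval", 10 * 1000L);
        // Roll the log once it holds this many entries.
        conf.setInt("hbase.regionserver.maxlogentries", 100000);
        System.out.println("flushlogentries = "
            + conf.getInt("hbase.regionserver.flushlogentries", 100));
      }
    }

In a deployment these values would normally be set in hbase-site.xml; the snippet only gathers the new property names and defaults in one place.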

View File

@ -25,7 +25,6 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;
import java.util.HashSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.ConcurrentModificationException;
@ -48,10 +47,8 @@ class Flusher extends Thread implements FlushRequester {
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
private final long threadWakeFrequency;
private final long optionalFlushPeriod;
private final HRegionServer server;
private final ReentrantLock lock = new ReentrantLock();
private long lastOptionalCheck = System.currentTimeMillis();
protected final long globalMemcacheLimit;
protected final long globalMemcacheLimitLowMark;
@ -63,8 +60,6 @@ class Flusher extends Thread implements FlushRequester {
public Flusher(final HBaseConfiguration conf, final HRegionServer server) {
super();
this.server = server;
optionalFlushPeriod = conf.getLong(
"hbase.regionserver.optionalcacheflushinterval", 30 * 60 * 1000L);
threadWakeFrequency = conf.getLong(
HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000);
@ -82,7 +77,6 @@ class Flusher extends Thread implements FlushRequester {
while (!server.isStopRequested()) {
HRegion r = null;
try {
enqueueOptionalFlushRegions();
r = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
if (r == null) {
continue;
@ -109,15 +103,23 @@ class Flusher extends Thread implements FlushRequester {
}
public void request(HRegion r) {
addRegion(r, System.currentTimeMillis());
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
regionsInQueue.add(r);
flushQueue.add(r);
}
}
}
/**
* Only interrupt once it's done with a run through the work loop.
*/
void interruptIfNecessary() {
if (lock.tryLock()) {
lock.lock();
try {
this.interrupt();
} finally {
lock.unlock();
}
}
@ -195,52 +197,6 @@ class Flusher extends Thread implements FlushRequester {
return true;
}
/**
* Find the regions that should be optionally flushed and put them on the
* flush queue.
*/
private void enqueueOptionalFlushRegions() {
long now = System.currentTimeMillis();
if (now - threadWakeFrequency > lastOptionalCheck) {
lastOptionalCheck = now;
// Queue up regions for optional flush if they need it
Set<HRegion> regions = server.getRegionsToCheck();
for (HRegion region: regions) {
optionallyAddRegion(region, now);
}
}
}
/*
* Add region if not already added and if optional flush period has been
* exceeded.
* @param r Region to add.
* @param now The 'now' to use. Set last flush time to this value.
*/
private void optionallyAddRegion(final HRegion r, final long now) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r) &&
(now - optionalFlushPeriod) > r.getLastFlushTime()) {
addRegion(r, now);
}
}
}
/*
* Add region if not already added.
* @param r Region to add.
* @param now The 'now' to use. Set last flush time to this value.
*/
private void addRegion(final HRegion r,
@SuppressWarnings("unused") final long now) {
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
regionsInQueue.add(r);
flushQueue.add(r);
}
}
}
/**
* Check if the regionserver's memcache memory usage is greater than the
* limit. If so, flush regions with the biggest memcaches until we're down

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
@ -82,16 +83,8 @@ import org.apache.hadoop.io.SequenceFile.Reader;
* rolling is not. To prevent log rolling taking place during this period, a
* separate reentrant lock is used.
*
* <p>
* TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
* HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
* 'atomic' write to the log is meant to serve as insurance against abnormal
* RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
* last wholesome state. But files in HDFS do not 'exist' until they are cleanly
* closed -- something that will not happen if RegionServer exits without
* running its 'close'.
*/
public class HLog implements HConstants {
public class HLog extends Thread implements HConstants, Syncable {
private static final Log LOG = LogFactory.getLog(HLog.class);
private static final String HLOG_DATFILE = "hlog.dat.";
static final byte [] METACOLUMN = Bytes.toBytes("METACOLUMN:");
@ -100,8 +93,12 @@ public class HLog implements HConstants {
final Path dir;
final Configuration conf;
final LogRollListener listener;
final long threadWakeFrequency;
private final int maxlogentries;
private final long optionalFlushInterval;
private final int flushlogentries;
private volatile int unflushedEntries = 0;
private volatile long lastLogFlushTime;
final long threadWakeFrequency;
/*
* Current log file.
@ -153,13 +150,22 @@ public class HLog implements HConstants {
*/
public HLog(final FileSystem fs, final Path dir, final Configuration conf,
final LogRollListener listener) throws IOException {
super();
this.fs = fs;
this.dir = dir;
this.conf = conf;
this.listener = listener;
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.setName(this.getClass().getSimpleName());
this.maxlogentries =
conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
conf.getInt("hbase.regionserver.maxlogentries", 100000);
this.flushlogentries =
conf.getInt("hbase.regionserver.flushlogentries", 100);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
}
@ -168,7 +174,7 @@ public class HLog implements HConstants {
}
/*
* Accessor for tests.
* Accessor for tests. Not a part of the public API.
* @return Current state of the monotonically increasing file id.
*/
public long getFilenum() {
@ -313,6 +319,7 @@ public class HLog implements HConstants {
}
}
this.numEntries = 0;
updateLock.notifyAll();
}
} finally {
this.cacheFlushLock.unlock();
@ -354,11 +361,15 @@ public class HLog implements HConstants {
cacheFlushLock.lock();
try {
synchronized (updateLock) {
this.closed = true;
if (this.isAlive()) {
this.interrupt();
}
if (LOG.isDebugEnabled()) {
LOG.debug("closing log writer in " + this.dir.toString());
}
this.writer.close();
this.closed = true;
updateLock.notifyAll();
}
} finally {
cacheFlushLock.unlock();
@ -415,11 +426,40 @@ public class HLog implements HConstants {
this.numEntries++;
}
updateLock.notifyAll();
}
if (this.numEntries > this.maxlogentries) {
requestLogRoll();
}
}
/** {@inheritDoc} */
@Override
public void run() {
while (!this.closed) {
synchronized (updateLock) {
if (((System.currentTimeMillis() - this.optionalFlushInterval) >
this.lastLogFlushTime) && this.unflushedEntries > 0) {
try {
sync();
} catch (IOException e) {
LOG.error("Error flushing HLog", e);
}
}
try {
updateLock.wait(this.threadWakeFrequency);
} catch (InterruptedException e) {
// continue
}
}
}
}
public void sync() throws IOException {
lastLogFlushTime = System.currentTimeMillis();
this.writer.sync();
unflushedEntries = 0;
}
private void requestLogRoll() {
if (this.listener != null) {
@ -430,6 +470,9 @@ public class HLog implements HConstants {
private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException {
try {
this.writer.append(logKey, logEdit);
if (++unflushedEntries >= flushlogentries) {
sync();
}
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of log", e);
requestLogRoll();
@ -454,7 +497,8 @@ public class HLog implements HConstants {
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException {
public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit)
throws IOException {
if (closed) {
throw new IOException("Cannot append; log is closed");
}
@ -474,6 +518,7 @@ public class HLog implements HConstants {
HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
doWrite(logKey, logEdit);
this.numEntries++;
updateLock.notifyAll();
}
if (this.numEntries > this.maxlogentries) {
@ -563,6 +608,7 @@ public class HLog implements HConstants {
if (seq != null && logSeqId >= seq.longValue()) {
this.lastSeqWritten.remove(regionName);
}
updateLock.notifyAll();
}
} finally {
this.cacheFlushLock.unlock();
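
Taken together, the HLog changes implement a count-or-time group sync: doWrite() calls sync() once hbase.regionserver.flushlogentries appends have accumulated, and the new run() loop, which waits on updateLock, syncs whenever hbase.regionserver.optionallogflushinterval milliseconds pass with unflushed edits outstanding. The sketch below shows that pattern in isolation; WriteAheadLog is a made-up class, and the actual SequenceFile write and writer.sync() calls are elided:

    class WriteAheadLog extends Thread {
      private final Object updateLock = new Object();
      private final int flushLogEntries;        // sync after this many appends...
      private final long optionalFlushInterval; // ...or after this many quiet milliseconds
      private volatile int unflushedEntries = 0;
      private volatile long lastSyncTime = System.currentTimeMillis();
      private volatile boolean closed = false;

      WriteAheadLog(int flushLogEntries, long optionalFlushInterval) {
        this.flushLogEntries = flushLogEntries;
        this.optionalFlushInterval = optionalFlushInterval;
      }

      void append(byte[] edit) throws java.io.IOException {
        synchronized (updateLock) {
          // write the edit to the underlying log file here (elided)
          if (++unflushedEntries >= flushLogEntries) {
            sync();
          }
          updateLock.notifyAll(); // wake the background syncer
        }
      }

      void sync() throws java.io.IOException {
        lastSyncTime = System.currentTimeMillis();
        // the real HLog calls this.writer.sync() here
        unflushedEntries = 0;
      }

      void shutdown() {
        synchronized (updateLock) {
          closed = true;
          updateLock.notifyAll();
        }
      }

      @Override
      public void run() {
        while (!closed) {
          synchronized (updateLock) {
            if (unflushedEntries > 0 &&
                System.currentTimeMillis() - lastSyncTime > optionalFlushInterval) {
              try {
                sync();
              } catch (java.io.IOException e) {
                // the real HLog logs the error and keeps running
              }
            }
            try {
              updateLock.wait(10 * 1000); // threadWakeFrequency in the real code
            } catch (InterruptedException e) {
              // loop around and re-check closed
            }
          }
        }
      }
    }

Because HLog now extends Thread, the region server has to start it explicitly; the next hunk does exactly that right after constructing the log.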

View File

@ -561,7 +561,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
"running at " + this.serverInfo.getServerAddress().toString() +
" because logdir " + logdir.toString() + " exists");
}
return new HLog(fs, logdir, conf, logRoller);
HLog newlog = new HLog(fs, logdir, conf, logRoller);
newlog.start();
return newlog;
}
/*

View File

@ -25,57 +25,38 @@ import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
/** Runs periodically to determine if the HLog should be rolled */
class LogRoller extends Thread implements LogRollListener {
static final Log LOG = LogFactory.getLog(LogRoller.class);
private final ReentrantLock rollLock = new ReentrantLock();
private final long optionalLogRollInterval;
private long lastLogRollTime;
private final AtomicBoolean rollLog = new AtomicBoolean(false);
private final HRegionServer server;
private final HBaseConfiguration conf;
/** @param server */
public LogRoller(final HRegionServer server) {
super();
this.server = server;
conf = server.conf;
this.optionalLogRollInterval = conf.getLong(
"hbase.regionserver.optionallogrollinterval", 30L * 60L * 1000L);
lastLogRollTime = System.currentTimeMillis();
}
@Override
public void run() {
while (!server.isStopRequested()) {
while (!rollLog.get() && !server.isStopRequested()) {
long now = System.currentTimeMillis();
if (this.lastLogRollTime + this.optionalLogRollInterval <= now) {
rollLog.set(true);
this.lastLogRollTime = now;
} else {
synchronized (rollLog) {
try {
rollLog.wait(server.threadWakeFrequency);
} catch (InterruptedException e) {
continue;
}
if (!rollLog.get()) {
synchronized (rollLog) {
try {
rollLog.wait(server.threadWakeFrequency);
} catch (InterruptedException e) {
continue;
}
}
}
if (!rollLog.get()) {
// There's only two reasons to break out of the while loop.
// 1. Log roll requested
// 2. Stop requested
// so if a log roll was not requested, continue and break out of loop
continue;
}
rollLock.lock(); // Don't interrupt us. We're working
try {
LOG.info("Rolling hlog. Number of entries: " + server.getLog().getNumEntries());
LOG.info("Rolling hlog. Number of entries: " +
server.getLog().getNumEntries());
server.getLog().rollWriter();
} catch (FailedLogCloseException e) {
LOG.fatal("Forcing server shutdown", e);
@ -107,8 +88,11 @@ class LogRoller extends Thread implements LogRollListener {
* It is sleeping if rollLock is not held.
*/
public void interruptIfNecessary() {
if (rollLock.tryLock()) {
try {
rollLock.lock();
this.interrupt();
} finally {
rollLock.unlock();
}
}
}
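
One subtle fix in both Flusher.interruptIfNecessary() and LogRoller.interruptIfNecessary(): tryLock() is replaced by a blocking lock(). With tryLock(), a shutdown request that arrived while a flush or roll was in progress would simply skip the interrupt; with lock(), the caller waits for the in-progress work to finish and then always delivers it. A minimal sketch of the pattern, with a hypothetical WorkerThread standing in for LogRoller or Flusher:

    import java.util.concurrent.locks.ReentrantLock;

    class WorkerThread extends Thread {
      private final ReentrantLock workLock = new ReentrantLock();

      void doWork() {
        workLock.lock(); // don't interrupt us, we're working
        try {
          // roll the log or flush the cache here
        } finally {
          workLock.unlock();
        }
      }

      /** Blocks until any in-progress work completes, then interrupts the thread. */
      void interruptIfNecessary() {
        workLock.lock();
        try {
          interrupt();
        } finally {
          workLock.unlock();
        }
      }
    }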