HBASE-2467 Concurrent flushers in HLog sync using HDFS-895

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1042790 13f79535-47bb-0310-9956-ffa450edef68
Jean-Daniel Cryans 2010-12-06 20:58:53 +00:00
parent 8d5248d3d9
commit a5ce594797
3 changed files with 45 additions and 116 deletions
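At a glance: before HDFS-895, concurrent hflush calls on a DFSOutputStream serialized behind one another, so HBase funneled every sync request through the LogSyncer group-commit queue that this patch removes. With HDFS-895 in place, each handler thread can call the writer's sync directly and the calls overlap in flight. A minimal sketch of that calling pattern (SketchWriter is a stand-in for the real log writer, not HBase code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: SketchWriter stands in for HLog's SequenceFile writer.
// The point is the calling pattern, not the I/O: with HDFS-895 each
// handler thread may invoke sync() directly instead of queueing behind
// a single group-commit thread.
class SketchWriter {
  private final AtomicLong syncs = new AtomicLong();
  void sync() throws InterruptedException {
    Thread.sleep(10);               // pretend: one pipeline round trip
    syncs.incrementAndGet();
  }
  long syncCount() { return syncs.get(); }
}

public class ConcurrentSyncSketch {
  public static void main(String[] args) throws Exception {
    final SketchWriter writer = new SketchWriter();
    ExecutorService handlers = Executors.newFixedThreadPool(10);
    for (int i = 0; i < 10; i++) {
      handlers.submit(new Runnable() {
        public void run() {
          try {
            writer.sync();          // no queue, no condition handshake
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        }
      });
    }
    handlers.shutdown();
    handlers.awaitTermination(5, TimeUnit.SECONDS);
    System.out.println("syncs completed: " + writer.syncCount());
  }
}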


@@ -1227,6 +1227,7 @@ Release 0.90.0 - Unreleased
HBASE-3223 Get VersionInfo for Running HBase Process
(Nicolas Spiegelberg via Stack)
HBASE-3303 Lower hbase.regionserver.handler.count from 25 back to 10
HBASE-2467 Concurrent flushers in HLog sync using HDFS-895
NEW FEATURES


@@ -128,8 +128,8 @@ public class HLog implements Syncable {
private final long blocksize;
private final int flushlogentries;
private final String prefix;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private final Path oldLogDir;
private boolean logRollRequested;
private static Class<? extends Writer> logWriterClass;
@@ -202,6 +202,7 @@ public class HLog implements Syncable {
// We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update
// locked during appends
private final Object updateLock = new Object();
private final boolean enabled;
@@ -214,7 +215,7 @@ public class HLog implements Syncable {
private final int maxLogs;
/**
* Thread that handles group commit
* Thread that handles optional sync'ing
*/
private final LogSyncer logSyncerThread;
@@ -506,6 +507,7 @@ public class HLog implements Syncable {
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0);
this.logRollRequested = false;
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
@@ -870,7 +872,6 @@ public class HLog implements Syncable {
this.lastSeqWritten.putIfAbsent(regionInfo.getEncodedNameAsBytes(),
Long.valueOf(seqNum));
doWrite(regionInfo, logKey, logEdit);
this.unflushedEntries.incrementAndGet();
this.numEntries.incrementAndGet();
}
@@ -927,9 +928,6 @@ public class HLog implements Syncable {
HLogKey logKey = makeKey(hriKey, tableName, seqNum, now);
doWrite(info, logKey, edits);
this.numEntries.incrementAndGet();
// Only count 1 row as an unflushed entry.
this.unflushedEntries.incrementAndGet();
}
// Sync if catalog region, and if not then check if that table supports
// deferred log flushing
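A reading aid for the comment above: the decision it describes amounts to roughly the following (sketch; isDeferredLogFlush() on the table descriptor is assumed from the surrounding codebase, not shown in this hunk):

// Sketch: catalog edits sync immediately; user tables may opt out
// of the per-edit sync via deferred log flush.
if (info.isMetaRegion() ||
    !info.getTableDesc().isDeferredLogFlush()) {
  this.sync();
}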
@@ -946,15 +944,6 @@ public class HLog implements Syncable {
*/
class LogSyncer extends Thread {
// Using fairness to make sure locks are given in order
private final ReentrantLock lock = new ReentrantLock(true);
// Condition used to wait until we have something to sync
private final Condition queueEmpty = lock.newCondition();
// Condition used to signal that the sync is done
private final Condition syncDone = lock.newCondition();
private final long optionalFlushInterval;
private boolean syncerShuttingDown = false;
@@ -966,28 +955,12 @@ public class HLog implements Syncable {
@Override
public void run() {
try {
lock.lock();
// awaiting with a timeout doesn't always
// throw exceptions on interrupt
while(!this.isInterrupted()) {
// Wait until something has to be hflushed or do it if we waited
// enough time (useful if something appends but does not hflush).
// 0 or less means that it timed out and maybe waited a bit more.
if (!(queueEmpty.awaitNanos(
this.optionalFlushInterval*1000000) <= 0)) {
forceSync = true;
}
// We got the signal, let's hflush. We currently own the lock so new
// writes are waiting to acquire it in addToSyncQueue while the ones
// we hflush are waiting on await()
hflush();
// Release all the clients waiting on the hflush. Notice that we still
// own the lock until we get back to await at which point all the
// other threads waiting will first acquire and release locks
syncDone.signalAll();
Thread.sleep(this.optionalFlushInterval);
sync();
}
} catch (IOException e) {
LOG.error("Error while syncing, requesting close of hlog ", e);
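Because the hunk interleaves removed and added lines, the new run() is easier to read consolidated. Pieced together from just the added lines above (context the diff elides is marked, not guessed):

@Override
public void run() {
  try {
    while (!this.isInterrupted()) {
      // Sleep the optional interval, then sync whatever has been
      // appended in the meantime.
      Thread.sleep(this.optionalFlushInterval);
      sync();
    }
  } catch (IOException e) {
    LOG.error("Error while syncing, requesting close of hlog ", e);
    // ... (follow-up elided by the diff)
  } catch (InterruptedException e) {
    LOG.debug(getName() + " interrupted while waiting for sync requests");
  } finally {
    syncerShuttingDown = true;
    LOG.info(getName() + " exiting");
  }
}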
@@ -996,101 +969,56 @@ public class HLog implements Syncable {
LOG.debug(getName() + " interrupted while waiting for sync requests");
} finally {
syncerShuttingDown = true;
syncDone.signalAll();
lock.unlock();
LOG.info(getName() + " exiting");
}
}
/**
* This method first signals the thread that there's a sync needed
* and then waits for it to happen before returning.
*/
public void addToSyncQueue(boolean force) {
// Don't bother if somehow our append was already hflushed
if (unflushedEntries.get() == 0) {
return;
}
lock.lock();
try {
if (syncerShuttingDown) {
LOG.warn(getName() + " was shut down while waiting for sync");
return;
}
if(force) {
forceSync = true;
}
// Wake the thread
queueEmpty.signal();
// Wait for it to hflush
syncDone.await();
} catch (InterruptedException e) {
LOG.debug(getName() + " was interrupted while waiting for sync", e);
}
finally {
lock.unlock();
}
}
}
public void sync(){
sync(false);
}
/**
* This method calls the LogSyncer in order to group commit the sync
* with other threads.
* @param force For catalog regions, force the sync to happen
*/
public void sync(boolean force) {
logSyncerThread.addToSyncQueue(force);
}
public void hflush() throws IOException {
public void sync() throws IOException {
synchronized (this.updateLock) {
if (this.closed) {
return;
}
boolean logRollRequested = false;
if (this.forceSync ||
this.unflushedEntries.get() >= this.flushlogentries) {
try {
long now = System.currentTimeMillis();
this.writer.sync();
syncTime += System.currentTimeMillis() - now;
syncOps++;
this.forceSync = false;
this.unflushedEntries.set(0);
// if the number of replicas in HDFS has fallen below the initial
// value, then roll logs.
try {
int numCurrentReplicas = getLogReplication();
if (numCurrentReplicas != 0 &&
numCurrentReplicas < this.initialReplication) {
LOG.warn("HDFS pipeline error detected. " +
"Found " + numCurrentReplicas + " replicas but expecting " +
this.initialReplication + " replicas. " +
" Requesting close of hlog.");
requestLogRoll();
logRollRequested = true;
}
} catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
" still proceeding ahead...");
}
try {
long now = System.currentTimeMillis();
// Done in parallel for all writer threads, thanks to HDFS-895
this.writer.sync();
synchronized (this.updateLock) {
syncTime += System.currentTimeMillis() - now;
syncOps++;
if (!logRollRequested) {
checkLowReplication();
if (this.writer.getLength() > this.logrollsize) {
requestLogRoll();
}
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
throw e;
}
}
if (!logRollRequested && (this.writer.getLength() > this.logrollsize)) {
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
requestLogRoll();
throw e;
}
}
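Likewise, the new sync() reads better consolidated. From the added lines in this hunk it comes out to the following: one unlocked writer.sync() in the middle, bookended by short critical sections on updateLock.

public void sync() throws IOException {
  synchronized (this.updateLock) {
    if (this.closed) {
      return;
    }
  }
  try {
    long now = System.currentTimeMillis();
    // Done in parallel for all writer threads, thanks to HDFS-895
    this.writer.sync();
    synchronized (this.updateLock) {
      syncTime += System.currentTimeMillis() - now;
      syncOps++;
      if (!logRollRequested) {
        checkLowReplication();
        if (this.writer.getLength() > this.logrollsize) {
          requestLogRoll();
        }
      }
    }
  } catch (IOException e) {
    LOG.fatal("Could not append. Requesting close of hlog", e);
    requestLogRoll();
    throw e;
  }
}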
private void checkLowReplication() {
// if the number of replicas in HDFS has fallen below the initial
// value, then roll logs.
try {
int numCurrentReplicas = getLogReplication();
if (numCurrentReplicas != 0 &&
numCurrentReplicas < this.initialReplication) {
LOG.warn("HDFS pipeline error detected. " +
"Found " + numCurrentReplicas + " replicas but expecting " +
this.initialReplication + " replicas. " +
" Requesting close of hlog.");
requestLogRoll();
logRollRequested = true;
}
} catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
" still proceeding ahead...");
}
}
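getLogReplication() itself is outside this diff; in HBase of this vintage it reaches DFSOutputStream.getNumCurrentReplicas() reflectively, because the HDFS-826 accessor is not public API on every supported Hadoop. A sketch of that probe under those assumptions (class and member names here are illustrative, not the committed ones):

import java.lang.reflect.Method;
import org.apache.hadoop.fs.FSDataOutputStream;

// Illustrative probe, not the committed code: look up the HDFS-826
// accessor if the running DFS client provides it.
class ReplicationProbe {
  private final Object stream;                 // the wrapped DFSOutputStream
  private final Method getNumCurrentReplicas;  // null when unsupported

  ReplicationProbe(FSDataOutputStream out) {
    this.stream = out.getWrappedStream();
    Method m = null;
    try {
      m = this.stream.getClass().getMethod("getNumCurrentReplicas");
      m.setAccessible(true);
    } catch (NoSuchMethodException e) {
      // Older HDFS: no accessor; callers skip the low-replication check.
    }
    this.getNumCurrentReplicas = m;
  }

  /** @return current pipeline replica count, or -1 if undeterminable */
  int currentReplication() {
    if (this.getNumCurrentReplicas == null) {
      return -1;
    }
    try {
      return ((Integer) this.getNumCurrentReplicas.invoke(this.stream)).intValue();
    } catch (Exception e) {
      return -1;
    }
  }
}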
@@ -1122,7 +1050,7 @@ public class HLog implements Syncable {
public void hsync() throws IOException {
// Not yet implemented up in hdfs so just call hflush.
hflush();
sync();
}
private void requestLogRoll() {
@@ -1233,7 +1161,7 @@ public class HLog implements Syncable {
}
}
// sync txn to file system
this.sync(isMetaRegion);
this.sync();
} finally {
this.cacheFlushLock.unlock();


@@ -341,7 +341,7 @@ public class TestHLog {
wal.append(regioninfo, tableName, kvs, System.currentTimeMillis());
}
// Now call sync to send the data to HDFS datanodes
wal.sync(true);
wal.sync();
int namenodePort = cluster.getNameNodePort();
final Path walPath = wal.computeFilename();
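For callers, the visible change is the one this test exercises: sync(boolean) and its force flag are gone, so every call site narrows to the no-argument form.

// Before this commit: catalog regions forced the group commit.
wal.sync(true);
// After: one path for all callers, concurrent under HDFS-895.
wal.sync();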