HBASE-8755 A new write thread model for HLog to improve the overall HBase write throughput
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1550778 13f79535-47bb-0310-9956-ffa450edef68
parent 4c8631145e
commit 4ee8b16d67
@@ -203,12 +203,6 @@ possible configurations would overwhelm and obscure the important.
    <description>Interval between messages from the RegionServer to Master
    in milliseconds.</description>
  </property>
  <property>
    <name>hbase.regionserver.optionallogflushinterval</name>
    <value>1000</value>
    <description>Sync the HLog to the HDFS after this interval if it has not
    accumulated enough entries to trigger a sync. Units: milliseconds.</description>
  </property>
  <property>
    <name>hbase.regionserver.regionSplitLimit</name>
    <value>2147483647</value>
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
@@ -118,15 +117,19 @@ class FSHLog implements HLog, Syncable {
  // Listeners that are called on WAL events.
  private List<WALActionsListener> listeners =
    new CopyOnWriteArrayList<WALActionsListener>();
  private final long optionalFlushInterval;
  private final long blocksize;
  private final String prefix;
  private final AtomicLong unflushedEntries = new AtomicLong(0);
  private volatile long syncedTillHere = 0;
  private final AtomicLong syncedTillHere = new AtomicLong(0);
  private long lastDeferredTxid;
  private final Path oldLogDir;
  private volatile boolean logRollRunning;

  // all writes pending on AsyncWriter/AsyncSyncer thread with
  // txid <= failedTxid will fail by throwing asyncIOE
  private final AtomicLong failedTxid = new AtomicLong(0);
  private volatile IOException asyncIOE = null;

  private WALCoprocessorHost coprocessorHost;

  private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
@@ -208,7 +211,7 @@ class FSHLog implements HLog, Syncable {
  // during an update
  // locked during appends
  private final Object updateLock = new Object();
  private final Object flushLock = new Object();
  private final Object pendingWritesLock = new Object();

  private final boolean enabled;
@@ -219,10 +222,20 @@ class FSHLog implements HLog, Syncable {
   */
  private final int maxLogs;

  /**
   * Thread that handles optional sync'ing
   */
  private final LogSyncer logSyncer;
  // List of pending writes to the HLog. These correspond to transactions
  // that have not yet returned to the client. We keep them cached here
  // instead of writing them to HDFS piecemeal. The goal is to increase
  // the batch size for writing-to-hdfs as well as sync-to-hdfs, so that
  // we can get better system throughput.
  private List<Entry> pendingWrites = new LinkedList<Entry>();

  private final AsyncWriter asyncWriter;
  // Since AsyncSyncer takes much longer than the other phases (adding WALEdits to the local
  // buffer, writing the local buffer to HDFS, notifying pending write handler threads),
  // and all other phases stall while a sync is ongoing, we use multiple parallel
  // AsyncSyncer threads to improve overall throughput.
  private final AsyncSyncer[] asyncSyncers;
  private final AsyncNotifier asyncNotifier;

  /** Number of log close errors tolerated before we abort */
  private final int closeErrorsTolerated;
@@ -368,8 +381,6 @@ class FSHLog implements HLog, Syncable {
    // Roll at 95% of block size.
    float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
    this.logrollsize = (long)(this.blocksize * multi);
    this.optionalFlushInterval =
      conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);

    this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
    this.minTolerableReplication = conf.getInt(
@@ -381,13 +392,11 @@ class FSHLog implements HLog, Syncable {
    this.closeErrorsTolerated = conf.getInt(
        "hbase.regionserver.logroll.errors.tolerated", 0);

    this.logSyncer = new LogSyncer(this.optionalFlushInterval);

    LOG.info("WAL/HLog configuration: blocksize=" +
        StringUtils.byteDesc(this.blocksize) +
        ", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
        ", enabled=" + this.enabled +
        ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
        ", enabled=" + this.enabled);
    // If prefix is null||empty then just name it hlog
    this.prefix = prefix == null || prefix.isEmpty() ?
        "hlog" : URLEncoder.encode(prefix, "UTF8");
@@ -411,15 +420,22 @@ class FSHLog implements HLog, Syncable {
    // handle the reflection necessary to call getNumCurrentReplicas()
    this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);

    // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
    if (this.optionalFlushInterval > 0) {
      Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
          + ".logSyncer");
    } else {
      LOG.info("hbase.regionserver.optionallogflushinterval is set as "
          + this.optionalFlushInterval + ". Deferred log syncing won't work. "
          + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
    final String n = Thread.currentThread().getName();

    asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
    asyncWriter.start();

    int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
    asyncSyncers = new AsyncSyncer[syncerNums];
    for (int i = 0; i < asyncSyncers.length; ++i) {
      asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
      asyncSyncers[i].start();
    }

    asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
    asyncNotifier.start();

    coprocessorHost = new WALCoprocessorHost(this, conf);

    this.metrics = new MetricsWAL();
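
Editor's note: the constructor hunk above starts one AsyncWriter, a pool of AsyncSyncer threads sized by "hbase.hlog.asyncer.number" (default 5), and one AsyncNotifier. A minimal sketch of how that knob is read and overridden, assuming only a Hadoop Configuration on the classpath; the class name is illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;

    public class SyncerCountSketch {
      public static void main(String[] args) {
        // In a real deployment the value would come from hbase-site.xml.
        Configuration conf = new Configuration();
        conf.setInt("hbase.hlog.asyncer.number", 8);  // override the default of 5
        int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
        System.out.println("AsyncSyncer threads to start: " + syncerNums);
      }
    }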
@@ -735,11 +751,11 @@ class FSHLog implements HLog, Syncable {
    try {
      // Wait till all current transactions are written to the hlog.
      // No new transactions can occur because we have the updatelock.
      if (this.unflushedEntries.get() != this.syncedTillHere) {
      if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
        LOG.debug("cleanupCurrentWriter " +
            " waiting for transactions to get synced " +
            " total " + this.unflushedEntries.get() +
            " synced till here " + syncedTillHere);
            " synced till here " + this.syncedTillHere.get());
        sync();
      }
      this.writer.close();
@@ -874,17 +890,33 @@ class FSHLog implements HLog, Syncable {
    if (this.closed) {
      return;
    }
    // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
    if (this.optionalFlushInterval > 0) {

    try {
      logSyncer.close();
      // Make sure we synced everything
      logSyncer.join(this.optionalFlushInterval * 2);
      asyncNotifier.interrupt();
      asyncNotifier.join();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for syncer thread to die", e);
      Thread.currentThread().interrupt();
      LOG.error("Exception while waiting for " + asyncNotifier.getName() +
          " threads to die", e);
    }

    for (int i = 0; i < asyncSyncers.length; ++i) {
      try {
        asyncSyncers[i].interrupt();
        asyncSyncers[i].join();
      } catch (InterruptedException e) {
        LOG.error("Exception while waiting for " + asyncSyncers[i].getName() +
            " threads to die", e);
      }
    }

    try {
      asyncWriter.interrupt();
      asyncWriter.join();
    } catch (InterruptedException e) {
      LOG.error("Exception while waiting for " + asyncWriter.getName() +
          " thread to die", e);
    }

    try {
      // Prevent all further flushing and rolling.
      closeBarrier.stopAndDrainOps();
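
Editor's note: the new close() path above interrupts and joins the AsyncNotifier, every AsyncSyncer, and the AsyncWriter in turn, instead of closing the single logSyncer. A standalone sketch of that interrupt-then-join pattern; the worker below is a stand-in, not FSHLog code:

    public class ShutdownSketch {
      // Interrupt each worker to break it out of wait()/sleep(), then join it.
      static void stop(Thread... workers) {
        for (Thread t : workers) {
          try {
            t.interrupt();
            t.join();
          } catch (InterruptedException e) {
            // Preserve our own interrupt status and keep stopping the rest.
            Thread.currentThread().interrupt();
          }
        }
      }

      public static void main(String[] args) {
        Thread worker = new Thread(() -> {
          try {
            Thread.sleep(60000);
          } catch (InterruptedException e) {
            // Interrupted during shutdown: fall through and exit.
          }
        }, "demo-worker");
        worker.start();
        stop(worker);
        System.out.println(worker.getName() + " stopped");
      }
    }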
@@ -985,9 +1017,14 @@ class FSHLog implements HLog, Syncable {
    if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
    HLogKey logKey = makeKey(
        encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);

    synchronized (pendingWritesLock) {
      doWrite(info, logKey, edits, htd);
      this.numEntries.incrementAndGet();
      txid = this.unflushedEntries.incrementAndGet();
    }
    this.numEntries.incrementAndGet();
    this.asyncWriter.setPendingTxid(txid);

    if (htd.isDeferredLogFlush()) {
      lastDeferredTxid = txid;
    }
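
Editor's note: in the append path above, the handler buffers the edit and takes its txid inside pendingWritesLock, then wakes the AsyncWriter outside the critical section. A condensed sketch of that hand-off, with hypothetical names standing in for the FSHLog fields:

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;

    public class AppendSketch {
      private final Object pendingWritesLock = new Object();
      private final AtomicLong unflushedEntries = new AtomicLong(0);
      private List<String> pendingWrites = new LinkedList<String>();

      private final Object writeLock = new Object();
      private long pendingTxid = 0;   // highest txid handed to the writer thread

      // Handler side: buffer the edit, take a txid, then wake the writer.
      public long append(String edit) {
        long txid;
        synchronized (pendingWritesLock) {
          pendingWrites.add(edit);
          txid = unflushedEntries.incrementAndGet();
        }
        setPendingTxid(txid);
        return txid;
      }

      // Equivalent of AsyncWriter.setPendingTxid(): only ever move the watermark forward.
      private void setPendingTxid(long txid) {
        synchronized (writeLock) {
          if (txid > pendingTxid) {
            pendingTxid = txid;
            writeLock.notify();   // wake the single writer thread
          }
        }
      }
    }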
@@ -1017,91 +1054,245 @@ class FSHLog implements HLog, Syncable {
        now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce);
  }

  /**
   * This class is responsible to hold the HLog's appended Entry list
   * and to sync them according to a configurable interval.
   *
   * Deferred log flushing works first by piggy backing on this process by
   * simply not sync'ing the appended Entry. It can also be sync'd by other
   * non-deferred log flushed entries outside of this thread.
  /* The work of the current write process of HLog goes as below:
   * 1). All write handler threads append edits to HLog's local pending buffer
   *     (and notify the AsyncWriter thread that there are new edits in the local buffer);
   * 2). All write handler threads wait in the HLog.syncer() function for the underlying threads to
   *     finish the sync that contains its txid;
   * 3). An AsyncWriter thread is responsible for retrieving all edits in HLog's
   *     local pending buffer and writing them to hdfs (hlog.writer.append);
   *     (it notifies the AsyncSyncer threads that there are new writes to hdfs which need a sync)
   * 4). AsyncSyncer threads are responsible for issuing sync requests to hdfs to persist the
   *     writes by AsyncWriter; (they notify the AsyncNotifier thread that the sync is done)
   * 5). An AsyncNotifier thread is responsible for notifying all pending write handler
   *     threads which are waiting in the HLog.syncer() function;
   * 6). There is no LogSyncer thread any more (the AsyncWriter/AsyncSyncer threads now
   *     do the same job it did).
   * Note: more than one AsyncSyncer thread is needed here to guarantee good enough performance
   *       when there are fewer concurrent write handler threads. Since sync is the most time-consuming
   *       operation in the whole write process, multiple AsyncSyncer threads can provide better
   *       parallelism of sync to get better overall throughput.
   */
  class LogSyncer extends HasThread {
  // thread to write locally buffered writes to HDFS
  private class AsyncWriter extends HasThread {
    private long pendingTxid = 0;
    private long txidToWrite = 0;
    private long lastWrittenTxid = 0;
    private Object writeLock = new Object();

    private final long optionalFlushInterval;

    private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);

    // List of pending writes to the HLog. These correspond to transactions
    // that have not yet returned to the client. We keep them cached here
    // instead of writing them to HDFS piecemeal, because the HDFS write
    // method is pretty heavyweight as far as locking is concerned. The
    // goal is to increase the batch size for writing-to-hdfs as well as
    // sync-to-hdfs, so that we can get better system throughput.
    private List<Entry> pendingWrites = new LinkedList<Entry>();

    LogSyncer(long optionalFlushInterval) {
      this.optionalFlushInterval = optionalFlushInterval;
    public AsyncWriter(String name) {
      super(name);
    }

    // wake up the AsyncWriter thread (called by a write handler thread)
    // to write the buffered writes to HDFS
    public void setPendingTxid(long txid) {
      synchronized (this.writeLock) {
        if (txid <= this.pendingTxid)
          return;

        this.pendingTxid = txid;
        this.writeLock.notify();
      }
    }

    @Override
    public void run() {
      try {
        // awaiting with a timeout doesn't always
        // throw exceptions on interrupt
        while(!this.isInterrupted() && !closeLogSyncer.get()) {
        while (!this.isInterrupted()) {
          // 1. wait until there are new writes in the local buffer
          synchronized (this.writeLock) {
            while (this.pendingTxid <= this.lastWrittenTxid) {
              this.writeLock.wait();
            }
          }

          // 2. get all buffered writes and update the 'real' pendingTxid,
          //    since newer writes may have entered the buffer while AsyncWriter
          //    was waking up and acquiring the lock.
          //    NOTE! We can't hold 'updateLock' here since rollWriter will pend
          //    on 'sync()' with 'updateLock' held, but 'sync()' waits for the
          //    AsyncWriter/AsyncSyncer/AsyncNotifier chain. Without updateLock this
          //    can lead to pendWrites holding more than pendingTxid, but that is not a problem.
          List<Entry> pendWrites = null;
          synchronized (pendingWritesLock) {
            this.txidToWrite = unflushedEntries.get();
            pendWrites = pendingWrites;
            pendingWrites = new LinkedList<Entry>();
          }

          // 3. write all buffered writes to HDFS (append, without sync)
          try {
            if (unflushedEntries.get() <= syncedTillHere) {
              synchronized (closeLogSyncer) {
                closeLogSyncer.wait(this.optionalFlushInterval);
            for (Entry e : pendWrites) {
              writer.append(e);
            }
              }
            }
            // Calling sync since we waited or had unflushed entries.
            // Entries appended but not sync'd are taken care of here AKA
            // deferred log flush
            sync();
          } catch(IOException e) {
            LOG.error("Error while syncing, requesting close of hlog ", e);
            LOG.error("Error while AsyncWriter write, request close of hlog ", e);
            requestLogRoll();
            Threads.sleep(this.optionalFlushInterval);

            asyncIOE = e;
            failedTxid.set(this.txidToWrite);
          }

          // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
          this.lastWrittenTxid = this.txidToWrite;
          boolean hasIdleSyncer = false;
          for (int i = 0; i < asyncSyncers.length; ++i) {
            if (!asyncSyncers[i].isSyncing()) {
              hasIdleSyncer = true;
              asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
              break;
            }
          }
          if (!hasIdleSyncer) {
            int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
            asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for sync requests");
        LOG.debug(getName() + " interrupted while waiting for " +
            "newer writes added to local buffer");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }

    // appends new writes to the pendingWrites. It is better to keep it in
    // our own queue rather than writing it to the HDFS output stream because
    // HDFSOutputStream.writeChunk is not lightweight at all.
    synchronized void append(Entry e) throws IOException {
      pendingWrites.add(e);
    }

    // Returns all currently pending writes. New writes
    // will accumulate in a new list.
    synchronized List<Entry> getPendingWrites() {
      List<Entry> save = this.pendingWrites;
      this.pendingWrites = new LinkedList<Entry>();
      return save;
  // thread to request HDFS to sync the WALEdits written by AsyncWriter
  // to make those WALEdits durable on the HDFS side
  private class AsyncSyncer extends HasThread {
    private long writtenTxid = 0;
    private long txidToSync = 0;
    private long lastSyncedTxid = 0;
    private volatile boolean isSyncing = false;
    private Object syncLock = new Object();

    public AsyncSyncer(String name) {
      super(name);
    }

    // writes out pending entries to the HLog
    void hlogFlush(Writer writer, List<Entry> pending) throws IOException {
      if (pending == null) return;
    public boolean isSyncing() {
      return this.isSyncing;
    }

      // write out all accumulated Entries to hdfs.
      for (Entry e : pending) {
        writer.append(e);
    // wake up the AsyncSyncer thread (called by the AsyncWriter thread)
    // to sync (flush) in HDFS the writes written by AsyncWriter
    public void setWrittenTxid(long txid) {
      synchronized (this.syncLock) {
        if (txid <= this.writtenTxid)
          return;

        this.writtenTxid = txid;
        this.syncLock.notify();
      }
    }

    void close() {
      synchronized (closeLogSyncer) {
        closeLogSyncer.set(true);
        closeLogSyncer.notifyAll();
    public void run() {
      try {
        while (!this.isInterrupted()) {
          // 1. wait until AsyncWriter has written data to HDFS and
          //    called setWrittenTxid to wake us up
          synchronized (this.syncLock) {
            while (this.writtenTxid <= this.lastSyncedTxid) {
              this.syncLock.wait();
            }
            this.txidToSync = this.writtenTxid;
          }

          // 2. do 'sync' to HDFS to provide durability
          long now = EnvironmentEdgeManager.currentTimeMillis();
          try {
            this.isSyncing = true;
            if (writer != null) writer.sync();
            this.isSyncing = false;
          } catch (IOException e) {
            LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
            requestLogRoll();

            asyncIOE = e;
            failedTxid.set(this.txidToSync);
          }
          metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);

          // 3. wake up AsyncNotifier to notify (wake up) all pending 'put'
          //    handler threads waiting in 'sync()'
          this.lastSyncedTxid = this.txidToSync;
          asyncNotifier.setFlushedTxid(this.lastSyncedTxid);

          // 4. check and do logRoll if needed
          if (!logRollRunning) {
            checkLowReplication();
            try {
              if (writer != null && writer.getLength() > logrollsize) {
                requestLogRoll();
              }
            } catch (IOException e) {
              LOG.warn("writer.getLength() failed,this failure won't block here");
            }
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            "notification from AsyncWriter thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }

  // thread to notify all write handler threads which are pending on
  // their written WALEdits' durability (sync).
  // An extra 'notifier' thread is needed, rather than letting the
  // AsyncSyncer thread itself notify when its sync is done, so that the
  // AsyncSyncer thread can start the next sync as soon as possible, since
  // 'notify' involves heavy synchronization with all pending write handler threads.
  private class AsyncNotifier extends HasThread {
    private long flushedTxid = 0;
    private long lastNotifiedTxid = 0;
    private Object notifyLock = new Object();

    public AsyncNotifier(String name) {
      super(name);
    }

    public void setFlushedTxid(long txid) {
      synchronized (this.notifyLock) {
        if (txid <= this.flushedTxid) {
          return;
        }

        this.flushedTxid = txid;
        this.notifyLock.notify();
      }
    }

    public void run() {
      try {
        while (!this.isInterrupted()) {
          synchronized (this.notifyLock) {
            while (this.flushedTxid <= this.lastNotifiedTxid) {
              this.notifyLock.wait();
            }
            this.lastNotifiedTxid = this.flushedTxid;
          }

          // notify (wake up) all pending write handler threads
          // (or the logroller thread, which may also pend on sync())
          synchronized (syncedTillHere) {
            syncedTillHere.set(this.lastNotifiedTxid);
            syncedTillHere.notifyAll();
          }
        }
      } catch (InterruptedException e) {
        LOG.debug(getName() + " interrupted while waiting for " +
            " notification from AsyncSyncer thread");
      } catch (Exception e) {
        LOG.error("UNEXPECTED", e);
      } finally {
        LOG.info(getName() + " exiting");
      }
    }
  }
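
Editor's note: the comment block and the three inner classes above all follow the same shape: each stage sleeps until an upstream txid watermark passes the last txid it handled, does its work (append, sync, or notifyAll), and pushes the txid downstream. A condensed, self-contained sketch of that shape with hypothetical names; it illustrates the pattern, it is not FSHLog code:

    public class StageSketch extends Thread {
      private long inputTxid = 0;        // highest txid handed to this stage
      private long processedTxid = 0;    // highest txid this stage has finished
      private final Object lock = new Object();
      private final StageSketch downstream;  // null for the last stage

      StageSketch(String name, StageSketch downstream) {
        super(name);
        this.downstream = downstream;
      }

      // Called by the upstream stage; only ever moves the watermark forward.
      void offer(long txid) {
        synchronized (lock) {
          if (txid > inputTxid) {
            inputTxid = txid;
            lock.notify();
          }
        }
      }

      @Override
      public void run() {
        try {
          while (!isInterrupted()) {
            long txid;
            synchronized (lock) {
              while (inputTxid <= processedTxid) {
                lock.wait();
              }
              txid = inputTxid;
            }
            // ... stage-specific work here: append, hflush, or notify waiters ...
            processedTxid = txid;
            if (downstream != null) {
              downstream.offer(txid);
            }
          }
        } catch (InterruptedException e) {
          // Shutdown: fall through and exit.
        }
      }

      public static void main(String[] args) throws InterruptedException {
        StageSketch notifier = new StageSketch("notifier", null);
        StageSketch syncer = new StageSketch("syncer", notifier);
        StageSketch writer = new StageSketch("writer", syncer);
        notifier.start();
        syncer.start();
        writer.start();
        writer.offer(1);          // a handler thread hands txid 1 to the writer stage
        Thread.sleep(100);
        writer.interrupt();
        syncer.interrupt();
        notifier.interrupt();
      }
    }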
@@ -1113,95 +1304,20 @@ class FSHLog implements HLog, Syncable {

  // sync all transactions upto the specified txid
  private void syncer(long txid) throws IOException {
    // if the transaction that we are interested in is already
    // synced, then return immediately.
    if (txid <= this.syncedTillHere) {
      return;
    }
    Writer tempWriter;
    synchronized (this.updateLock) {
      if (this.closed) return;
      // Guaranteed non-null.
      // Note that parallel sync can close tempWriter.
      // The current method of dealing with this is to catch exceptions.
      // See HBASE-4387, HBASE-5623, HBASE-7329.
      tempWriter = this.writer;
    }
    synchronized (this.syncedTillHere) {
      while (this.syncedTillHere.get() < txid) {
        try {
          long doneUpto;
          long now = EnvironmentEdgeManager.currentTimeMillis();
          // First flush all the pending writes to HDFS. Then
          // issue the sync to HDFS. If sync is successful, then update
          // syncedTillHere to indicate that transactions till this
          // number has been successfully synced.
          IOException ioe = null;
          List<Entry> pending = null;
          synchronized (flushLock) {
            if (txid <= this.syncedTillHere) {
              return;
            }
            doneUpto = this.unflushedEntries.get();
            pending = logSyncer.getPendingWrites();
            try {
              logSyncer.hlogFlush(tempWriter, pending);
              postAppend(pending);
            } catch(IOException io) {
              ioe = io;
              LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
            }
          }
          if (ioe != null && pending != null) {
            synchronized (this.updateLock) {
              synchronized (flushLock) {
                // HBASE-4387, HBASE-5623, retry with updateLock held
                tempWriter = this.writer;
                logSyncer.hlogFlush(tempWriter, pending);
                postAppend(pending);
              }
            }
          }
          // another thread might have sync'ed avoid double-sync'ing
          if (txid <= this.syncedTillHere) {
            return;
          }
          try {
            if (tempWriter != null) {
              tempWriter.sync();
              postSync();
            }
          } catch(IOException ex) {
            synchronized (this.updateLock) {
              // HBASE-4387, HBASE-5623, retry with updateLock held
              // TODO: we don't actually need to do it for concurrent close - what is the point
              // of syncing new unrelated writer? Keep behavior for now.
              tempWriter = this.writer;
              if (tempWriter != null) {
                tempWriter.sync();
                postSync();
              }
            }
          }
          this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
          this.syncedTillHere.wait();

          this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
          // TODO: preserving the old behavior for now, but this check is strange. It's not
          // protected by any locks here, so for all we know rolling locks might start
          // as soon as we enter the "if". Is this best-effort optimization check?
          if (!this.logRollRunning) {
            checkLowReplication();
            try {
              curLogSize = tempWriter.getLength();
              if (curLogSize > this.logrollsize) {
                requestLogRoll();
              if (txid <= this.failedTxid.get()) {
                assert asyncIOE != null :
                  "current txid is among(under) failed txids, but asyncIOE is null!";
                throw asyncIOE;
              }
            } catch (IOException x) {
              LOG.debug("Log roll failed and will be retried. (This is not an error)");
            } catch (InterruptedException e) {
              LOG.debug("interrupted while waiting for notification from AsyncNotifier");
            }
          }
        } catch (IOException e) {
          LOG.fatal("Could not sync. Requesting roll of hlog", e);
          requestLogRoll();
          throw e;
        }
      }
    }
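
Editor's note: in the rewritten syncer() above, a handler thread no longer flushes or syncs anything itself; it waits on the syncedTillHere monitor until the AsyncNotifier has published a watermark at or past its txid, then rethrows any failure recorded by the background threads. A minimal sketch of that wait-and-check, again with stand-in fields rather than the real FSHLog members:

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicLong;

    public class WaiterSketch {
      private final AtomicLong syncedTillHere = new AtomicLong(0);
      private final AtomicLong failedTxid = new AtomicLong(0);
      private volatile IOException asyncIOE = null;

      // Handler side: block until everything up to 'txid' is durable, then
      // surface any IOException recorded by the background write/sync threads.
      void waitForSync(long txid) throws IOException {
        try {
          synchronized (syncedTillHere) {
            while (syncedTillHere.get() < txid) {
              syncedTillHere.wait();
            }
          }
          if (txid <= failedTxid.get() && asyncIOE != null) {
            throw asyncIOE;
          }
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new IOException("Interrupted while waiting for sync of txid " + txid, e);
        }
      }

      // Notifier side: publish the new watermark and wake all waiters.
      void markSynced(long txid) {
        synchronized (syncedTillHere) {
          if (txid > syncedTillHere.get()) {
            syncedTillHere.set(txid);
          }
          syncedTillHere.notifyAll();
        }
      }
    }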
@@ -1333,7 +1449,7 @@ class FSHLog implements HLog, Syncable {
        logKey.setScopes(null);
      }
      // write to our buffer for the Hlog file.
      logSyncer.append(new FSHLog.Entry(logKey, logEdit));
      this.pendingWrites.add(new HLog.Entry(logKey, logEdit));
    }
    long took = EnvironmentEdgeManager.currentTimeMillis() - now;
    coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1489,7 +1605,7 @@ class FSHLog implements HLog, Syncable {

  /** Provide access to currently deferred sequence num for tests */
  boolean hasDeferredEntries() {
    return lastDeferredTxid > syncedTillHere;
    return this.lastDeferredTxid > this.syncedTillHere.get();
  }

  @Override
@@ -82,25 +82,27 @@ public class TestDurability {
    HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);

    region.put(newPut(null));

    verifyHLogCount(wal, 1);

    // a put through the deferred table does not write to the wal immediately
    // a put through the deferred table does not write to the wal immediately,
    // but may already have been successfully sync-ed by the underlying
    // AsyncWriter + AsyncSyncer threads
    deferredRegion.put(newPut(null));
    verifyHLogCount(wal, 1);
    // but will after we sync the wal
    wal.sync();
    verifyHLogCount(wal, 2);

    // a put through a deferred table will be synced with the next sync'ed put
    deferredRegion.put(newPut(null));
    verifyHLogCount(wal, 2);
    wal.sync();
    verifyHLogCount(wal, 3);
    region.put(newPut(null));
    verifyHLogCount(wal, 4);

    // a put through a deferred table will be synced with the next sync'ed put
    deferredRegion.put(newPut(Durability.USE_DEFAULT));
    verifyHLogCount(wal, 4);
    wal.sync();
    verifyHLogCount(wal, 5);
    region.put(newPut(Durability.USE_DEFAULT));
    verifyHLogCount(wal, 6);
@@ -114,7 +116,6 @@ public class TestDurability {
    // async overrides sync table default
    region.put(newPut(Durability.ASYNC_WAL));
    deferredRegion.put(newPut(Durability.ASYNC_WAL));
    verifyHLogCount(wal, 6);
    wal.sync();
    verifyHLogCount(wal, 8);
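
Editor's note: the test above drives deferred-sync behaviour through HRegion directly. From a client, the same behaviour is requested per mutation via Durability.ASYNC_WAL; a sketch assuming the 0.96-era HTable client API and an existing table "t" with column family "f":

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class AsyncWalPutExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "t");
        try {
          Put p = new Put(Bytes.toBytes("row1"));
          p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
          // Return without waiting for the WAL sync; the background
          // AsyncWriter/AsyncSyncer threads make the edit durable shortly after.
          p.setDurability(Durability.ASYNC_WAL);
          table.put(p);
        } finally {
          table.close();
        }
      }
    }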
@@ -151,10 +151,8 @@ public class TestLogRollAbort {
    dfsCluster.restartDataNodes();
    LOG.info("Restarted datanodes");

    assertTrue("Should have an outstanding WAL edit", ((FSHLog) log).hasDeferredEntries());
    try {
      log.rollWriter(true);
      fail("Log roll should have triggered FailedLogCloseException");
    } catch (FailedLogCloseException flce) {
      assertTrue("Should have deferred flush log edits outstanding",
        ((FSHLog) log).hasDeferredEntries());