diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 542b393ea2e..e8cc56c162b 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -203,12 +203,6 @@ possible configurations would overwhelm and obscure the important.
Interval between messages from the RegionServer to Master
in milliseconds.
-
- hbase.regionserver.optionallogflushinterval
- 1000
- Sync the HLog to the HDFS after this interval if it has not
- accumulated enough entries to trigger a sync. Units: milliseconds.
-
hbase.regionserver.regionSplitLimit
2147483647
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 1c69d0ff890..0528a7046ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.util.DrainBarrier;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HasThread;
-import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.util.StringUtils;
import org.cloudera.htrace.Trace;
import org.cloudera.htrace.TraceScope;
@@ -118,15 +117,19 @@ class FSHLog implements HLog, Syncable {
// Listeners that are called on WAL events.
private List listeners =
new CopyOnWriteArrayList();
- private final long optionalFlushInterval;
private final long blocksize;
private final String prefix;
private final AtomicLong unflushedEntries = new AtomicLong(0);
- private volatile long syncedTillHere = 0;
+ private final AtomicLong syncedTillHere = new AtomicLong(0);
private long lastDeferredTxid;
private final Path oldLogDir;
private volatile boolean logRollRunning;
+ // all writes pending on AsyncWriter/AsyncSyncer threads with
+ // txid <= failedTxid will fail by throwing asyncIOE
+ private final AtomicLong failedTxid = new AtomicLong(0);
+ private volatile IOException asyncIOE = null;
+
private WALCoprocessorHost coprocessorHost;
private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer
@@ -208,7 +211,7 @@ class FSHLog implements HLog, Syncable {
// during an update
// locked during appends
private final Object updateLock = new Object();
- private final Object flushLock = new Object();
+ private final Object pendingWritesLock = new Object();
private final boolean enabled;
@@ -219,10 +222,20 @@ class FSHLog implements HLog, Syncable {
*/
private final int maxLogs;
- /**
- * Thread that handles optional sync'ing
- */
- private final LogSyncer logSyncer;
+ // List of pending writes to the HLog. These correspond to transactions
+ // that have not yet returned to the client. We keep them cached here
+ // instead of writing them to HDFS piecemeal. The goal is to increase
+ // the batchsize for writing-to-hdfs as well as sync-to-hdfs, so that
+ // we can get better system throughput.
+ private List pendingWrites = new LinkedList();
+
+ private final AsyncWriter asyncWriter;
+ // since AsyncSyncer takes much longer than the other phases (adding WALEdits to local
+ // buffer, writing local buffer to HDFS, notifying pending write handler threads),
+ // when a sync is ongoing all other phases pend, so we use multiple parallel
+ // AsyncSyncer threads to improve overall throughput.
+ private final AsyncSyncer[] asyncSyncers;
+ private final AsyncNotifier asyncNotifier;
/** Number of log close errors tolerated before we abort */
private final int closeErrorsTolerated;
@@ -368,8 +381,6 @@ class FSHLog implements HLog, Syncable {
// Roll at 95% of block size.
float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f);
this.logrollsize = (long)(this.blocksize * multi);
- this.optionalFlushInterval =
- conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32);
this.minTolerableReplication = conf.getInt(
@@ -381,13 +392,11 @@ class FSHLog implements HLog, Syncable {
this.closeErrorsTolerated = conf.getInt(
"hbase.regionserver.logroll.errors.tolerated", 0);
- this.logSyncer = new LogSyncer(this.optionalFlushInterval);
LOG.info("WAL/HLog configuration: blocksize=" +
StringUtils.byteDesc(this.blocksize) +
", rollsize=" + StringUtils.byteDesc(this.logrollsize) +
- ", enabled=" + this.enabled +
- ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
+ ", enabled=" + this.enabled);
// If prefix is null||empty then just name it hlog
this.prefix = prefix == null || prefix.isEmpty() ?
"hlog" : URLEncoder.encode(prefix, "UTF8");
@@ -411,15 +420,22 @@ class FSHLog implements HLog, Syncable {
// handle the reflection necessary to call getNumCurrentReplicas()
this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out);
- // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync.
- if (this.optionalFlushInterval > 0) {
- Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName()
- + ".logSyncer");
- } else {
- LOG.info("hbase.regionserver.optionallogflushinterval is set as "
- + this.optionalFlushInterval + ". Deferred log syncing won't work. "
- + "Any Mutation, marked to be deferred synced, will be flushed immediately.");
+ final String n = Thread.currentThread().getName();
+
+
+ asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter");
+ asyncWriter.start();
+
+ int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5);
+ asyncSyncers = new AsyncSyncer[syncerNums];
+ for (int i = 0; i < asyncSyncers.length; ++i) {
+ asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i);
+ asyncSyncers[i].start();
}
+
+ asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier");
+ asyncNotifier.start();
+
coprocessorHost = new WALCoprocessorHost(this, conf);
this.metrics = new MetricsWAL();
@@ -735,11 +751,11 @@ class FSHLog implements HLog, Syncable {
try {
// Wait till all current transactions are written to the hlog.
// No new transactions can occur because we have the updatelock.
- if (this.unflushedEntries.get() != this.syncedTillHere) {
+ if (this.unflushedEntries.get() != this.syncedTillHere.get()) {
LOG.debug("cleanupCurrentWriter " +
" waiting for transactions to get synced " +
" total " + this.unflushedEntries.get() +
- " synced till here " + syncedTillHere);
+ " synced till here " + this.syncedTillHere.get());
sync();
}
this.writer.close();
@@ -874,17 +890,33 @@ class FSHLog implements HLog, Syncable {
if (this.closed) {
return;
}
- // When optionalFlushInterval is 0, the logSyncer is not started as a Thread.
- if (this.optionalFlushInterval > 0) {
+
+ try {
+ asyncNotifier.interrupt();
+ asyncNotifier.join();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while waiting for " + asyncNotifier.getName() +
+ " threads to die", e);
+ }
+
+ for (int i = 0; i < asyncSyncers.length; ++i) {
try {
- logSyncer.close();
- // Make sure we synced everything
- logSyncer.join(this.optionalFlushInterval * 2);
+ asyncSyncers[i].interrupt();
+ asyncSyncers[i].join();
} catch (InterruptedException e) {
- LOG.error("Exception while waiting for syncer thread to die", e);
- Thread.currentThread().interrupt();
+ LOG.error("Exception while waiting for " + asyncSyncers[i].getName() +
+ " threads to die", e);
}
}
+
+ try {
+ asyncWriter.interrupt();
+ asyncWriter.join();
+ } catch (InterruptedException e) {
+ LOG.error("Exception while waiting for " + asyncWriter.getName() +
+ " thread to die", e);
+ }
+
try {
// Prevent all further flushing and rolling.
closeBarrier.stopAndDrainOps();
@@ -985,9 +1017,14 @@ class FSHLog implements HLog, Syncable {
if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum);
HLogKey logKey = makeKey(
encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce);
- doWrite(info, logKey, edits, htd);
+
+ synchronized (pendingWritesLock) {
+ doWrite(info, logKey, edits, htd);
+ txid = this.unflushedEntries.incrementAndGet();
+ }
this.numEntries.incrementAndGet();
- txid = this.unflushedEntries.incrementAndGet();
+ this.asyncWriter.setPendingTxid(txid);
+
if (htd.isDeferredLogFlush()) {
lastDeferredTxid = txid;
}
@@ -1017,91 +1054,245 @@ class FSHLog implements HLog, Syncable {
now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce);
}
- /**
- * This class is responsible to hold the HLog's appended Entry list
- * and to sync them according to a configurable interval.
- *
- * Deferred log flushing works first by piggy backing on this process by
- * simply not sync'ing the appended Entry. It can also be sync'd by other
- * non-deferred log flushed entries outside of this thread.
+ /* The current HLog write process works as follows:
+ * 1). All write handler threads append edits to HLog's local pending buffer;
+ * (it notifies AsyncWriter thread that there is new edits in local buffer)
+ * 2). All write handler threads wait in HLog.syncer() function for underlying threads to
+ * finish the sync that contains its txid;
+ * 3). An AsyncWriter thread is responsible for retrieving all edits in HLog's
+ * local pending buffer and writing to the hdfs (hlog.writer.append);
+ * (it notifies AsyncSyncer threads that there is new writes to hdfs which needs a sync)
+ * 4). AsyncSyncer threads are responsible for issuing sync request to hdfs to persist the
+ * writes by AsyncWriter; (they notify the AsyncNotifier thread that sync is done)
+ * 5). An AsyncNotifier thread is responsible for notifying all pending write handler
+ * threads which are waiting in the HLog.syncer() function
+ * 6). No LogSyncer thread any more (since there are always AsyncWriter/AsyncSyncer threads
+ * do the same job it does)
+ * note: more than one AsyncSyncer thread is needed here to guarantee good enough performance
+ * even when there are fewer concurrent write handler threads. Since sync is the most time-consuming
+ * operation in the whole write process, multiple AsyncSyncer threads can provide better
+ * parallelism of sync to get better overall throughput
*/
- class LogSyncer extends HasThread {
+ // thread to write locally buffered writes to HDFS
+ private class AsyncWriter extends HasThread {
+ private long pendingTxid = 0;
+ private long txidToWrite = 0;
+ private long lastWrittenTxid = 0;
+ private Object writeLock = new Object();
- private final long optionalFlushInterval;
-
- private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false);
-
- // List of pending writes to the HLog. There corresponds to transactions
- // that have not yet returned to the client. We keep them cached here
- // instead of writing them to HDFS piecemeal, because the HDFS write
- // method is pretty heavyweight as far as locking is concerned. The
- // goal is to increase the batchsize for writing-to-hdfs as well as
- // sync-to-hdfs, so that we can get better system throughput.
- private List pendingWrites = new LinkedList();
-
- LogSyncer(long optionalFlushInterval) {
- this.optionalFlushInterval = optionalFlushInterval;
+ public AsyncWriter(String name) {
+ super(name);
+ }
+
+ // wake up (called by (write) handler thread) AsyncWriter thread
+ // to write buffered writes to HDFS
+ public void setPendingTxid(long txid) {
+ synchronized (this.writeLock) {
+ if (txid <= this.pendingTxid)
+ return;
+
+ this.pendingTxid = txid;
+ this.writeLock.notify();
+ }
}
- @Override
public void run() {
try {
- // awaiting with a timeout doesn't always
- // throw exceptions on interrupt
- while(!this.isInterrupted() && !closeLogSyncer.get()) {
-
- try {
- if (unflushedEntries.get() <= syncedTillHere) {
- synchronized (closeLogSyncer) {
- closeLogSyncer.wait(this.optionalFlushInterval);
- }
+ while (!this.isInterrupted()) {
+ // 1. wait until there is new writes in local buffer
+ synchronized (this.writeLock) {
+ while (this.pendingTxid <= this.lastWrittenTxid) {
+ this.writeLock.wait();
}
- // Calling sync since we waited or had unflushed entries.
- // Entries appended but not sync'd are taken care of here AKA
- // deferred log flush
- sync();
- } catch (IOException e) {
- LOG.error("Error while syncing, requesting close of hlog ", e);
+ }
+
+ // 2. get all buffered writes and update 'real' pendingTxid
+ // since maybe newer writes enter buffer as AsyncWriter wakes
+ // up and holds the lock
+ // NOTE! can't hold 'updateLock' here since rollWriter will pend
+ // on 'sync()' with 'updateLock', but 'sync()' will wait for
+ // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock
+ // can lead to pendWrites holding more entries than pendingTxid, but that is not a problem
+ List pendWrites = null;
+ synchronized (pendingWritesLock) {
+ this.txidToWrite = unflushedEntries.get();
+ pendWrites = pendingWrites;
+ pendingWrites = new LinkedList();
+ }
+
+ // 3. write all buffered writes to HDFS(append, without sync)
+ try {
+ for (Entry e : pendWrites) {
+ writer.append(e);
+ }
+ } catch(IOException e) {
+ LOG.error("Error while AsyncWriter write, request close of hlog ", e);
requestLogRoll();
- Threads.sleep(this.optionalFlushInterval);
+
+ asyncIOE = e;
+ failedTxid.set(this.txidToWrite);
+ }
+
+ // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync'
+ this.lastWrittenTxid = this.txidToWrite;
+ boolean hasIdleSyncer = false;
+ for (int i = 0; i < asyncSyncers.length; ++i) {
+ if (!asyncSyncers[i].isSyncing()) {
+ hasIdleSyncer = true;
+ asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid);
+ break;
+ }
+ }
+ if (!hasIdleSyncer) {
+ int idx = (int)this.lastWrittenTxid % asyncSyncers.length;
+ asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid);
}
}
} catch (InterruptedException e) {
- LOG.debug(getName() + " interrupted while waiting for sync requests");
+ LOG.debug(getName() + " interrupted while waiting for " +
+ "newer writes added to local buffer");
+ } catch (Exception e) {
+ LOG.error("UNEXPECTED", e);
} finally {
LOG.info(getName() + " exiting");
}
}
+ }
- // appends new writes to the pendingWrites. It is better to keep it in
- // our own queue rather than writing it to the HDFS output stream because
- // HDFSOutputStream.writeChunk is not lightweight at all.
- synchronized void append(Entry e) throws IOException {
- pendingWrites.add(e);
+ // thread to request HDFS to sync the WALEdits written by AsyncWriter
+ // to make those WALEdits durable on HDFS side
+ private class AsyncSyncer extends HasThread {
+ private long writtenTxid = 0;
+ private long txidToSync = 0;
+ private long lastSyncedTxid = 0;
+ private volatile boolean isSyncing = false;
+ private Object syncLock = new Object();
+
+ public AsyncSyncer(String name) {
+ super(name);
}
- // Returns all currently pending writes. New writes
- // will accumulate in a new list.
- synchronized List getPendingWrites() {
- List save = this.pendingWrites;
- this.pendingWrites = new LinkedList();
- return save;
+ public boolean isSyncing() {
+ return this.isSyncing;
}
- // writes out pending entries to the HLog
- void hlogFlush(Writer writer, List pending) throws IOException {
- if (pending == null) return;
+ // wake up (called by AsyncWriter thread) AsyncSyncer thread
+ // to sync(flush) writes written by AsyncWriter in HDFS
+ public void setWrittenTxid(long txid) {
+ synchronized (this.syncLock) {
+ if (txid <= this.writtenTxid)
+ return;
- // write out all accumulated Entries to hdfs.
- for (Entry e : pending) {
- writer.append(e);
+ this.writtenTxid = txid;
+ this.syncLock.notify();
}
}
- void close() {
- synchronized (closeLogSyncer) {
- closeLogSyncer.set(true);
- closeLogSyncer.notifyAll();
+ public void run() {
+ try {
+ while (!this.isInterrupted()) {
+ // 1. wait until AsyncWriter has written data to HDFS and
+ // called setWrittenTxid to wake up us
+ synchronized (this.syncLock) {
+ while (this.writtenTxid <= this.lastSyncedTxid) {
+ this.syncLock.wait();
+ }
+ this.txidToSync = this.writtenTxid;
+ }
+
+ // 2. do 'sync' to HDFS to provide durability
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ try {
+ this.isSyncing = true;
+ if (writer != null) writer.sync();
+ this.isSyncing = false;
+ } catch (IOException e) {
+ LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e);
+ requestLogRoll();
+
+ asyncIOE = e;
+ failedTxid.set(this.txidToSync);
+ }
+ metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
+
+ // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put'
+ // handler threads on 'sync()'
+ this.lastSyncedTxid = this.txidToSync;
+ asyncNotifier.setFlushedTxid(this.lastSyncedTxid);
+
+ // 4. check and do logRoll if needed
+ if (!logRollRunning) {
+ checkLowReplication();
+ try {
+ if (writer != null && writer.getLength() > logrollsize) {
+ requestLogRoll();
+ }
+ } catch (IOException e) {
+ LOG.warn("writer.getLength() failed,this failure won't block here");
+ }
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(getName() + " interrupted while waiting for " +
+ "notification from AsyncWriter thread");
+ } catch (Exception e) {
+ LOG.error("UNEXPECTED", e);
+ } finally {
+ LOG.info(getName() + " exiting");
+ }
+ }
+ }
+
+ // thread to notify all write handler threads which are pending on
+ // their written WALEdits' durability (sync).
+ // why an extra 'notifier' thread is needed rather than letting the
+ // AsyncSyncer thread itself notify when sync is done is to let
+ // AsyncSyncer thread do next sync as soon as possible since 'notify'
+ // has heavy synchronization with all pending write handler threads
+ private class AsyncNotifier extends HasThread {
+ private long flushedTxid = 0;
+ private long lastNotifiedTxid = 0;
+ private Object notifyLock = new Object();
+
+ public AsyncNotifier(String name) {
+ super(name);
+ }
+
+ public void setFlushedTxid(long txid) {
+ synchronized (this.notifyLock) {
+ if (txid <= this.flushedTxid) {
+ return;
+ }
+
+ this.flushedTxid = txid;
+ this.notifyLock.notify();
+ }
+ }
+
+ public void run() {
+ try {
+ while (!this.isInterrupted()) {
+ synchronized (this.notifyLock) {
+ while (this.flushedTxid <= this.lastNotifiedTxid) {
+ this.notifyLock.wait();
+ }
+ this.lastNotifiedTxid = this.flushedTxid;
+ }
+
+ // notify(wake-up) all pending (write) handler thread
+ // (or logroller thread which also may pend on sync())
+ synchronized (syncedTillHere) {
+ syncedTillHere.set(this.lastNotifiedTxid);
+ syncedTillHere.notifyAll();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.debug(getName() + " interrupted while waiting for " +
+ " notification from AsyncSyncer thread");
+ } catch (Exception e) {
+ LOG.error("UNEXPECTED", e);
+ } finally {
+ LOG.info(getName() + " exiting");
}
}
}
@@ -1113,95 +1304,20 @@ class FSHLog implements HLog, Syncable {
// sync all transactions upto the specified txid
private void syncer(long txid) throws IOException {
- // if the transaction that we are interested in is already
- // synced, then return immediately.
- if (txid <= this.syncedTillHere) {
- return;
- }
- Writer tempWriter;
- synchronized (this.updateLock) {
- if (this.closed) return;
- // Guaranteed non-null.
- // Note that parallel sync can close tempWriter.
- // The current method of dealing with this is to catch exceptions.
- // See HBASE-4387, HBASE-5623, HBASE-7329.
- tempWriter = this.writer;
- }
- try {
- long doneUpto;
- long now = EnvironmentEdgeManager.currentTimeMillis();
- // First flush all the pending writes to HDFS. Then
- // issue the sync to HDFS. If sync is successful, then update
- // syncedTillHere to indicate that transactions till this
- // number has been successfully synced.
- IOException ioe = null;
- List pending = null;
- synchronized (flushLock) {
- if (txid <= this.syncedTillHere) {
- return;
- }
- doneUpto = this.unflushedEntries.get();
- pending = logSyncer.getPendingWrites();
+ synchronized (this.syncedTillHere) {
+ while (this.syncedTillHere.get() < txid) {
try {
- logSyncer.hlogFlush(tempWriter, pending);
- postAppend(pending);
- } catch(IOException io) {
- ioe = io;
- LOG.error("syncer encountered error, will retry. txid=" + txid, ioe);
- }
- }
- if (ioe != null && pending != null) {
- synchronized (this.updateLock) {
- synchronized (flushLock) {
- // HBASE-4387, HBASE-5623, retry with updateLock held
- tempWriter = this.writer;
- logSyncer.hlogFlush(tempWriter, pending);
- postAppend(pending);
- }
- }
- }
- // another thread might have sync'ed avoid double-sync'ing
- if (txid <= this.syncedTillHere) {
- return;
- }
- try {
- if (tempWriter != null) {
- tempWriter.sync();
- postSync();
- }
- } catch(IOException ex) {
- synchronized (this.updateLock) {
- // HBASE-4387, HBASE-5623, retry with updateLock held
- // TODO: we don't actually need to do it for concurrent close - what is the point
- // of syncing new unrelated writer? Keep behavior for now.
- tempWriter = this.writer;
- if (tempWriter != null) {
- tempWriter.sync();
- postSync();
- }
- }
- }
- this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto);
+ this.syncedTillHere.wait();
- this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now);
- // TODO: preserving the old behavior for now, but this check is strange. It's not
- // protected by any locks here, so for all we know rolling locks might start
- // as soon as we enter the "if". Is this best-effort optimization check?
- if (!this.logRollRunning) {
- checkLowReplication();
- try {
- curLogSize = tempWriter.getLength();
- if (curLogSize > this.logrollsize) {
- requestLogRoll();
+ if (txid <= this.failedTxid.get()) {
+ assert asyncIOE != null :
+ "current txid is among(under) failed txids, but asyncIOE is null!";
+ throw asyncIOE;
}
- } catch (IOException x) {
- LOG.debug("Log roll failed and will be retried. (This is not an error)");
+ } catch (InterruptedException e) {
+ LOG.debug("interrupted while waiting for notification from AsyncNotifier");
}
}
- } catch (IOException e) {
- LOG.fatal("Could not sync. Requesting roll of hlog", e);
- requestLogRoll();
- throw e;
}
}
@@ -1333,7 +1449,7 @@ class FSHLog implements HLog, Syncable {
logKey.setScopes(null);
}
// write to our buffer for the Hlog file.
- logSyncer.append(new FSHLog.Entry(logKey, logEdit));
+ this.pendingWrites.add(new HLog.Entry(logKey, logEdit));
}
long took = EnvironmentEdgeManager.currentTimeMillis() - now;
coprocessorHost.postWALWrite(info, logKey, logEdit);
@@ -1489,7 +1605,7 @@ class FSHLog implements HLog, Syncable {
/** Provide access to currently deferred sequence num for tests */
boolean hasDeferredEntries() {
- return lastDeferredTxid > syncedTillHere;
+ return this.lastDeferredTxid > this.syncedTillHere.get();
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
index 6ad6dafa247..8f74ce7eefd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java
@@ -82,25 +82,27 @@ public class TestDurability {
HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true);
region.put(newPut(null));
-
verifyHLogCount(wal, 1);
- // a put through the deferred table does not write to the wal immdiately
+ // a put through the deferred table does not write to the wal immediately,
+ // but may have already been synced by the underlying AsyncWriter +
+ // AsyncSyncer threads
deferredRegion.put(newPut(null));
- verifyHLogCount(wal, 1);
// but will after we sync the wal
wal.sync();
verifyHLogCount(wal, 2);
// a put through a deferred table will be sync with the put sync'ed put
deferredRegion.put(newPut(null));
- verifyHLogCount(wal, 2);
+ wal.sync();
+ verifyHLogCount(wal, 3);
region.put(newPut(null));
verifyHLogCount(wal, 4);
// a put through a deferred table will be sync with the put sync'ed put
deferredRegion.put(newPut(Durability.USE_DEFAULT));
- verifyHLogCount(wal, 4);
+ wal.sync();
+ verifyHLogCount(wal, 5);
region.put(newPut(Durability.USE_DEFAULT));
verifyHLogCount(wal, 6);
@@ -114,7 +116,6 @@ public class TestDurability {
// async overrides sync table default
region.put(newPut(Durability.ASYNC_WAL));
deferredRegion.put(newPut(Durability.ASYNC_WAL));
- verifyHLogCount(wal, 6);
wal.sync();
verifyHLogCount(wal, 8);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
index 618e85d6d45..9320aed2896 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java
@@ -151,10 +151,8 @@ public class TestLogRollAbort {
dfsCluster.restartDataNodes();
LOG.info("Restarted datanodes");
- assertTrue("Should have an outstanding WAL edit", ((FSHLog) log).hasDeferredEntries());
try {
log.rollWriter(true);
- fail("Log roll should have triggered FailedLogCloseException");
} catch (FailedLogCloseException flce) {
assertTrue("Should have deferred flush log edits outstanding",
((FSHLog) log).hasDeferredEntries());