From 4ee8b16d67afc2b02562e303a396970d65eb5163 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Fri, 13 Dec 2013 17:32:09 +0000 Subject: [PATCH] HBASE-8755 A new write thread model for HLog to improve the overall HBase write throughput git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1550778 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/resources/hbase-default.xml | 6 - .../hadoop/hbase/regionserver/wal/FSHLog.java | 474 +++++++++++------- .../regionserver/wal/TestDurability.java | 13 +- .../regionserver/wal/TestLogRollAbort.java | 2 - 4 files changed, 302 insertions(+), 193 deletions(-) diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml index 542b393ea2e..e8cc56c162b 100644 --- a/hbase-common/src/main/resources/hbase-default.xml +++ b/hbase-common/src/main/resources/hbase-default.xml @@ -203,12 +203,6 @@ possible configurations would overwhelm and obscure the important. Interval between messages from the RegionServer to Master in milliseconds. - - hbase.regionserver.optionallogflushinterval - 1000 - Sync the HLog to the HDFS after this interval if it has not - accumulated enough entries to trigger a sync. Units: milliseconds. - hbase.regionserver.regionSplitLimit 2147483647 diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 1c69d0ff890..0528a7046ff 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.util.DrainBarrier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.cloudera.htrace.Trace; import org.cloudera.htrace.TraceScope; @@ -118,15 +117,19 @@ class FSHLog implements HLog, Syncable { // Listeners that are called on WAL events. private List listeners = new CopyOnWriteArrayList(); - private final long optionalFlushInterval; private final long blocksize; private final String prefix; private final AtomicLong unflushedEntries = new AtomicLong(0); - private volatile long syncedTillHere = 0; + private final AtomicLong syncedTillHere = new AtomicLong(0); private long lastDeferredTxid; private final Path oldLogDir; private volatile boolean logRollRunning; + // all writes pending on AsyncWriter/AsyncSyncer thread with + // txid <= failedTxid will fail by throwing asyncIOE + private final AtomicLong failedTxid = new AtomicLong(0); + private volatile IOException asyncIOE = null; + private WALCoprocessorHost coprocessorHost; private FSDataOutputStream hdfs_out; // FSDataOutputStream associated with the current SequenceFile.writer @@ -208,7 +211,7 @@ class FSHLog implements HLog, Syncable { // during an update // locked during appends private final Object updateLock = new Object(); - private final Object flushLock = new Object(); + private final Object pendingWritesLock = new Object(); private final boolean enabled; @@ -219,10 +222,20 @@ class FSHLog implements HLog, Syncable { */ private final int maxLogs; - /** - * Thread that handles optional sync'ing - */ - private final LogSyncer logSyncer; + // List of pending writes to the HLog. There corresponds to transactions + // that have not yet returned to the client. We keep them cached here + // instead of writing them to HDFS piecemeal. The goal is to increase + // the batchsize for writing-to-hdfs as well as sync-to-hdfs, so that + // we can get better system throughput. + private List pendingWrites = new LinkedList(); + + private final AsyncWriter asyncWriter; + // since AsyncSyncer takes much longer than other phase(add WALEdits to local + // buffer, write local buffer to HDFS, notify pending write handler threads), + // when a sync is ongoing, all other phase pend, we use multiple parallel + // AsyncSyncer threads to improve overall throughput. + private final AsyncSyncer[] asyncSyncers; + private final AsyncNotifier asyncNotifier; /** Number of log close errors tolerated before we abort */ private final int closeErrorsTolerated; @@ -368,8 +381,6 @@ class FSHLog implements HLog, Syncable { // Roll at 95% of block size. float multi = conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f); this.logrollsize = (long)(this.blocksize * multi); - this.optionalFlushInterval = - conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); this.maxLogs = conf.getInt("hbase.regionserver.maxlogs", 32); this.minTolerableReplication = conf.getInt( @@ -381,13 +392,11 @@ class FSHLog implements HLog, Syncable { this.closeErrorsTolerated = conf.getInt( "hbase.regionserver.logroll.errors.tolerated", 0); - this.logSyncer = new LogSyncer(this.optionalFlushInterval); LOG.info("WAL/HLog configuration: blocksize=" + StringUtils.byteDesc(this.blocksize) + ", rollsize=" + StringUtils.byteDesc(this.logrollsize) + - ", enabled=" + this.enabled + - ", optionallogflushinternal=" + this.optionalFlushInterval + "ms"); + ", enabled=" + this.enabled); // If prefix is null||empty then just name it hlog this.prefix = prefix == null || prefix.isEmpty() ? "hlog" : URLEncoder.encode(prefix, "UTF8"); @@ -411,15 +420,22 @@ class FSHLog implements HLog, Syncable { // handle the reflection necessary to call getNumCurrentReplicas() this.getNumCurrentReplicas = getGetNumCurrentReplicas(this.hdfs_out); - // When optionalFlushInterval is set as 0, don't start a thread for deferred log sync. - if (this.optionalFlushInterval > 0) { - Threads.setDaemonThreadRunning(logSyncer.getThread(), Thread.currentThread().getName() - + ".logSyncer"); - } else { - LOG.info("hbase.regionserver.optionallogflushinterval is set as " - + this.optionalFlushInterval + ". Deferred log syncing won't work. " - + "Any Mutation, marked to be deferred synced, will be flushed immediately."); + final String n = Thread.currentThread().getName(); + + + asyncWriter = new AsyncWriter(n + "-WAL.AsyncWriter"); + asyncWriter.start(); + + int syncerNums = conf.getInt("hbase.hlog.asyncer.number", 5); + asyncSyncers = new AsyncSyncer[syncerNums]; + for (int i = 0; i < asyncSyncers.length; ++i) { + asyncSyncers[i] = new AsyncSyncer(n + "-WAL.AsyncSyncer" + i); + asyncSyncers[i].start(); } + + asyncNotifier = new AsyncNotifier(n + "-WAL.AsyncNotifier"); + asyncNotifier.start(); + coprocessorHost = new WALCoprocessorHost(this, conf); this.metrics = new MetricsWAL(); @@ -735,11 +751,11 @@ class FSHLog implements HLog, Syncable { try { // Wait till all current transactions are written to the hlog. // No new transactions can occur because we have the updatelock. - if (this.unflushedEntries.get() != this.syncedTillHere) { + if (this.unflushedEntries.get() != this.syncedTillHere.get()) { LOG.debug("cleanupCurrentWriter " + " waiting for transactions to get synced " + " total " + this.unflushedEntries.get() + - " synced till here " + syncedTillHere); + " synced till here " + this.syncedTillHere.get()); sync(); } this.writer.close(); @@ -874,17 +890,33 @@ class FSHLog implements HLog, Syncable { if (this.closed) { return; } - // When optionalFlushInterval is 0, the logSyncer is not started as a Thread. - if (this.optionalFlushInterval > 0) { + + try { + asyncNotifier.interrupt(); + asyncNotifier.join(); + } catch (InterruptedException e) { + LOG.error("Exception while waiting for " + asyncNotifier.getName() + + " threads to die", e); + } + + for (int i = 0; i < asyncSyncers.length; ++i) { try { - logSyncer.close(); - // Make sure we synced everything - logSyncer.join(this.optionalFlushInterval * 2); + asyncSyncers[i].interrupt(); + asyncSyncers[i].join(); } catch (InterruptedException e) { - LOG.error("Exception while waiting for syncer thread to die", e); - Thread.currentThread().interrupt(); + LOG.error("Exception while waiting for " + asyncSyncers[i].getName() + + " threads to die", e); } } + + try { + asyncWriter.interrupt(); + asyncWriter.join(); + } catch (InterruptedException e) { + LOG.error("Exception while waiting for " + asyncWriter.getName() + + " thread to die", e); + } + try { // Prevent all further flushing and rolling. closeBarrier.stopAndDrainOps(); @@ -985,9 +1017,14 @@ class FSHLog implements HLog, Syncable { if (isInMemstore) this.oldestUnflushedSeqNums.putIfAbsent(encodedRegionName, seqNum); HLogKey logKey = makeKey( encodedRegionName, tableName, seqNum, now, clusterIds, nonceGroup, nonce); - doWrite(info, logKey, edits, htd); + + synchronized (pendingWritesLock) { + doWrite(info, logKey, edits, htd); + txid = this.unflushedEntries.incrementAndGet(); + } this.numEntries.incrementAndGet(); - txid = this.unflushedEntries.incrementAndGet(); + this.asyncWriter.setPendingTxid(txid); + if (htd.isDeferredLogFlush()) { lastDeferredTxid = txid; } @@ -1017,91 +1054,245 @@ class FSHLog implements HLog, Syncable { now, htd, false, isInMemstore, sequenceId, nonceGroup, nonce); } - /** - * This class is responsible to hold the HLog's appended Entry list - * and to sync them according to a configurable interval. - * - * Deferred log flushing works first by piggy backing on this process by - * simply not sync'ing the appended Entry. It can also be sync'd by other - * non-deferred log flushed entries outside of this thread. + /* The work of current write process of HLog goes as below: + * 1). All write handler threads append edits to HLog's local pending buffer; + * (it notifies AsyncWriter thread that there is new edits in local buffer) + * 2). All write handler threads wait in HLog.syncer() function for underlying threads to + * finish the sync that contains its txid; + * 3). An AsyncWriter thread is responsible for retrieving all edits in HLog's + * local pending buffer and writing to the hdfs (hlog.writer.append); + * (it notifies AsyncSyncer threads that there is new writes to hdfs which needs a sync) + * 4). AsyncSyncer threads are responsible for issuing sync request to hdfs to persist the + * writes by AsyncWriter; (they notify the AsyncNotifier thread that sync is done) + * 5). An AsyncNotifier thread is responsible for notifying all pending write handler + * threads which are waiting in the HLog.syncer() function + * 6). No LogSyncer thread any more (since there is always AsyncWriter/AsyncFlusher threads + * do the same job it does) + * note: more than one AsyncSyncer threads are needed here to guarantee good enough performance + * when less concurrent write handler threads. since sync is the most time-consuming + * operation in the whole write process, multiple AsyncSyncer threads can provide better + * parallelism of sync to get better overall throughput */ - class LogSyncer extends HasThread { + // thread to write locally buffered writes to HDFS + private class AsyncWriter extends HasThread { + private long pendingTxid = 0; + private long txidToWrite = 0; + private long lastWrittenTxid = 0; + private Object writeLock = new Object(); - private final long optionalFlushInterval; - - private final AtomicBoolean closeLogSyncer = new AtomicBoolean(false); - - // List of pending writes to the HLog. There corresponds to transactions - // that have not yet returned to the client. We keep them cached here - // instead of writing them to HDFS piecemeal, because the HDFS write - // method is pretty heavyweight as far as locking is concerned. The - // goal is to increase the batchsize for writing-to-hdfs as well as - // sync-to-hdfs, so that we can get better system throughput. - private List pendingWrites = new LinkedList(); - - LogSyncer(long optionalFlushInterval) { - this.optionalFlushInterval = optionalFlushInterval; + public AsyncWriter(String name) { + super(name); + } + + // wake up (called by (write) handler thread) AsyncWriter thread + // to write buffered writes to HDFS + public void setPendingTxid(long txid) { + synchronized (this.writeLock) { + if (txid <= this.pendingTxid) + return; + + this.pendingTxid = txid; + this.writeLock.notify(); + } } - @Override public void run() { try { - // awaiting with a timeout doesn't always - // throw exceptions on interrupt - while(!this.isInterrupted() && !closeLogSyncer.get()) { - - try { - if (unflushedEntries.get() <= syncedTillHere) { - synchronized (closeLogSyncer) { - closeLogSyncer.wait(this.optionalFlushInterval); - } + while (!this.isInterrupted()) { + // 1. wait until there is new writes in local buffer + synchronized (this.writeLock) { + while (this.pendingTxid <= this.lastWrittenTxid) { + this.writeLock.wait(); } - // Calling sync since we waited or had unflushed entries. - // Entries appended but not sync'd are taken care of here AKA - // deferred log flush - sync(); - } catch (IOException e) { - LOG.error("Error while syncing, requesting close of hlog ", e); + } + + // 2. get all buffered writes and update 'real' pendingTxid + // since maybe newer writes enter buffer as AsyncWriter wakes + // up and holds the lock + // NOTE! can't hold 'updateLock' here since rollWriter will pend + // on 'sync()' with 'updateLock', but 'sync()' will wait for + // AsyncWriter/AsyncSyncer/AsyncNotifier series. without updateLock + // can leads to pendWrites more than pendingTxid, but not problem + List pendWrites = null; + synchronized (pendingWritesLock) { + this.txidToWrite = unflushedEntries.get(); + pendWrites = pendingWrites; + pendingWrites = new LinkedList(); + } + + // 3. write all buffered writes to HDFS(append, without sync) + try { + for (Entry e : pendWrites) { + writer.append(e); + } + } catch(IOException e) { + LOG.error("Error while AsyncWriter write, request close of hlog ", e); requestLogRoll(); - Threads.sleep(this.optionalFlushInterval); + + asyncIOE = e; + failedTxid.set(this.txidToWrite); + } + + // 4. update 'lastWrittenTxid' and notify AsyncSyncer to do 'sync' + this.lastWrittenTxid = this.txidToWrite; + boolean hasIdleSyncer = false; + for (int i = 0; i < asyncSyncers.length; ++i) { + if (!asyncSyncers[i].isSyncing()) { + hasIdleSyncer = true; + asyncSyncers[i].setWrittenTxid(this.lastWrittenTxid); + break; + } + } + if (!hasIdleSyncer) { + int idx = (int)this.lastWrittenTxid % asyncSyncers.length; + asyncSyncers[idx].setWrittenTxid(this.lastWrittenTxid); } } } catch (InterruptedException e) { - LOG.debug(getName() + " interrupted while waiting for sync requests"); + LOG.debug(getName() + " interrupted while waiting for " + + "newer writes added to local buffer"); + } catch (Exception e) { + LOG.error("UNEXPECTED", e); } finally { LOG.info(getName() + " exiting"); } } + } - // appends new writes to the pendingWrites. It is better to keep it in - // our own queue rather than writing it to the HDFS output stream because - // HDFSOutputStream.writeChunk is not lightweight at all. - synchronized void append(Entry e) throws IOException { - pendingWrites.add(e); + // thread to request HDFS to sync the WALEdits written by AsyncWriter + // to make those WALEdits durable on HDFS side + private class AsyncSyncer extends HasThread { + private long writtenTxid = 0; + private long txidToSync = 0; + private long lastSyncedTxid = 0; + private volatile boolean isSyncing = false; + private Object syncLock = new Object(); + + public AsyncSyncer(String name) { + super(name); } - // Returns all currently pending writes. New writes - // will accumulate in a new list. - synchronized List getPendingWrites() { - List save = this.pendingWrites; - this.pendingWrites = new LinkedList(); - return save; + public boolean isSyncing() { + return this.isSyncing; } - // writes out pending entries to the HLog - void hlogFlush(Writer writer, List pending) throws IOException { - if (pending == null) return; + // wake up (called by AsyncWriter thread) AsyncSyncer thread + // to sync(flush) writes written by AsyncWriter in HDFS + public void setWrittenTxid(long txid) { + synchronized (this.syncLock) { + if (txid <= this.writtenTxid) + return; - // write out all accumulated Entries to hdfs. - for (Entry e : pending) { - writer.append(e); + this.writtenTxid = txid; + this.syncLock.notify(); } } - void close() { - synchronized (closeLogSyncer) { - closeLogSyncer.set(true); - closeLogSyncer.notifyAll(); + public void run() { + try { + while (!this.isInterrupted()) { + // 1. wait until AsyncWriter has written data to HDFS and + // called setWrittenTxid to wake up us + synchronized (this.syncLock) { + while (this.writtenTxid <= this.lastSyncedTxid) { + this.syncLock.wait(); + } + this.txidToSync = this.writtenTxid; + } + + // 2. do 'sync' to HDFS to provide durability + long now = EnvironmentEdgeManager.currentTimeMillis(); + try { + this.isSyncing = true; + if (writer != null) writer.sync(); + this.isSyncing = false; + } catch (IOException e) { + LOG.fatal("Error while AsyncSyncer sync, request close of hlog ", e); + requestLogRoll(); + + asyncIOE = e; + failedTxid.set(this.txidToSync); + } + metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); + + // 3. wake up AsyncNotifier to notify(wake-up) all pending 'put' + // handler threads on 'sync()' + this.lastSyncedTxid = this.txidToSync; + asyncNotifier.setFlushedTxid(this.lastSyncedTxid); + + // 4. check and do logRoll if needed + if (!logRollRunning) { + checkLowReplication(); + try { + if (writer != null && writer.getLength() > logrollsize) { + requestLogRoll(); + } + } catch (IOException e) { + LOG.warn("writer.getLength() failed,this failure won't block here"); + } + } + } + } catch (InterruptedException e) { + LOG.debug(getName() + " interrupted while waiting for " + + "notification from AsyncWriter thread"); + } catch (Exception e) { + LOG.error("UNEXPECTED", e); + } finally { + LOG.info(getName() + " exiting"); + } + } + } + + // thread to notify all write handler threads which are pending on + // their written WALEdits' durability(sync) + // why an extra 'notifier' thread is needed rather than letting + // AsyncSyncer thread itself notifies when sync is done is to let + // AsyncSyncer thread do next sync as soon as possible since 'notify' + // has heavy synchronization with all pending write handler threads + private class AsyncNotifier extends HasThread { + private long flushedTxid = 0; + private long lastNotifiedTxid = 0; + private Object notifyLock = new Object(); + + public AsyncNotifier(String name) { + super(name); + } + + public void setFlushedTxid(long txid) { + synchronized (this.notifyLock) { + if (txid <= this.flushedTxid) { + return; + } + + this.flushedTxid = txid; + this.notifyLock.notify(); + } + } + + public void run() { + try { + while (!this.isInterrupted()) { + synchronized (this.notifyLock) { + while (this.flushedTxid <= this.lastNotifiedTxid) { + this.notifyLock.wait(); + } + this.lastNotifiedTxid = this.flushedTxid; + } + + // notify(wake-up) all pending (write) handler thread + // (or logroller thread which also may pend on sync()) + synchronized (syncedTillHere) { + syncedTillHere.set(this.lastNotifiedTxid); + syncedTillHere.notifyAll(); + } + } + } catch (InterruptedException e) { + LOG.debug(getName() + " interrupted while waiting for " + + " notification from AsyncSyncer thread"); + } catch (Exception e) { + LOG.error("UNEXPECTED", e); + } finally { + LOG.info(getName() + " exiting"); } } } @@ -1113,95 +1304,20 @@ class FSHLog implements HLog, Syncable { // sync all transactions upto the specified txid private void syncer(long txid) throws IOException { - // if the transaction that we are interested in is already - // synced, then return immediately. - if (txid <= this.syncedTillHere) { - return; - } - Writer tempWriter; - synchronized (this.updateLock) { - if (this.closed) return; - // Guaranteed non-null. - // Note that parallel sync can close tempWriter. - // The current method of dealing with this is to catch exceptions. - // See HBASE-4387, HBASE-5623, HBASE-7329. - tempWriter = this.writer; - } - try { - long doneUpto; - long now = EnvironmentEdgeManager.currentTimeMillis(); - // First flush all the pending writes to HDFS. Then - // issue the sync to HDFS. If sync is successful, then update - // syncedTillHere to indicate that transactions till this - // number has been successfully synced. - IOException ioe = null; - List pending = null; - synchronized (flushLock) { - if (txid <= this.syncedTillHere) { - return; - } - doneUpto = this.unflushedEntries.get(); - pending = logSyncer.getPendingWrites(); + synchronized (this.syncedTillHere) { + while (this.syncedTillHere.get() < txid) { try { - logSyncer.hlogFlush(tempWriter, pending); - postAppend(pending); - } catch(IOException io) { - ioe = io; - LOG.error("syncer encountered error, will retry. txid=" + txid, ioe); - } - } - if (ioe != null && pending != null) { - synchronized (this.updateLock) { - synchronized (flushLock) { - // HBASE-4387, HBASE-5623, retry with updateLock held - tempWriter = this.writer; - logSyncer.hlogFlush(tempWriter, pending); - postAppend(pending); - } - } - } - // another thread might have sync'ed avoid double-sync'ing - if (txid <= this.syncedTillHere) { - return; - } - try { - if (tempWriter != null) { - tempWriter.sync(); - postSync(); - } - } catch(IOException ex) { - synchronized (this.updateLock) { - // HBASE-4387, HBASE-5623, retry with updateLock held - // TODO: we don't actually need to do it for concurrent close - what is the point - // of syncing new unrelated writer? Keep behavior for now. - tempWriter = this.writer; - if (tempWriter != null) { - tempWriter.sync(); - postSync(); - } - } - } - this.syncedTillHere = Math.max(this.syncedTillHere, doneUpto); + this.syncedTillHere.wait(); - this.metrics.finishSync(EnvironmentEdgeManager.currentTimeMillis() - now); - // TODO: preserving the old behavior for now, but this check is strange. It's not - // protected by any locks here, so for all we know rolling locks might start - // as soon as we enter the "if". Is this best-effort optimization check? - if (!this.logRollRunning) { - checkLowReplication(); - try { - curLogSize = tempWriter.getLength(); - if (curLogSize > this.logrollsize) { - requestLogRoll(); + if (txid <= this.failedTxid.get()) { + assert asyncIOE != null : + "current txid is among(under) failed txids, but asyncIOE is null!"; + throw asyncIOE; } - } catch (IOException x) { - LOG.debug("Log roll failed and will be retried. (This is not an error)"); + } catch (InterruptedException e) { + LOG.debug("interrupted while waiting for notification from AsyncNotifier"); } } - } catch (IOException e) { - LOG.fatal("Could not sync. Requesting roll of hlog", e); - requestLogRoll(); - throw e; } } @@ -1333,7 +1449,7 @@ class FSHLog implements HLog, Syncable { logKey.setScopes(null); } // write to our buffer for the Hlog file. - logSyncer.append(new FSHLog.Entry(logKey, logEdit)); + this.pendingWrites.add(new HLog.Entry(logKey, logEdit)); } long took = EnvironmentEdgeManager.currentTimeMillis() - now; coprocessorHost.postWALWrite(info, logKey, logEdit); @@ -1489,7 +1605,7 @@ class FSHLog implements HLog, Syncable { /** Provide access to currently deferred sequence num for tests */ boolean hasDeferredEntries() { - return lastDeferredTxid > syncedTillHere; + return this.lastDeferredTxid > this.syncedTillHere.get(); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java index 6ad6dafa247..8f74ce7eefd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestDurability.java @@ -82,25 +82,27 @@ public class TestDurability { HRegion deferredRegion = createHRegion(tableName, "deferredRegion", wal, true); region.put(newPut(null)); - verifyHLogCount(wal, 1); - // a put through the deferred table does not write to the wal immdiately + // a put through the deferred table does not write to the wal immdiately, + // but maybe has been successfully sync-ed by the underlying AsyncWriter + + // AsyncFlusher thread deferredRegion.put(newPut(null)); - verifyHLogCount(wal, 1); // but will after we sync the wal wal.sync(); verifyHLogCount(wal, 2); // a put through a deferred table will be sync with the put sync'ed put deferredRegion.put(newPut(null)); - verifyHLogCount(wal, 2); + wal.sync(); + verifyHLogCount(wal, 3); region.put(newPut(null)); verifyHLogCount(wal, 4); // a put through a deferred table will be sync with the put sync'ed put deferredRegion.put(newPut(Durability.USE_DEFAULT)); - verifyHLogCount(wal, 4); + wal.sync(); + verifyHLogCount(wal, 5); region.put(newPut(Durability.USE_DEFAULT)); verifyHLogCount(wal, 6); @@ -114,7 +116,6 @@ public class TestDurability { // async overrides sync table default region.put(newPut(Durability.ASYNC_WAL)); deferredRegion.put(newPut(Durability.ASYNC_WAL)); - verifyHLogCount(wal, 6); wal.sync(); verifyHLogCount(wal, 8); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java index 618e85d6d45..9320aed2896 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRollAbort.java @@ -151,10 +151,8 @@ public class TestLogRollAbort { dfsCluster.restartDataNodes(); LOG.info("Restarted datanodes"); - assertTrue("Should have an outstanding WAL edit", ((FSHLog) log).hasDeferredEntries()); try { log.rollWriter(true); - fail("Log roll should have triggered FailedLogCloseException"); } catch (FailedLogCloseException flce) { assertTrue("Should have deferred flush log edits outstanding", ((FSHLog) log).hasDeferredEntries());