From 6c1aac8f0ef973bf1ce661c94f7740223366c9e3 Mon Sep 17 00:00:00 2001 From: Zhihong Yu Date: Wed, 23 Jan 2013 17:28:09 +0000 Subject: [PATCH] HBASE-6466 Enable multi-thread for memstore flush (Chunhui) git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1437591 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/hadoop/hbase/util/Threads.java | 20 +- .../hbase/regionserver/HRegionServer.java | 5 +- .../hbase/regionserver/MemStoreFlusher.java | 177 +++++++++++------- .../regionserver/wal/TestLogRolling.java | 7 +- 4 files changed, 133 insertions(+), 76 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java index 5c7bef2104c..09c852130e1 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java @@ -208,16 +208,30 @@ public class Threads { } /** - * Get a named {@link ThreadFactory} that just builds daemon threads - * @param prefix name prefix for all threads created from the factory - * @return a thread factory that creates named, daemon threads + * Same as {@link #newDaemonThreadFactory(String, UncaughtExceptionHandler)}, + * without setting the exception handler. */ public static ThreadFactory newDaemonThreadFactory(final String prefix) { + return newDaemonThreadFactory(prefix, null); + } + + /** + * Get a named {@link ThreadFactory} that just builds daemon threads. 
+ * @param prefix name prefix for all threads created from the factory + * @param handler uncaught exception handler to set for all threads + * @return a thread factory that creates named, daemon threads with + * the supplied exception handler and normal priority + */ + public static ThreadFactory newDaemonThreadFactory(final String prefix, + final UncaughtExceptionHandler handler) { final ThreadFactory namedFactory = getNamedThreadFactory(prefix); return new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = namedFactory.newThread(r); + if (handler != null) { + t.setUncaughtExceptionHandler(handler); + } if (!t.isDaemon()) { t.setDaemon(true); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 0efe8240c71..4a6b33149db 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1531,8 +1531,7 @@ public class HRegionServer implements ClientProtocol, Threads.setDaemonThreadRunning(this.hlogRoller.getThread(), n + ".logRoller", uncaughtExceptionHandler); - Threads.setDaemonThreadRunning(this.cacheFlusher.getThread(), n + ".cacheFlusher", - uncaughtExceptionHandler); + this.cacheFlusher.start(uncaughtExceptionHandler); Threads.setDaemonThreadRunning(this.compactionChecker.getThread(), n + ".compactionChecker", uncaughtExceptionHandler); if (this.healthCheckChore != null) { @@ -1790,7 +1789,7 @@ public class HRegionServer implements ClientProtocol, */ protected void join() { Threads.shutdown(this.compactionChecker.getThread()); - Threads.shutdown(this.cacheFlusher.getThread()); + this.cacheFlusher.join(); if (this.healthCheckChore != null) { Threads.shutdown(this.healthCheckChore.getThread()); } diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 0a62d4f838d..283ef3aa411 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; import java.util.ConcurrentModificationException; import java.util.HashMap; @@ -29,10 +30,10 @@ import java.util.SortedMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.DelayQueue; import java.util.concurrent.Delayed; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -44,6 +45,7 @@ import org.apache.hadoop.hbase.RemoteExceptionHandler; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.util.StringUtils; import org.cliffc.high_scale_lib.Counter; @@ -59,7 +61,7 @@ import com.google.common.base.Preconditions; * @see FlushRequester */ @InterfaceAudience.Private -class MemStoreFlusher extends HasThread implements FlushRequester { +class MemStoreFlusher implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); // These two data members go together. Any entry in the one must have // a corresponding entry in the other. 
@@ -71,8 +73,8 @@ class MemStoreFlusher extends HasThread implements FlushRequester { private final long threadWakeFrequency; private final HRegionServer server; - private final ReentrantLock lock = new ReentrantLock(); - private final Condition flushOccurred = lock.newCondition(); + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + private final Object blockSignal = new Object(); protected final long globalMemStoreLimit; protected final long globalMemStoreLimitLowMark; @@ -87,6 +89,9 @@ class MemStoreFlusher extends HasThread implements FlushRequester { private long blockingWaitTime; private final Counter updatesBlockedMsHighWater = new Counter(); + private FlushHandler[] flushHandlers = null; + private int handlerCount; + /** * @param conf * @param server @@ -111,6 +116,7 @@ class MemStoreFlusher extends HasThread implements FlushRequester { conf.getInt("hbase.hstore.blockingStoreFiles", 7); this.blockingWaitTime = conf.getInt("hbase.hstore.blockingWaitTime", 90000); + this.handlerCount = conf.getInt("hbase.hstore.flusher.count", 1); LOG.info("globalMemStoreLimit=" + StringUtils.humanReadableInt(this.globalMemStoreLimit) + ", globalMemStoreLimitLowMark=" + @@ -213,64 +219,59 @@ class MemStoreFlusher extends HasThread implements FlushRequester { return true; } - @Override - public void run() { - while (!this.server.isStopped()) { - FlushQueueEntry fqe = null; - try { - wakeupPending.set(false); // allow someone to wake us up again - fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); - if (fqe == null || fqe instanceof WakeupFlushThread) { - if (isAboveLowWaterMark()) { - LOG.debug("Flush thread woke up because memory above low water=" + - StringUtils.humanReadableInt(this.globalMemStoreLimitLowMark)); - if (!flushOneForGlobalPressure()) { - // Wasn't able to flush any region, but we're above low water mark - // This is unlikely to happen, but might happen when closing the - // entire server - another thread is flushing 
regions. We'll just - // sleep a little bit to avoid spinning, and then pretend that - // we flushed one, so anyone blocked will check again - lock.lock(); - try { + private class FlushHandler extends HasThread { + @Override + public void run() { + while (!server.isStopped()) { + FlushQueueEntry fqe = null; + try { + wakeupPending.set(false); // allow someone to wake us up again + fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); + if (fqe == null || fqe instanceof WakeupFlushThread) { + if (isAboveLowWaterMark()) { + LOG.debug("Flush thread woke up because memory above low water=" + + StringUtils.humanReadableInt(globalMemStoreLimitLowMark)); + if (!flushOneForGlobalPressure()) { + // Wasn't able to flush any region, but we're above low water mark + // This is unlikely to happen, but might happen when closing the + // entire server - another thread is flushing regions. We'll just + // sleep a little bit to avoid spinning, and then pretend that + // we flushed one, so anyone blocked will check again Thread.sleep(1000); - flushOccurred.signalAll(); - } finally { - lock.unlock(); + wakeUpIfBlocking(); } + // Enqueue another one of these tokens so we'll wake up again + wakeupFlushThread(); } - // Enqueue another one of these tokens so we'll wake up again - wakeupFlushThread(); + continue; } + FlushRegionEntry fre = (FlushRegionEntry) fqe; + if (!flushRegion(fre)) { + break; + } + } catch (InterruptedException ex) { continue; - } - FlushRegionEntry fre = (FlushRegionEntry)fqe; - if (!flushRegion(fre)) { - break; - } - } catch (InterruptedException ex) { - continue; - } catch (ConcurrentModificationException ex) { - continue; - } catch (Exception ex) { - LOG.error("Cache flusher failed for entry " + fqe, ex); - if (!server.checkFileSystem()) { - break; + } catch (ConcurrentModificationException ex) { + continue; + } catch (Exception ex) { + LOG.error("Cache flusher failed for entry " + fqe, ex); + if (!server.checkFileSystem()) { + break; + } } } - } - 
this.regionsInQueue.clear(); - this.flushQueue.clear(); + synchronized (regionsInQueue) { + regionsInQueue.clear(); + flushQueue.clear(); + } - // Signal anyone waiting, so they see the close flag - lock.lock(); - try { - flushOccurred.signalAll(); - } finally { - lock.unlock(); + // Signal anyone waiting, so they see the close flag + wakeUpIfBlocking(); + LOG.info(getName() + " exiting"); } - LOG.info(getName() + " exiting"); } + private void wakeupFlushThread() { if (wakeupPending.compareAndSet(false, true)) { flushQueue.add(new WakeupFlushThread()); @@ -287,6 +288,10 @@ class MemStoreFlusher extends HasThread implements FlushRequester { continue; } + if (region.writestate.flushing || !region.writestate.writesEnabled) { + continue; + } + if (checkStoreFileCount && isTooManyStoreFiles(region)) { continue; } @@ -332,11 +337,41 @@ class MemStoreFlusher extends HasThread implements FlushRequester { * Only interrupt once it's done with a run through the work loop. */ void interruptIfNecessary() { - lock.lock(); + lock.writeLock().lock(); try { - this.interrupt(); + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null) flushHander.interrupt(); + } } finally { - lock.unlock(); + lock.writeLock().unlock(); + } + } + + synchronized void start(UncaughtExceptionHandler eh) { + ThreadFactory flusherThreadFactory = Threads.newDaemonThreadFactory( + server.getServerName().toString() + "-MemStoreFlusher", eh); + flushHandlers = new FlushHandler[handlerCount]; + for (int i = 0; i < flushHandlers.length; i++) { + flushHandlers[i] = new FlushHandler(); + flusherThreadFactory.newThread(flushHandlers[i]); + flushHandlers[i].start(); + } + } + + boolean isAlive() { + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null && flushHander.isAlive()) { + return true; + } + } + return false; + } + + void join() { + for (FlushHandler flushHander : flushHandlers) { + if (flushHander != null) { + Threads.shutdown(flushHander.getThread()); + } } } @@ 
-365,7 +400,8 @@ class MemStoreFlusher extends HasThread implements FlushRequester { "store files; delaying flush up to " + this.blockingWaitTime + "ms"); if (!this.server.compactSplitThread.requestSplit(region)) { try { - this.server.compactSplitThread.requestCompaction(region, getName()); + this.server.compactSplitThread.requestCompaction(region, Thread + .currentThread().getName()); } catch (IOException e) { LOG.error( "Cache flush failed for region " + Bytes.toStringBinary(region.getRegionName()), @@ -404,8 +440,8 @@ class MemStoreFlusher extends HasThread implements FlushRequester { // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); } - lock.lock(); } + lock.readLock().lock(); try { boolean shouldCompact = region.flushcache(); // We just want to check the size @@ -413,7 +449,7 @@ class MemStoreFlusher extends HasThread implements FlushRequester { if (shouldSplit) { this.server.compactSplitThread.requestSplit(region); } else if (shouldCompact) { - server.compactSplitThread.requestCompaction(region, getName()); + server.compactSplitThread.requestCompaction(region, Thread.currentThread().getName()); } } catch (DroppedSnapshotException ex) { @@ -432,15 +468,18 @@ class MemStoreFlusher extends HasThread implements FlushRequester { return false; } } finally { - try { - flushOccurred.signalAll(); - } finally { - lock.unlock(); - } + lock.readLock().unlock(); + wakeUpIfBlocking(); } return true; } + private void wakeUpIfBlocking() { + synchronized (blockSignal) { + blockSignal.notifyAll(); + } + } + private boolean isTooManyStoreFiles(HRegion region) { for (Store hstore : region.stores.values()) { if (hstore.getStorefilesCount() > this.blockingStoreFilesNumber) { @@ -458,12 +497,12 @@ class MemStoreFlusher extends HasThread implements FlushRequester { */ public void reclaimMemStoreMemory() { if (isAboveHighWaterMark()) { - lock.lock(); - try { + long start = System.currentTimeMillis(); + synchronized (this.blockSignal) { boolean 
blocked = false; long startTime = 0; while (isAboveHighWaterMark() && !server.isStopped()) { - if(!blocked){ + if (!blocked) { startTime = EnvironmentEdgeManager.currentTimeMillis(); LOG.info("Blocking updates on " + server.toString() + ": the global memstore size " + @@ -476,10 +515,12 @@ class MemStoreFlusher extends HasThread implements FlushRequester { try { // we should be able to wait forever, but we've seen a bug where // we miss a notify, so put a 5 second bound on it at least. - flushOccurred.await(5, TimeUnit.SECONDS); + blockSignal.wait(5 * 1000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } + long took = System.currentTimeMillis() - start; + LOG.warn("Memstore is above high water mark and block " + took + "ms"); } if(blocked){ final long totalTime = EnvironmentEdgeManager.currentTimeMillis() - startTime; @@ -488,8 +529,6 @@ class MemStoreFlusher extends HasThread implements FlushRequester { } LOG.info("Unblocking updates for server " + server.toString()); } - } finally { - lock.unlock(); } } else if (isAboveLowWaterMark()) { wakeupFlushThread(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java index 6671a592f04..887dafd5166 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java @@ -247,12 +247,16 @@ public class TestLogRolling { put.add(HConstants.CATALOG_FAMILY, null, value); table.put(put); } + Put tmpPut = new Put(Bytes.toBytes("tmprow")); + tmpPut.add(HConstants.CATALOG_FAMILY, null, value); long startTime = System.currentTimeMillis(); long remaining = timeout; while (remaining > 0) { if (log.isLowReplicationRollEnabled() == expect) { break; } else { + // Trigger calling FSHLog#checkLowReplication() + table.put(tmpPut); try { Thread.sleep(200); 
} catch (InterruptedException e) { @@ -371,7 +375,8 @@ public class TestLogRolling { assertTrue(dfsCluster.stopDataNode(pipeline[1].getName()) != null); batchWriteAndWait(table, 3, false, 10000); - assertTrue("LowReplication Roller should've been disabled", + assertTrue("LowReplication Roller should've been disabled, current replication=" + + ((FSHLog) log).getLogReplication(), !log.isLowReplicationRollEnabled()); dfsCluster