From 0d5da71571d9438ba2e09e7d9c619fe7c5a1b2fe Mon Sep 17 00:00:00 2001
From: Jean-Daniel Cryans
Date: Wed, 28 Oct 2009 03:58:03 +0000
Subject: [PATCH] HBASE-1936 HLog group commit

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@830434 13f79535-47bb-0310-9956-ffa450edef68
---
 CHANGES.txt                                        |   1 +
 .../hbase/regionserver/HRegionServer.java          |  12 -
 .../hadoop/hbase/regionserver/LogFlusher.java      |  62 -----
 .../hadoop/hbase/regionserver/wal/HLog.java        | 222 ++++++++++--------
 4 files changed, 123 insertions(+), 174 deletions(-)
 delete mode 100644 src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java

diff --git a/CHANGES.txt b/CHANGES.txt
index 2fa76165ac9..528cde63618 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -142,6 +142,7 @@ Release 0.21.0 - Unreleased
    HBASE-1918  Don't do DNS resolving in .META. scanner for each row
    HBASE-1756  Refactor HLog (changing package first)
    HBASE-1926  Remove unused xmlenc jar from trunk
+   HBASE-1936  HLog group commit
 
 OPTIMIZATIONS
    HBASE-410   [testing] Speed up the test suite
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 87e5b6ebe9c..be2767c481b 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -91,8 +91,6 @@ import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.LogFlusher;
-import org.apache.hadoop.hbase.regionserver.LogRoller;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.InfoServer;
@@ -207,7 +205,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
   // eclipse warning when accessed by inner classes
   protected volatile HLog hlog;
   LogRoller hlogRoller;
-  LogFlusher hlogFlusher;
 
   // limit compactions while starting up
   CompactionLimitThread compactionLimitThread;
@@ -329,10 +326,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
     // Log rolling thread
     this.hlogRoller = new LogRoller(this);
 
-    // Log flushing thread
-    this.hlogFlusher =
-      new LogFlusher(this.threadWakeFrequency, this.stopRequested);
-
     // Background thread to check for major compactions; needed if region
     // has not gotten updates in a while. Make it run at a lesser frequency.
     int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
@@ -518,7 +511,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
     try {
       serverInfo.setStartCode(System.currentTimeMillis());
       hlog = setupHLog();
-      this.hlogFlusher.setHLog(hlog);
     } catch (IOException e) {
       this.abortRequested = true;
       this.stopRequested.set(true);
@@ -616,7 +608,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
     // TODO: Should we check they are alive?  If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
-    hlogFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
     hlogRoller.interruptIfNecessary();
     this.majorCompactionChecker.interrupt();
@@ -746,7 +737,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
     this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
     this.hlog = setupHLog();
-    this.hlogFlusher.setHLog(hlog);
     // Init in here rather than in constructor after thread name has been set
     this.metrics = new RegionServerMetrics();
     startServiceThreads();
@@ -1131,8 +1121,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
     };
     Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
         handler);
-    Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
-        handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
         handler);
     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java b/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
deleted file mode 100644
index 7989b990417..00000000000
--- a/src/java/org/apache/hadoop/hbase/regionserver/LogFlusher.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Copyright 2008 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver;
-
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-
-/**
- * LogFlusher is a Chore that wakes every threadWakeInterval and calls
- * the HLog to do an optional sync if there are unflushed entries, and the
- * optionalFlushInterval has passed since the last flush.
- */
-public class LogFlusher extends Chore {
-  static final Log LOG = LogFactory.getLog(LogFlusher.class);
-
-  private final AtomicReference<HLog> log =
-    new AtomicReference<HLog>(null);
-
-  public LogFlusher(final int period, final AtomicBoolean stop) {
-    super(period, stop);
-  }
-
-  void setHLog(HLog log) {
-    this.log.set(log);
-  }
-
-  @Override
-  protected void chore() {
-    HLog hlog = log.get();
-    if (hlog != null) {
-      try {
-        hlog.sync(true); // force a flush
-      } catch (IOException e) {
-        LOG.error("LogFlusher got exception while syncing: " + e);
-      }
-    }
-  }
-}
\ No newline at end of file
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 6f833eee124..df25e78d3f4 100644
--- a/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -24,7 +24,6 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.lang.reflect.Field;
-import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.LinkedList;
@@ -34,14 +33,16 @@ import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -59,15 +60,12 @@ import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
-import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
-import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.SequenceFile.Metadata;
 import org.apache.hadoop.io.SequenceFile.Reader;
@@ -126,11 +124,10 @@ public class HLog implements HConstants, Syncable {
   private final long blocksize;
   private final int flushlogentries;
   private final AtomicInteger unflushedEntries = new AtomicInteger(0);
-  private volatile long lastLogFlushTime;
-  private final boolean append;
-  private final Method syncfs;
   private final short replicationLevel;
-  private final static Object [] NO_ARGS = new Object []{};
+
+  // used to indirectly tell syncFs to force the sync
+  private final AtomicBoolean forceSync = new AtomicBoolean(false);
 
   /*
   * Current log file.
@@ -183,6 +180,11 @@ public class HLog implements HConstants, Syncable {
    */
   private final int maxLogs;
 
+  /**
+   * Thread that handles group commit
+   */
+  private final LogSyncer logSyncerThread;
+
   static byte [] COMPLETE_CACHE_FLUSH;
   static {
     try {
@@ -224,7 +226,6 @@ public class HLog implements HConstants, Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
-    this.lastLogFlushTime = System.currentTimeMillis();
     if (fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -237,21 +238,9 @@ public class HLog implements HConstants, Syncable {
       ", flushlogentries=" + this.flushlogentries +
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
-    // Test if syncfs is available.
-    this.append = isAppend(conf);
-    Method m = null;
-    if (this.append) {
-      try {
-        m = this.writer.getClass().getMethod("syncFs", new Class []{});
-        LOG.debug("Using syncFs--hadoop-4379");
-      } catch (SecurityException e) {
-        throw new IOException("Failed test for syncfs", e);
-      } catch (NoSuchMethodException e) {
-        // This can happen
-        LOG.info("syncFs--hadoop-4379 not available" );
-      }
-    }
-    this.syncfs = m;
+    logSyncerThread = new LogSyncer(this.flushlogentries);
+    Threads.setDaemonThreadRunning(logSyncerThread,
+        Thread.currentThread().getName() + ".logSyncer");
   }
 
   /**
@@ -591,6 +580,14 @@
    * @throws IOException
    */
  public void close() throws IOException {
+    try {
+      logSyncerThread.interrupt();
+      // Make sure we synced everything
+      logSyncerThread.join();
+    } catch (InterruptedException e) {
+      LOG.error("Exception while waiting for syncer thread to die", e);
+    }
+
     cacheFlushLock.lock();
     try {
       synchronized (updateLock) {
@@ -718,39 +715,118 @@
     }
   }
 
+  /**
+   * This thread is responsible to call syncFs and buffer up the writers while
+   * it happens.
+   */
+  class LogSyncer extends Thread {
+
+    // Using fairness to make sure locks are given in order
+    private final ReentrantLock lock = new ReentrantLock(true);
+
+    // Condition used to wait until we have something to sync
+    private final Condition queueEmpty = lock.newCondition();
+
+    // Condition used to signal that the sync is done
+    private final Condition syncDone = lock.newCondition();
+
+    private final int optionalFlushInterval;
+
+    LogSyncer(int optionalFlushInterval) {
+      this.optionalFlushInterval = optionalFlushInterval;
+    }
+
+    public void run() {
+      try {
+        lock.lock();
+        while(!closed) {
+
+          // Wait until something has to be synced or do it if we waited enough
+          // time (useful if something appends but does not sync).
+          queueEmpty.await(this.optionalFlushInterval, TimeUnit.MILLISECONDS);
+
+          // We got the signal, let's syncFS. We currently own the lock so new
+          // writes are waiting to acquire it in addToSyncQueue while the ones
+          // we sync are waiting on await()
+          hflush();
+
+          // Release all the clients waiting on the sync. Notice that we still
+          // own the lock until we get back to await at which point all the
+          // other threads waiting will first acquire and release locks
+          syncDone.signalAll();
+        }
+      } catch (IOException e) {
+        LOG.error("Error while syncing, requesting close of hlog ", e);
+        requestLogRoll();
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + "interrupted while waiting for sync requests",e );
+      } finally {
+        lock.unlock();
+        LOG.info(getName() + " exiting");
+      }
+    }
+
+    /**
+     * This method first signals the thread that there's a sync needed
+     * and then waits for it to happen before returning.
+     */
+    public void addToSyncQueue() {
+
+      // Don't bother if somehow our append was already synced
+      if (unflushedEntries.get() == 0) {
+        return;
+      }
+      lock.lock();
+      try {
+        // Wake the thread
+        queueEmpty.signal();
+
+        // Wait for it to syncFs
+        syncDone.await();
+      } catch (InterruptedException e) {
+        LOG.debug(getName() + " was interrupted while waiting for sync", e);
+      }
+      finally {
+        lock.unlock();
+      }
+    }
+  }
+
   public void sync() throws IOException {
     sync(false);
   }
 
+  /**
+   * This method calls the LogSyncer in order to group commit the sync
+   * with other threads.
+   * @param force For catalog regions, force the sync to happen
+   * @throws IOException
+   */
+  public void sync(boolean force) throws IOException {
+    // Set force sync if force is true and forceSync is false
+    forceSync.compareAndSet(!forceSync.get() && force, true);
+    logSyncerThread.addToSyncQueue();
+  }
+
   /**
    * Multiple threads will call sync() at the same time, only the winner
    * will actually flush if there is any race or build up.
    *
-   * @param force sync regardless (for meta updates) if there is data
    * @throws IOException
    */
-  public void sync(boolean force) throws IOException {
+  protected void hflush() throws IOException {
     synchronized (this.updateLock) {
       if (this.closed)
         return;
-      if (this.unflushedEntries.get() == 0)
-        return; // win
-
-      if (force || this.unflushedEntries.get() > this.flushlogentries) {
+      if (this.forceSync.get() ||
+          this.unflushedEntries.get() > this.flushlogentries) {
         try {
-          lastLogFlushTime = System.currentTimeMillis();
-          if (this.append && syncfs != null) {
-            try {
-              this.syncfs.invoke(this.writer, NO_ARGS);
-            } catch (Exception e) {
-              throw new IOException("Reflection", e);
-            }
-          } else {
-            this.writer.sync();
-            if (this.writer_out != null)
-              this.writer_out.sync();
+          this.writer.sync();
+          if (this.writer_out != null) {
+            this.writer_out.sync();
           }
+          this.forceSync.compareAndSet(true, false);
           this.unflushedEntries.set(0);
         } catch (IOException e) {
           LOG.fatal("Could not append. Requesting close of hlog", e);
@@ -986,7 +1062,6 @@
     int concurrentLogReads =
       conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
     // Is append supported?
-    boolean append = isAppend(conf);
     try {
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
         concurrentLogReads)).intValue();
@@ -1005,7 +1080,6 @@
         LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
           ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
       }
-      recoverLog(fs, logfiles[i].getPath(), append);
       SequenceFile.Reader in = null;
       int count = 0;
       try {
@@ -1159,24 +1233,6 @@
     return splits;
   }
-  /**
-   * @param conf
-   * @return True if append enabled and we have the syncFs in our path.
-   */
-  private static boolean isAppend(final HBaseConfiguration conf) {
-    boolean append = conf.getBoolean("dfs.support.append", false);
-    if (append) {
-      try {
-        SequenceFile.Writer.class.getMethod("syncFs", new Class []{});
-        append = true;
-      } catch (SecurityException e) {
-      } catch (NoSuchMethodException e) {
-        append = false;
-      }
-    }
-    return append;
-  }
-
   /**
    * Utility class that lets us keep track of the edit with it's key
   * Only used when splitting logs
   */
@@ -1224,40 +1280,6 @@
     return getHLogDirectoryName(HServerInfo.getServerName(info));
   }
 
-  /*
-   * Recover log.
-   * If append has been set, try and open log in append mode.
-   * Doing this, we get a hold of the file that crashed writer
-   * was writing to. Once we have it, close it. This will
-   * allow subsequent reader to see up to last sync.
-   * @param fs
-   * @param p
-   * @param append
-   */
-  private static void recoverLog(final FileSystem fs, final Path p,
-      final boolean append) {
-    if (!append) {
-      return;
-    }
-    // Trying recovery
-    boolean recovered = false;
-    while (!recovered) {
-      try {
-        FSDataOutputStream out = fs.append(p);
-        out.close();
-        recovered = true;
-      } catch (IOException e) {
-        LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
-        try {
-          Thread.sleep(1000);
-        } catch (InterruptedException ex) {
-          // ignore it and try again
-        }
-      }
-    }
-    LOG.info("Past out lease recovery");
-  }
-
   /**
    * Construct the HLog directory name
   *
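
For readers new to the pattern this patch introduces: instead of every write probing for SequenceFile.Writer#syncFs via reflection (plus a separate LogFlusher chore forcing periodic syncs), writers now hand their sync request to a single LogSyncer thread, which performs one syncFs on behalf of every writer queued behind it (group commit). The following self-contained Java sketch shows that hand-off in isolation; it is illustrative only, and the names GroupCommitSketch, flushToDisk and runSyncer are placeholders rather than APIs from this patch or from Hadoop/HBase.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Minimal group-commit sketch: many writer threads share one flush. */
public class GroupCommitSketch {

  // Fair lock so waiting writers are released in arrival order,
  // mirroring new ReentrantLock(true) in LogSyncer.
  private final ReentrantLock lock = new ReentrantLock(true);
  private final Condition queueEmpty = lock.newCondition(); // "work to do"
  private final Condition syncDone = lock.newCondition();   // "flush finished"
  private volatile boolean closed = false;

  // Placeholder for the expensive fsync-like call (syncFs in the patch).
  private void flushToDisk() {
  }

  /** Run by a single background thread; the only caller of flushToDisk(). */
  public void runSyncer(long timeoutMs) {
    lock.lock();
    try {
      while (!closed) {
        // Wake up when a writer signals, or after timeoutMs so an idle
        // log still gets flushed (cf. optionallogflushinterval).
        queueEmpty.await(timeoutMs, TimeUnit.MILLISECONDS);
        flushToDisk();          // one flush covers every queued writer
        syncDone.signalAll();   // release all writers blocked in sync()
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // shutdown path
    } finally {
      lock.unlock();
    }
  }

  /** Called by writer threads after appending an edit. */
  public void sync() throws InterruptedException {
    lock.lock();
    try {
      queueEmpty.signal(); // ask the syncer thread to flush
      syncDone.await();    // block until that flush has happened
    } finally {
      lock.unlock();
    }
  }
}

Typical wiring, under the same assumptions: one daemon thread runs runSyncer(1000) for its whole lifetime while request-handler threads call sync() after each append. Because the syncer holds the fair lock except while parked in await, a burst of concurrent writers collapses into a single flushToDisk() call, which is the point of the group commit.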