HBASE-1936 HLog group commit

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@830434 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jean-Daniel Cryans 2009-10-28 03:58:03 +00:00
parent 9b43b98449
commit 0d5da71571
4 changed files with 123 additions and 174 deletions

View File

@ -142,6 +142,7 @@ Release 0.21.0 - Unreleased
HBASE-1918 Don't do DNS resolving in .META. scanner for each row
HBASE-1756 Refactor HLog (changing package first)
HBASE-1926 Remove unused xmlenc jar from trunk
HBASE-1936 HLog group commit
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

View File

@ -91,8 +91,6 @@ import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.LogFlusher;
import org.apache.hadoop.hbase.regionserver.LogRoller;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.InfoServer;
@ -207,7 +205,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
// eclipse warning when accessed by inner classes
protected volatile HLog hlog;
LogRoller hlogRoller;
LogFlusher hlogFlusher;
// limit compactions while starting up
CompactionLimitThread compactionLimitThread;
@ -329,10 +326,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
// Log rolling thread
this.hlogRoller = new LogRoller(this);
// Log flushing thread
this.hlogFlusher =
new LogFlusher(this.threadWakeFrequency, this.stopRequested);
// Background thread to check for major compactions; needed if region
// has not gotten updates in a while. Make it run at a lesser frequency.
int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
@ -518,7 +511,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
try {
serverInfo.setStartCode(System.currentTimeMillis());
hlog = setupHLog();
this.hlogFlusher.setHLog(hlog);
} catch (IOException e) {
this.abortRequested = true;
this.stopRequested.set(true);
@ -616,7 +608,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
// Send interrupts to wake up threads if sleeping so they notice shutdown.
// TODO: Should we check they are alive? If OOME could have exited already
cacheFlusher.interruptIfNecessary();
hlogFlusher.interrupt();
compactSplitThread.interruptIfNecessary();
hlogRoller.interruptIfNecessary();
this.majorCompactionChecker.interrupt();
@ -746,7 +737,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.hlog = setupHLog();
this.hlogFlusher.setHLog(hlog);
// Init in here rather than in constructor after thread name has been set
this.metrics = new RegionServerMetrics();
startServiceThreads();
@ -1131,8 +1121,6 @@ public class HRegionServer implements HConstants, HRegionInterface,
};
Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
handler);
Threads.setDaemonThreadRunning(this.hlogFlusher, n + ".logFlusher",
handler);
Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
handler);
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",

View File

@ -1,62 +0,0 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
 * A {@link Chore} that wakes up on the configured period and asks the
 * currently registered HLog to do a forced sync of any unflushed entries.
 */
public class LogFlusher extends Chore {
  static final Log LOG = LogFactory.getLog(LogFlusher.class);

  // Holds the HLog to flush; it is registered after construction via
  // setHLog(), so it may still be null when the chore first fires.
  private final AtomicReference<HLog> log = new AtomicReference<HLog>(null);

  /**
   * @param period how often, in ms, the chore wakes up
   * @param stop flag that terminates the chore when set
   */
  public LogFlusher(final int period, final AtomicBoolean stop) {
    super(period, stop);
  }

  /** Registers the log that each wakeup should sync. */
  void setHLog(HLog log) {
    this.log.set(log);
  }

  @Override
  protected void chore() {
    final HLog wal = this.log.get();
    if (wal == null) {
      return; // nothing registered yet
    }
    try {
      wal.sync(true); // force a flush
    } catch (IOException e) {
      LOG.error("LogFlusher got exception while syncing: " + e);
    }
  }
}

View File

@ -24,7 +24,6 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
@ -34,14 +33,16 @@ import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -59,15 +60,12 @@ import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.wal.LogRollListener;
import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.SequenceFile.Metadata;
import org.apache.hadoop.io.SequenceFile.Reader;
@ -126,11 +124,10 @@ public class HLog implements HConstants, Syncable {
private final long blocksize;
private final int flushlogentries;
private final AtomicInteger unflushedEntries = new AtomicInteger(0);
private volatile long lastLogFlushTime;
private final boolean append;
private final Method syncfs;
private final short replicationLevel;
private final static Object [] NO_ARGS = new Object []{};
// used to indirectly tell syncFs to force the sync
private final AtomicBoolean forceSync = new AtomicBoolean(false);
/*
* Current log file.
@ -183,6 +180,11 @@ public class HLog implements HConstants, Syncable {
*/
private final int maxLogs;
/**
* Thread that handles group commit
*/
private final LogSyncer logSyncerThread;
static byte [] COMPLETE_CACHE_FLUSH;
static {
try {
@ -224,7 +226,6 @@ public class HLog implements HConstants, Syncable {
this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 10 * 1000);
this.lastLogFlushTime = System.currentTimeMillis();
if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir);
}
@ -237,21 +238,9 @@ public class HLog implements HConstants, Syncable {
", flushlogentries=" + this.flushlogentries +
", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
rollWriter();
// Test if syncfs is available.
this.append = isAppend(conf);
Method m = null;
if (this.append) {
try {
m = this.writer.getClass().getMethod("syncFs", new Class<?> []{});
LOG.debug("Using syncFs--hadoop-4379");
} catch (SecurityException e) {
throw new IOException("Failed test for syncfs", e);
} catch (NoSuchMethodException e) {
// This can happen
LOG.info("syncFs--hadoop-4379 not available" );
}
}
this.syncfs = m;
logSyncerThread = new LogSyncer(this.flushlogentries);
Threads.setDaemonThreadRunning(logSyncerThread,
Thread.currentThread().getName() + ".logSyncer");
}
/**
@ -591,6 +580,14 @@ public class HLog implements HConstants, Syncable {
* @throws IOException
*/
public void close() throws IOException {
try {
logSyncerThread.interrupt();
// Make sure we synced everything
logSyncerThread.join();
} catch (InterruptedException e) {
LOG.error("Exception while waiting for syncer thread to die", e);
}
cacheFlushLock.lock();
try {
synchronized (updateLock) {
@ -718,39 +715,118 @@ public class HLog implements HConstants, Syncable {
}
}
/**
 * This thread is responsible to call syncFs and buffer up the writers while
 * it happens.
 *
 * Group commit: many writer threads block in addToSyncQueue() while this
 * single thread performs one hflush() on their behalf, amortizing the cost
 * of the filesystem sync across all of them.
 *
 * NOTE(review): the constructor parameter is documented/used here as a wait
 * interval in milliseconds, but the construction site visible in this change
 * passes this.flushlogentries (an entry count) — looks wrong; verify.
 */
class LogSyncer extends Thread {

  // Using fairness to make sure locks are given in order
  private final ReentrantLock lock = new ReentrantLock(true);

  // Condition used to wait until we have something to sync
  private final Condition queueEmpty = lock.newCondition();

  // Condition used to signal that the sync is done
  private final Condition syncDone = lock.newCondition();

  // Max time to park in await() before syncing anyway (catches appends
  // that were never followed by an explicit sync request).
  private final int optionalFlushInterval;

  LogSyncer(int optionalFlushInterval) {
    this.optionalFlushInterval = optionalFlushInterval;
  }

  public void run() {
    try {
      // The lock is held for the entire life of this thread except while
      // parked inside await() below; that is what serializes this thread
      // against the writers in addToSyncQueue().
      lock.lock();
      while(!closed) {
        // Wait until something has to be synced or do it if we waited enough
        // time (useful if something appends but does not sync).
        queueEmpty.await(this.optionalFlushInterval, TimeUnit.MILLISECONDS);

        // We got the signal, let's syncFS. We currently own the lock so new
        // writes are waiting to acquire it in addToSyncQueue while the ones
        // we sync are waiting on await()
        hflush();

        // Release all the clients waiting on the sync. Notice that we still
        // own the lock until we get back to await at which point all the
        // other threads waiting will first acquire and release locks
        syncDone.signalAll();
      }
    } catch (IOException e) {
      // A failed sync is grounds for asking for a fresh log file.
      LOG.error("Error while syncing, requesting close of hlog ", e);
      requestLogRoll();
    } catch (InterruptedException e) {
      // Interrupt is how close() asks this thread to exit.
      // NOTE(review): the interrupt status is not restored here — confirm
      // that is intentional given the thread terminates right after.
      LOG.debug(getName() + "interrupted while waiting for sync requests",e );
    } finally {
      lock.unlock();
      LOG.info(getName() + " exiting");
    }
  }

  /**
   * This method first signals the thread that there's a sync needed
   * and then waits for it to happen before returning.
   *
   * Called by writer threads; blocks until the syncer thread has run
   * hflush() and signalled syncDone.
   */
  public void addToSyncQueue() {

    // Don't bother if somehow our append was already synced
    if (unflushedEntries.get() == 0) {
      return;
    }
    lock.lock();
    try {
      // Wake the thread
      queueEmpty.signal();

      // Wait for it to syncFs
      syncDone.await();
    } catch (InterruptedException e) {
      // Note: getName() is Thread#getName() on this LogSyncer instance, so
      // the message carries the syncer thread's name even though the thread
      // that was interrupted here is the calling writer thread.
      LOG.debug(getName() + " was interrupted while waiting for sync", e);
    } finally {
      lock.unlock();
    }
  }
}
public void sync() throws IOException {
sync(false);
}
/**
 * This method calls the LogSyncer in order to group commit the sync
 * with other threads.
 * @param force For catalog regions, force the sync to happen
 * @throws IOException
 */
public void sync(boolean force) throws IOException {
  // Raise the force flag when asked; hflush() clears it again once the
  // sync has actually happened. The previous expression,
  //   forceSync.compareAndSet(!forceSync.get() && force, true),
  // was inverted: with forceSync == false it set the flag when force was
  // false and failed to set it when force was true.
  if (force) {
    this.forceSync.set(true);
  }
  logSyncerThread.addToSyncQueue();
}
/**
* Multiple threads will call sync() at the same time, only the winner
* will actually flush if there is any race or build up.
*
* @param force sync regardless (for meta updates) if there is data
* @throws IOException
*/
public void sync(boolean force) throws IOException {
protected void hflush() throws IOException {
synchronized (this.updateLock) {
if (this.closed)
return;
if (this.unflushedEntries.get() == 0)
return; // win
if (force || this.unflushedEntries.get() > this.flushlogentries) {
if (this.forceSync.get() ||
this.unflushedEntries.get() > this.flushlogentries) {
try {
lastLogFlushTime = System.currentTimeMillis();
if (this.append && syncfs != null) {
try {
this.syncfs.invoke(this.writer, NO_ARGS);
} catch (Exception e) {
throw new IOException("Reflection", e);
}
} else {
this.writer.sync();
if (this.writer_out != null)
this.writer_out.sync();
this.writer.sync();
if (this.writer_out != null) {
this.writer_out.sync();
}
this.forceSync.compareAndSet(true, false);
this.unflushedEntries.set(0);
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of hlog", e);
@ -986,7 +1062,6 @@ public class HLog implements HConstants, Syncable {
int concurrentLogReads =
conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
// Is append supported?
boolean append = isAppend(conf);
try {
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
concurrentLogReads)).intValue();
@ -1005,7 +1080,6 @@ public class HLog implements HConstants, Syncable {
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
}
recoverLog(fs, logfiles[i].getPath(), append);
SequenceFile.Reader in = null;
int count = 0;
try {
@ -1159,24 +1233,6 @@ public class HLog implements HConstants, Syncable {
return splits;
}
/**
 * Checks whether append is both enabled in the configuration and usable,
 * i.e. the running Hadoop actually exposes SequenceFile.Writer#syncFs
 * (HADOOP-4379).
 * @param conf configuration to read "dfs.support.append" from
 * @return True if append enabled and we have the syncFs in our path.
 */
private static boolean isAppend(final HBaseConfiguration conf) {
  boolean append = conf.getBoolean("dfs.support.append", false);
  if (append) {
    try {
      // Probe only; the method is invoked reflectively elsewhere.
      SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
    } catch (SecurityException e) {
      // Reflection denied: we can neither verify nor later invoke syncFs,
      // so report append as unusable rather than silently assuming it
      // works (previously this case fell through with append still true).
      append = false;
    } catch (NoSuchMethodException e) {
      // Running on a Hadoop without HADOOP-4379.
      append = false;
    }
  }
  return append;
}
/**
* Utility class that lets us keep track of the edit with it's key
* Only used when splitting logs
@ -1224,40 +1280,6 @@ public class HLog implements HConstants, Syncable {
return getHLogDirectoryName(HServerInfo.getServerName(info));
}
/*
 * Recover log.
 * If append has been set, try and open log in append mode.
 * Doing this, we get a hold of the file that crashed writer
 * was writing to. Once we have it, close it. This will
 * allow subsequent reader to see up to last sync.
 * @param fs
 * @param p
 * @param append
 */
private static void recoverLog(final FileSystem fs, final Path p,
    final boolean append) {
  // Nothing to recover unless the cluster supports append.
  if (!append) {
    return;
  }
  // Trying recovery: retry the append-mode open until it succeeds, which
  // means lease recovery for the crashed writer's file has completed.
  for (;;) {
    try {
      FSDataOutputStream out = fs.append(p);
      out.close();
      break; // recovered
    } catch (IOException e) {
      LOG.info("Failed open for append, waiting on lease recovery: " + p, e);
      try {
        Thread.sleep(1000);
      } catch (InterruptedException ex) {
        // ignore it and try again
      }
    }
  }
  LOG.info("Past out lease recovery");
}
/**
* Construct the HLog directory name
*