HADOOP-1820 regionserver creates hlogs without bound

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@580811 13f79535-47bb-0310-9956-ffa450edef68
Jim Kellerman, 2007-09-30 23:46:04 +00:00
parent 8a3bc9c23e, commit 188f9e6f23
4 changed files with 489 additions and 251 deletions

CHANGES.txt

@@ -45,7 +45,7 @@ Trunk (unreleased changes)
    HADOOP-1813 OOME makes zombie of region server
    HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
    HADOOP-1820 Regionserver creates hlogs without bound
                (reverted 2007/09/25) (Fixed 2007/09/30)
    HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
    HADOOP-1832 listTables() returns duplicate tables
    HADOOP-1834 Scanners ignore timestamp passed on creation

HLog.java

@@ -19,91 +19,124 @@
 */
package org.apache.hadoop.hbase;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.SequenceFile.Reader;
/**
 * HLog stores all the edits to the HStore.
 *
 * It performs logfile-rolling, so external callers are not aware that the
 * underlying file is being rolled.
 *
 * <p>
 * A single HLog is used by several HRegions simultaneously.
 *
 * <p>
 * Each HRegion is identified by a unique long <code>int</code>. HRegions do
 * not need to declare themselves before using the HLog; they simply include
 * their HRegion-id in the <code>append</code> or
 * <code>completeCacheFlush</code> calls.
 *
 * <p>
 * An HLog consists of multiple on-disk files, which have a chronological order.
 * As data is flushed to other (better) on-disk structures, the log becomes
 * obsolete. We can destroy all the log messages for a given HRegion-id up to
 * the most-recent CACHEFLUSH message from that HRegion.
 *
 * <p>
 * It's only practical to delete entire files. Thus, we delete an entire on-disk
 * file F when all of the messages in F have a log-sequence-id that's older
 * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has
 * a message in F.
 *
 * <p>
 * Synchronized methods can never execute in parallel. However, between the
 * start of a cache flush and the completion point, appends are allowed but log
 * rolling is not. To prevent log rolling taking place during this period, a
 * separate reentrant lock is used.
 *
 * <p>
 * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in
 * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The
 * 'atomic' write to the log is meant to serve as insurance against abnormal
 * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's
 * last wholesome state. But files in HDFS do not 'exist' until they are cleanly
 * closed -- something that will not happen if RegionServer exits without
 * running its 'close'.
 */
public class HLog implements HConstants {
  private static final Log LOG = LogFactory.getLog(HLog.class);

  static final String HLOG_DATFILE = "hlog.dat.";
  static final Text METACOLUMN = new Text("METACOLUMN:");
  static final Text METAROW = new Text("METAROW");

  FileSystem fs;
  Path dir;
  Configuration conf;
  final long threadWakeFrequency;

  SequenceFile.Writer writer;
  TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
  HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
  volatile boolean closed = false;
  private final Integer sequenceLock = new Integer(0);
  volatile long logSeqNum = 0;
  volatile long filenum = 0;
  volatile int numEntries = 0;

  // This lock prevents starting a log roll during a cache flush.
  // synchronized is insufficient because a cache flush spans two method calls.
  private final Lock cacheFlushLock = new ReentrantLock();
  /**
   * Split up a bunch of log files, that are no longer being written to, into
   * new files, one per region. Delete the old log files when finished.
   *
   * @param rootDir Root directory of the HBase instance
   * @param srcDir Directory of log files to split: e.g.
   * <code>${ROOTDIR}/log_HOST_PORT</code>
   * @param fs FileSystem
   * @param conf HBaseConfiguration
   * @throws IOException
   */
  static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
      Configuration conf) throws IOException {
    Path logfiles[] = fs.listPaths(new Path[] { srcDir });
    LOG.info("splitting " + logfiles.length + " log(s) in " +
      srcDir.toString());
    HashMap<Text, SequenceFile.Writer> logWriters =
      new HashMap<Text, SequenceFile.Writer>();
    try {
      for (int i = 0; i < logfiles.length; i++) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("Splitting " + logfiles[i]);
        }

@@ -118,7 +151,7 @@ public class HLog implements HConstants {
        try {
          HLogKey key = new HLogKey();
          HLogEdit val = new HLogEdit();
          while (in.next(key, val)) {
            Text regionName = key.getRegionName();
            SequenceFile.Writer w = logWriters.get(regionName);
            if (w == null) {

@@ -141,15 +174,15 @@ public class HLog implements HConstants {
          }
        }
      } finally {
        for (SequenceFile.Writer w : logWriters.values()) {
          w.close();
        }
      }

      if (fs.exists(srcDir)) {
        if (!fs.delete(srcDir)) {
          LOG.error("Cannot delete: " + srcDir);
          if (!FileUtil.fullyDelete(new File(srcDir.toString()))) {
            throw new IOException("Cannot delete: " + srcDir);
          }
        }
@@ -160,10 +193,10 @@ public class HLog implements HConstants {
  /**
   * Create an edit log at the given <code>dir</code> location.
   *
   * You should never have to load an existing log. If there is a log at
   * startup, it should have already been processed and deleted by the time the
   * HLog object is started up.
   *
   * @param fs
   * @param dir
   * @param conf

@@ -173,6 +206,7 @@ public class HLog implements HConstants {
    this.fs = fs;
    this.dir = dir;
    this.conf = conf;
    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);

    if (fs.exists(dir)) {
      throw new IOException("Target HLog directory already exists: " + dir);

@@ -180,115 +214,117 @@ public class HLog implements HConstants {
    fs.mkdirs(dir);
    rollWriter();
  }
  /**
   * Called by HRegionServer when it opens a new region to ensure that log
   * sequence numbers are always greater than the latest sequence number of the
   * region being brought on-line.
   *
   * @param newvalue
   */
  void setSequenceNumber(long newvalue) {
    synchronized (sequenceLock) {
      if (newvalue > logSeqNum) {
        if (LOG.isDebugEnabled()) {
          LOG.debug("changing sequence number from " + logSeqNum + " to " +
            newvalue);
        }
        logSeqNum = newvalue;
      }
    }
  }
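A plausible call site, for illustration only (the variable name and the +1
policy here are hypothetical; setSequenceNumber itself only ever moves the
counter forward, so a stale caller is harmless):

    // Hypothetical caller: after opening a region, bump the shared log past
    // the highest sequence number recovered from the region's store files so
    // that new edits always sort after recovered ones.
    long maxSeqIdFromStores = 123456L;   // assumed recovered value
    log.setSequenceNumber(maxSeqIdFromStores + 1);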
  /**
   * Roll the log writer. That is, start writing log messages to a new file.
   *
   * Because a log cannot be rolled during a cache flush, and a cache flush
   * spans two method calls, a special lock needs to be obtained so that a cache
   * flush cannot start when the log is being rolled and the log cannot be
   * rolled during a cache flush.
   *
   * Note that this method cannot simply block on the cacheFlushLock: it is
   * possible that startCacheFlush runs, obtaining the cacheFlushLock, then this
   * method starts, obtaining the lock on this but blocking on the
   * cacheFlushLock; completeCacheFlush could then be called, would wait for the
   * lock on this, and consequently never release the cacheFlushLock. Instead,
   * the cacheFlushLock is polled with tryLock, and wait() releases the monitor
   * on this between attempts.
   *
   * @throws IOException
   */
  synchronized void rollWriter() throws IOException {
    boolean locked = false;
    while (!locked && !closed) {
      if (cacheFlushLock.tryLock()) {
        locked = true;
        break;
      }
      try {
        this.wait(threadWakeFrequency);
      } catch (InterruptedException e) {
        // continue polling for the lock
      }
    }
    if (closed) {
      if (locked) {
        cacheFlushLock.unlock();
      }
      throw new IOException("Cannot roll log; log is closed");
    }

    // If we get here we have locked out both cache flushes and appends

    try {
      if (writer != null) {
        // Close the current writer, get a new one.
        writer.close();
        Path p = computeFilename(filenum - 1);
        if (LOG.isDebugEnabled()) {
          LOG.debug("Closing current log writer " + p.toString() +
            " to get a new one");
        }
        if (filenum > 0) {
          synchronized (sequenceLock) {
            outputfiles.put(logSeqNum - 1, p);
          }
        }
      }
      Path newPath = computeFilename(filenum++);
      this.writer = SequenceFile.createWriter(fs, conf, newPath,
        HLogKey.class, HLogEdit.class);

      LOG.info("new log writer created at " + newPath);

      // Can we delete any of the old log files?

      TreeSet<Long> sequenceNumbers =
        new TreeSet<Long>(lastSeqWritten.values());

      if (sequenceNumbers.size() > 0) {
        long oldestOutstandingSeqNum = sequenceNumbers.first();

        // Get the set of all log files whose final ID is older than the oldest
        // pending region operation

        sequenceNumbers.clear();
        sequenceNumbers.addAll(outputfiles.headMap(
            oldestOutstandingSeqNum).keySet());

        // Now remove old log files (if any)

        for (Long seq : sequenceNumbers) {
          Path p = outputfiles.remove(seq);
          LOG.info("removing old log file " + p.toString());
          fs.delete(p);
        }
      }
      this.numEntries = 0;

    } finally {
      cacheFlushLock.unlock();
    }
  }
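The deletion criterion above is the heart of the HADOOP-1820 fix: a closed log
file is deletable once its highest sequence number is below the oldest sequence
number any region might still need. A self-contained sketch with made-up
sequence numbers (plain strings stand in for Path objects) shows the same
headMap logic in isolation:

    import java.util.HashMap;
    import java.util.TreeMap;
    import java.util.TreeSet;

    /** Self-contained illustration of the log-expiry rule, made-up numbers. */
    public class LogExpirySketch {
      public static void main(String[] args) {
        // Last sequence number written per region with unflushed edits.
        HashMap<String, Long> lastSeqWritten = new HashMap<String, Long>();
        lastSeqWritten.put("regionA", 350L);
        lastSeqWritten.put("regionB", 520L);

        // Highest sequence number contained in each closed log file.
        TreeMap<Long, String> outputfiles = new TreeMap<Long, String>();
        outputfiles.put(199L, "hlog.dat.000");
        outputfiles.put(299L, "hlog.dat.001");
        outputfiles.put(399L, "hlog.dat.002");

        // Oldest edit any region might still need during recovery.
        long oldestOutstanding =
          new TreeSet<Long>(lastSeqWritten.values()).first();

        // Every file whose final id is strictly older can be deleted.
        for (Long seq : new TreeSet<Long>(
            outputfiles.headMap(oldestOutstanding).keySet())) {
          System.out.println("deletable: " + outputfiles.remove(seq));
        }
        // With the numbers above, oldestOutstanding == 350, so hlog.dat.000
        // and hlog.dat.001 are deletable; hlog.dat.002 (final id 399) is kept.
      }
    }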
  /**
   * This is a convenience method that computes a new filename with a given
   * file-number.
   */
  Path computeFilename(final long fn) {
    return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn));
@@ -296,19 +332,21 @@ public class HLog implements HConstants {
  /**
   * Shut down the log and delete the log directory
   *
   * @throws IOException
   */
  synchronized void closeAndDelete() throws IOException {
    close();
    fs.delete(dir);
  }

  /**
   * Shut down the log.
   *
   * @throws IOException
   */
  synchronized void close() throws IOException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("closing log writer in " + this.dir.toString());
    }
    this.writer.close();
@@ -319,16 +357,19 @@ public class HLog implements HConstants {
   * Append a set of edits to the log. Log edits are keyed by regionName,
   * rowname, and log-sequence-id.
   *
   * Later, if we sort by these keys, we obtain all the relevant edits for a
   * given key-range of the HRegion (TODO). Any edits that do not have a
   * matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded.
   *
   * <p>
   * Logs cannot be restarted once closed, or once the HLog process dies. Each
   * time the HLog starts, it must create a new log. This means that other
   * systems should process the log appropriately upon each startup (and prior
   * to initializing HLog).
   *
   * synchronized prevents appends during the completion of a cache flush or for
   * the duration of a log roll.
   *
   * @param regionName
   * @param tableName
   * @param row

@@ -337,136 +378,121 @@ public class HLog implements HConstants {
   * @throws IOException
   */
  synchronized void append(Text regionName, Text tableName, Text row,
      TreeMap<Text, byte[]> columns, long timestamp) throws IOException {
    if (closed) {
      throw new IOException("Cannot append; log is closed");
    }

    long seqNum[] = obtainSeqNum(columns.size());

    // The 'lastSeqWritten' map holds the sequence number of the most recent
    // write for each region. When the cache is flushed, the entry for the
    // region being flushed is removed if the sequence number of the flush
    // is greater than or equal to the value in lastSeqWritten

    lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]);

    int counter = 0;
    for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
      HLogKey logKey =
        new HLogKey(regionName, tableName, row, seqNum[counter++]);
      HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
      writer.append(logKey, logEdit);
      numEntries++;
    }
  }

  /** @return How many items have been added to the log */
  int getNumEntries() {
    return numEntries;
  }
  /**
   * Obtain a log sequence number.
   */
  private long obtainSeqNum() {
    long value;
    synchronized (sequenceLock) {
      value = logSeqNum++;
    }
    return value;
  }

  /**
   * Obtain a specified number of sequence numbers
   *
   * @param num number of sequence numbers to obtain
   * @return array of sequence numbers
   */
  private long[] obtainSeqNum(int num) {
    long[] results = new long[num];
    synchronized (sequenceLock) {
      for (int i = 0; i < num; i++) {
        results[i] = logSeqNum++;
      }
    }
    return results;
  }
  /**
   * By acquiring a log sequence ID, we can allow log messages to continue while
   * we flush the cache.
   *
   * Acquire a lock so that we do not roll the log between the start and
   * completion of a cache-flush. Otherwise the log-seq-id for the flush will
   * not appear in the correct logfile.
   *
   * @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)}
   * @see #completeCacheFlush(Text, Text, long)
   * @see #abortCacheFlush()
   */
  long startCacheFlush() {
    cacheFlushLock.lock();
    return obtainSeqNum();
  }

  /**
   * Complete the cache flush
   *
   * Protected by this and cacheFlushLock
   *
   * @param regionName
   * @param tableName
   * @param logSeqId
   * @throws IOException
   */
  synchronized void completeCacheFlush(final Text regionName,
      final Text tableName, final long logSeqId) throws IOException {

    try {
      if (this.closed) {
        return;
      }
      writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
        new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
          System.currentTimeMillis()));
      numEntries++;
      Long seq = lastSeqWritten.get(regionName);
      if (seq != null && logSeqId >= seq) {
        lastSeqWritten.remove(regionName);
      }
    } finally {
      cacheFlushLock.unlock();
      notifyAll();        // wake up the log roller if it is waiting
    }
  }

  /**
   * Abort a cache flush. Releases the lock taken in startCacheFlush. Call if
   * the flush fails. Note that the only recovery for an aborted flush
   * currently is a restart of the regionserver, so the snapshot content
   * dropped by the failure gets restored to the memcache.
   */
  synchronized void abortCacheFlush() {
    this.cacheFlushLock.unlock();
    notifyAll();
  }
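Taken together, startCacheFlush, completeCacheFlush, and abortCacheFlush form a
bracket protocol around a flush. A minimal sketch of a hypothetical caller
(assuming same-package access, since these methods are package-private;
persistSnapshot is a stand-in for the HStore flushes that
HRegion.internalFlushcache, shown in the next file, performs at this point):

    import java.io.IOException;

    import org.apache.hadoop.io.Text;

    /** Sketch of a hypothetical HLog client following the flush protocol. */
    class CacheFlushProtocolSketch {
      static void flushOneRegion(HLog log, Text region, Text table)
          throws IOException {
        long flushSeqId = log.startCacheFlush();   // takes cacheFlushLock
        try {
          persistSnapshot();                       // stand-in for HStore work
        } catch (IOException e) {
          // Snapshot not persisted; hand the lock back. The only recovery is
          // a log replay, which currently requires a regionserver restart.
          log.abortCacheFlush();
          throw e;
        }
        // completeCacheFlush writes the CACHEFLUSH marker and releases
        // cacheFlushLock in its own finally clause, even if the append fails.
        log.completeCacheFlush(region, table, flushSeqId);
      }

      private static void persistSnapshot() throws IOException {
        // ... flush the memcache snapshot to disk here ...
      }
    }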
@@ -474,10 +500,11 @@ public class HLog implements HConstants {
    System.err.println("Usage: java org.apache.hadoop.hbase.HLog" +
      " {--dump <logfile>... | --split <logdir>...}");
  }

  /**
   * Pass one or more log file names and it will either dump out a text version
   * on <code>stdout</code> or split the specified log files.
   *
   * @param args
   * @throws IOException
   */

@@ -490,7 +517,7 @@ public class HLog implements HConstants {
    if (args[0].compareTo("--dump") != 0) {
      if (args[0].compareTo("--split") == 0) {
        dump = false;
      } else {
        usage();
        System.exit(-1);

@@ -499,7 +526,7 @@ public class HLog implements HConstants {
    Configuration conf = new HBaseConfiguration();
    FileSystem fs = FileSystem.get(conf);
    Path baseDir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR));
    for (int i = 1; i < args.length; i++) {
      Path logPath = new Path(args[i]);
      if (!fs.exists(logPath)) {

@@ -513,7 +540,7 @@ public class HLog implements HConstants {
    try {
      HLogKey key = new HLogKey();
      HLogEdit val = new HLogEdit();
      while (log.next(key, val)) {
        System.out.println(key.toString() + " " + val.toString());
      }
    } finally {
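For reference, the usage string corresponds to invocations of roughly this
shape (the host, port, and file paths here are illustrative only):

    java org.apache.hadoop.hbase.HLog --dump /hbase/log_myhost_60020/hlog.dat.000
    java org.apache.hadoop.hbase.HLog --split /hbase/log_myhost_60020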

HRegion.java

@@ -210,6 +210,7 @@ public class HRegion implements HConstants {
  final int memcacheFlushSize;
  final int blockingMemcacheSize;
  protected final long threadWakeFrequency;
  protected final int optionalFlushCount;
  private final HLocking lock = new HLocking();
  private long desiredMaxFileSize;
  private final long maxSequenceId;

@@ -247,6 +248,8 @@ public class HRegion implements HConstants {
    this.regionInfo = regionInfo;
    this.memcache = new HMemcache();
    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
    this.optionalFlushCount =
      conf.getInt("hbase.hregion.memcache.optionalflushcount", 10);

    // Declare the regionName. This is a unique string for the region, used to
    // build a unique filename.
@@ -728,11 +731,13 @@ public class HRegion implements HConstants {
  void optionallyFlush() throws IOException {
    if(this.memcache.getSize() > this.memcacheFlushSize) {
      flushcache(false);
    } else if (this.memcache.getSize() > 0) {
      if (this.noFlushCount >= this.optionalFlushCount) {
        LOG.info("Optional flush called " + this.noFlushCount +
          " times when data present without flushing. Forcing one.");
        flushcache(false);

      } else {
        // Only increment if something in the cache.
        // Gets zero'd when a flushcache is called.
        this.noFlushCount++;
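The new hbase.hregion.memcache.optionalflushcount property makes the formerly
hard-coded threshold of 10 tunable. A sketch of lowering it programmatically
(the property name comes from the getInt call above; the value 2 matches what
the new TestLogRolling below uses):

    // Make quiet regions flush sooner, so their log entries become
    // flushed-and-expirable and old hlog files can be deleted.
    Configuration conf = new HBaseConfiguration();
    conf.setInt("hbase.hregion.memcache.optionalflushcount", 2);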
@@ -864,25 +869,31 @@ public class HRegion implements HConstants {
        retval.memcacheSnapshot.size());
    }

    try {
      // A. Flush memcache to all the HStores.
      // Keep running vector of all store files that includes both old and the
      // just-made new flush store file.
      for (HStore hstore: stores.values()) {
        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
      }
    } catch (IOException e) {
      // An exception here means that the snapshot was not persisted.
      // The hlog needs to be replayed so its content is restored to memcache.
      // Currently, only a server restart will do this.
      this.log.abortCacheFlush();
      throw new DroppedSnapshotException(e.getMessage());
    }

    // If we get to here, the HStores have been written. If we get an
    // error in completeCacheFlush it will release the lock it is holding

    // B. Write a FLUSHCACHE-COMPLETE message to the log.
    // This tells future readers that the HStores were emitted correctly,
    // and that all updates to the log for this regionName that have lower
    // log-sequence-ids can be safely ignored.
    this.log.completeCacheFlush(this.regionInfo.regionName,
      regionInfo.tableDesc.getName(), logCacheFlushId);

  } finally {
    // C. Delete the now-irrelevant memcache snapshot; its contents have been
    // dumped to disk-based HStores or, if error, clear aborted snapshot.
TestLogRolling.java (new file)

@@ -0,0 +1,200 @@
/**
* Copyright 2007 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
/**
* Test log deletion as logs are rolled.
*/
public class TestLogRolling extends HBaseTestCase {
private static final Log LOG = LogFactory.getLog(TestLogRolling.class);
private MiniDFSCluster dfs;
private MiniHBaseCluster cluster;
private Path logdir;
private String tableName;
private byte[] value;
/**
* constructor
* @throws Exception
*/
public TestLogRolling() throws Exception {
super();
try {
this.dfs = null;
this.cluster = null;
this.logdir = null;
this.tableName = null;
this.value = null;
// We roll the log after every 256 writes
conf.setInt("hbase.regionserver.maxlogentries", 256);
// For less frequently updated regions, force a flush after every 2
// optional-flush checks
conf.setInt("hbase.hregion.memcache.optionalflushcount", 2);
// We flush the cache after every 8192 bytes
conf.setInt("hbase.hregion.memcache.flush.size", 8192);
// Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.period", 10 * 1000);
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
// Increase the amount of time between client retries
conf.setLong("hbase.client.pause", 15 * 1000);
String className = this.getClass().getName();
StringBuilder v = new StringBuilder(className);
while (v.length() < 1000) {
v.append(className);
}
value = v.toString().getBytes(HConstants.UTF8_ENCODING);
} catch (Exception e) {
LOG.fatal("error in constructor", e);
throw e;
}
}
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
try {
super.setUp();
dfs = new MiniDFSCluster(conf, 2, true, (String[]) null);
} catch (Exception e) {
LOG.fatal("error during setUp: ", e);
throw e;
}
}
/** {@inheritDoc} */
@Override
public void tearDown() throws Exception {
try {
super.tearDown();
if (cluster != null) { // shutdown mini HBase cluster
cluster.shutdown();
}
if (dfs != null) {
FileSystem fs = dfs.getFileSystem();
try {
dfs.shutdown();
} finally {
if (fs != null) {
fs.close();
}
}
}
} catch (Exception e) {
LOG.fatal("error in tearDown", e);
throw e;
}
}
private void startAndWriteData() throws Exception {
cluster = new MiniHBaseCluster(conf, 1, dfs);
try {
Thread.sleep(10 * 1000); // Wait for region server to start
} catch (InterruptedException e) {
}
logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
// When the META table can be opened, the region servers are running
@SuppressWarnings("unused")
HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
// Create the test table and open it
HTableDescriptor desc = new HTableDescriptor(tableName);
desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString()));
HBaseAdmin admin = new HBaseAdmin(conf);
admin.createTable(desc);
HTable table = new HTable(conf, new Text(tableName));
for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls
long lockid =
table.startUpdate(new Text("row" + String.format("%1$04d", i)));
table.put(lockid, HConstants.COLUMN_FAMILY, value);
table.commit(lockid);
if (i % 256 == 0) {
// After every 256 writes sleep to let the log roller run
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
}
}
}
private int countLogFiles(boolean print) throws IOException {
Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir});
if (print) {
for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) {
LOG.debug("logfile: " + logfiles[i].toString());
}
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("number of log files: " + logfiles.length);
}
return logfiles.length;
}
/**
* Tests that logs are deleted
*
* @throws Exception
*/
public void testLogRolling() throws Exception {
tableName = getName();
// Force a region split after every 768KB
conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
try {
startAndWriteData();
LOG.info("Finished writing. Sleeping to let cache flusher and log roller run");
try {
// Wait for log roller and cache flusher to run a few times...
Thread.sleep(30L * 1000L);
} catch (InterruptedException e) {
LOG.info("Sleep interrupted", e);
}
LOG.info("Wake from sleep");
assertTrue(countLogFiles(true) <= 2);
} catch (Exception e) {
LOG.fatal("unexpected exception", e);
throw e;
}
}
}
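Since TestLogRolling extends HBaseTestCase, which is a JUnit 3 TestCase, the
test can also be run standalone with the JUnit text runner, for example:

    // Run the whole class under the JUnit 3 text runner (assumes the usual
    // HBase test classpath, including a MiniDFSCluster-capable Hadoop build).
    junit.textui.TestRunner.run(
        new junit.framework.TestSuite(TestLogRolling.class));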