diff --git a/CHANGES.txt b/CHANGES.txt index 6424ddd4b5b..fc15b0c080b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -45,7 +45,7 @@ Trunk (unreleased changes) HADOOP-1813 OOME makes zombie of region server HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson HADOOP-1820 Regionserver creates hlogs without bound - (reverted 2007/09/25) + (reverted 2007/09/25) (Fixed 2007/09/30) HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8") HADOOP-1832 listTables() returns duplicate tables HADOOP-1834 Scanners ignore timestamp passed on creation diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java index 7fe8761b639..bd2c50694f8 100644 --- a/src/java/org/apache/hadoop/hbase/HLog.java +++ b/src/java/org/apache/hadoop/hbase/HLog.java @@ -19,91 +19,124 @@ */ package org.apache.hadoop.hbase; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.*; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.conf.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; /** * HLog stores all the edits to the HStore. - * - * It performs logfile-rolling, so external callers are not aware that the + * + * It performs logfile-rolling, so external callers are not aware that the * underlying file is being rolled. * - *

A single HLog is used by several HRegions simultaneously. - * - *

Each HRegion is identified by a unique long int. HRegions do + *

+ * A single HLog is used by several HRegions simultaneously. + * + *

+ * Each HRegion is identified by a unique long int. HRegions do * not need to declare themselves before using the HLog; they simply include - * their HRegion-id in the append or + * their HRegion-id in the append or * completeCacheFlush calls. * - *

An HLog consists of multiple on-disk files, which have a chronological - * order. As data is flushed to other (better) on-disk structures, the log - * becomes obsolete. We can destroy all the log messages for a given - * HRegion-id up to the most-recent CACHEFLUSH message from that HRegion. + *

+ * An HLog consists of multiple on-disk files, which have a chronological order. + * As data is flushed to other (better) on-disk structures, the log becomes + * obsolete. We can destroy all the log messages for a given HRegion-id up to + * the most-recent CACHEFLUSH message from that HRegion. * - *

It's only practical to delete entire files. Thus, we delete an entire - * on-disk file F when all of the messages in F have a log-sequence-id that's - * older (smaller) than the most-recent CACHEFLUSH message for every HRegion - * that has a message in F. - * - *

TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs - * in HDFS is currently flawed. HBase writes edits to logs and to a memcache. - * The 'atomic' write to the log is meant to serve as insurance against - * abnormal RegionServer exit: on startup, the log is rerun to reconstruct an - * HRegion's last wholesome state. But files in HDFS do not 'exist' until they - * are cleanly closed -- something that will not happen if RegionServer exits - * without running its 'close'. + *

+ * It's only practical to delete entire files. Thus, we delete an entire on-disk + * file F when all of the messages in F have a log-sequence-id that's older + * (smaller) than the most-recent CACHEFLUSH message for every HRegion that has + * a message in F. + * + *

+ * Synchronized methods can never execute in parallel. However, between the + * start of a cache flush and the completion point, appends are allowed but log + * rolling is not. To prevent log rolling taking place during this period, a + * separate reentrant lock is used. + * + *

+ * TODO: Vuk Ercegovac also pointed out that keeping HBase HRegion edit logs in + * HDFS is currently flawed. HBase writes edits to logs and to a memcache. The + * 'atomic' write to the log is meant to serve as insurance against abnormal + * RegionServer exit: on startup, the log is rerun to reconstruct an HRegion's + * last wholesome state. But files in HDFS do not 'exist' until they are cleanly + * closed -- something that will not happen if RegionServer exits without + * running its 'close'. */ public class HLog implements HConstants { private static final Log LOG = LogFactory.getLog(HLog.class); - + static final String HLOG_DATFILE = "hlog.dat."; + static final Text METACOLUMN = new Text("METACOLUMN:"); + static final Text METAROW = new Text("METAROW"); FileSystem fs; + Path dir; + Configuration conf; - SequenceFile.Writer writer; - TreeMap outputfiles = new TreeMap(); - volatile boolean insideCacheFlush = false; + final long threadWakeFrequency; - TreeMap regionToLastFlush = new TreeMap(); + SequenceFile.Writer writer; + + TreeMap outputfiles = new TreeMap(); + + HashMap lastSeqWritten = new HashMap(); volatile boolean closed = false; - volatile long logSeqNum = 0; - long filenum = 0; - AtomicInteger numEntries = new AtomicInteger(0); - Integer rollLock = new Integer(0); + private final Integer sequenceLock = new Integer(0); + volatile long logSeqNum = 0; + + volatile long filenum = 0; + + volatile int numEntries = 0; + + // This lock prevents starting a log roll during a cache flush. + // synchronized is insufficient because a cache flush spans two method calls. + private final Lock cacheFlushLock = new ReentrantLock(); /** - * Split up a bunch of log files, that are no longer being written to, - * into new files, one per region. Delete the old log files when ready. + * Split up a bunch of log files, that are no longer being written to, into + * new files, one per region. Delete the old log files when finished. + * * @param rootDir Root directory of the HBase instance - * @param srcDir Directory of log files to split: - * e.g. ${ROOTDIR}/log_HOST_PORT + * @param srcDir Directory of log files to split: e.g. + * ${ROOTDIR}/log_HOST_PORT * @param fs FileSystem * @param conf HBaseConfiguration * @throws IOException */ static void splitLog(Path rootDir, Path srcDir, FileSystem fs, - Configuration conf) throws IOException { - Path logfiles[] = fs.listPaths(new Path[] {srcDir}); + Configuration conf) throws IOException { + Path logfiles[] = fs.listPaths(new Path[] { srcDir }); LOG.info("splitting " + logfiles.length + " log(s) in " + srcDir.toString()); HashMap logWriters = new HashMap(); try { - for(int i = 0; i < logfiles.length; i++) { + for (int i = 0; i < logfiles.length; i++) { if (LOG.isDebugEnabled()) { LOG.debug("Splitting " + logfiles[i]); } @@ -118,7 +151,7 @@ public class HLog implements HConstants { try { HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); - while(in.next(key, val)) { + while (in.next(key, val)) { Text regionName = key.getRegionName(); SequenceFile.Writer w = logWriters.get(regionName); if (w == null) { @@ -141,15 +174,15 @@ public class HLog implements HConstants { } } } finally { - for (SequenceFile.Writer w: logWriters.values()) { + for (SequenceFile.Writer w : logWriters.values()) { w.close(); } } - - if(fs.exists(srcDir)) { - if(! fs.delete(srcDir)) { + + if (fs.exists(srcDir)) { + if (!fs.delete(srcDir)) { LOG.error("Cannot delete: " + srcDir); - if(! 
FileUtil.fullyDelete(new File(srcDir.toString()))) { + if (!FileUtil.fullyDelete(new File(srcDir.toString()))) { throw new IOException("Cannot delete: " + srcDir); } } @@ -160,10 +193,10 @@ public class HLog implements HConstants { /** * Create an edit log at the given dir location. * - * You should never have to load an existing log. If there is a log - * at startup, it should have already been processed and deleted by - * the time the HLog object is started up. - * + * You should never have to load an existing log. If there is a log at + * startup, it should have already been processed and deleted by the time the + * HLog object is started up. + * * @param fs * @param dir * @param conf @@ -173,6 +206,7 @@ public class HLog implements HConstants { this.fs = fs; this.dir = dir; this.conf = conf; + this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); if (fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); @@ -180,115 +214,117 @@ public class HLog implements HConstants { fs.mkdirs(dir); rollWriter(); } - - synchronized void setSequenceNumber(long newvalue) { - if (newvalue > logSeqNum) { - if (LOG.isDebugEnabled()) { - LOG.debug("changing sequence number from " + logSeqNum + " to " + - newvalue); + + /** + * Called by HRegionServer when it opens a new region to ensure that log + * sequence numbers are always greater than the latest sequence number of the + * region being brought on-line. + * + * @param newvalue + */ + void setSequenceNumber(long newvalue) { + synchronized (sequenceLock) { + if (newvalue > logSeqNum) { + if (LOG.isDebugEnabled()) { + LOG.debug("changing sequence number from " + logSeqNum + " to " + + newvalue); + } + logSeqNum = newvalue; } - logSeqNum = newvalue; } } /** - * Roll the log writer. That is, start writing log messages to a new file. + * Roll the log writer. That is, start writing log messages to a new file. * - * The 'rollLock' prevents us from entering rollWriter() more than - * once at a time. + * Because a log cannot be rolled during a cache flush, and a cache flush + * spans two method calls, a special lock needs to be obtained so that a cache + * flush cannot start when the log is being rolled and the log cannot be + * rolled during a cache flush. + * + * Note that this method cannot be synchronized because it is possible that + * startCacheFlush runs, obtaining the cacheFlushLock, then this method could + * start which would obtain the lock on this but block on obtaining the + * cacheFlushLock and then completeCacheFlush could be called which would wait + * for the lock on this and consequently never release the cacheFlushLock * - * The 'this' lock limits access to the current writer so - * we don't append multiple items simultaneously. - * * @throws IOException */ - void rollWriter() throws IOException { - synchronized(rollLock) { + synchronized void rollWriter() throws IOException { + boolean locked = false; + while (!locked && !closed) { + if (cacheFlushLock.tryLock()) { + locked = true; + break; + } + try { + this.wait(threadWakeFrequency); + } catch (InterruptedException e) { + } + } + if (closed) { + if (locked) { + cacheFlushLock.unlock(); + } + throw new IOException("Cannot roll log; log is closed"); + } - // Try to roll the writer to a new file. We may have to - // wait for a cache-flush to complete. In the process, - // compute a list of old log files that can be deleted. 
+ // If we get here we have locked out both cache flushes and appends - Vector toDeleteList = new Vector(); - synchronized(this) { - if(closed) { - throw new IOException("Cannot roll log; log is closed"); - } - - // Make sure we do not roll the log while inside a - // cache-flush. Otherwise, the log sequence number for - // the CACHEFLUSH operation will appear in a "newer" log file - // than it should. - while(insideCacheFlush) { - try { - wait(); - } catch (InterruptedException ie) { - // continue; - } - } - - // Close the current writer (if any), and grab a new one. - if(writer != null) { - writer.close(); - Path p = computeFilename(filenum - 1); - if(LOG.isDebugEnabled()) { - LOG.debug("Closing current log writer " + p.toString() + + try { + if (writer != null) { + // Close the current writer, get a new one. + writer.close(); + Path p = computeFilename(filenum - 1); + if (LOG.isDebugEnabled()) { + LOG.debug("Closing current log writer " + p.toString() + " to get a new one"); - } - if (filenum > 0) { + } + if (filenum > 0) { + synchronized (sequenceLock) { outputfiles.put(logSeqNum - 1, p); } } - Path newPath = computeFilename(filenum++); - this.writer = SequenceFile.createWriter(fs, conf, newPath, + } + Path newPath = computeFilename(filenum++); + this.writer = SequenceFile.createWriter(fs, conf, newPath, HLogKey.class, HLogEdit.class); - if(LOG.isDebugEnabled()) { - LOG.debug("new log writer created at " + newPath); - } - - // Can we delete any of the old log files? - // First, compute the oldest relevant log operation - // over all the regions. - long oldestOutstandingSeqNum = Long.MAX_VALUE; - for(Long l: regionToLastFlush.values()) { - long curSeqNum = l.longValue(); - - if(curSeqNum < oldestOutstandingSeqNum) { - oldestOutstandingSeqNum = curSeqNum; - } - } + LOG.info("new log writer created at " + newPath); - // Next, remove all files with a final ID that's older - // than the oldest pending region-operation. - for(Iterator it = outputfiles.keySet().iterator(); it.hasNext();) { - long maxSeqNum = it.next().longValue(); - if(maxSeqNum < oldestOutstandingSeqNum) { - Path p = outputfiles.get(maxSeqNum); - it.remove(); - toDeleteList.add(p); - - } else { - break; - } + // Can we delete any of the old log files? + + TreeSet sequenceNumbers = + new TreeSet(lastSeqWritten.values()); + + if (sequenceNumbers.size() > 0) { + long oldestOutstandingSeqNum = sequenceNumbers.first(); + + // Get the set of all log files whose final ID is older than the oldest + // pending region operation + + sequenceNumbers.clear(); + sequenceNumbers.addAll(outputfiles.headMap( + oldestOutstandingSeqNum).keySet()); + + // Now remove old log files (if any) + + for (Long seq : sequenceNumbers) { + Path p = outputfiles.remove(seq); + LOG.info("removing old log file " + p.toString()); + fs.delete(p); } } + this.numEntries = 0; - // Actually delete them, if any! - for(Iterator it = toDeleteList.iterator(); it.hasNext(); ) { - Path p = it.next(); - if(LOG.isDebugEnabled()) { - LOG.debug("removing old log file " + p.toString()); - } - fs.delete(p); - } - this.numEntries.set(0); + } finally { + cacheFlushLock.unlock(); } } /** - * This is a convenience method that computes a new filename with - * a given file-number. + * This is a convenience method that computes a new filename with a given + * file-number. 
*/ Path computeFilename(final long fn) { return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn)); @@ -296,19 +332,21 @@ public class HLog implements HConstants { /** * Shut down the log and delete the log directory + * * @throws IOException */ synchronized void closeAndDelete() throws IOException { close(); fs.delete(dir); } - + /** * Shut down the log. + * * @throws IOException */ synchronized void close() throws IOException { - if(LOG.isDebugEnabled()) { + if (LOG.isDebugEnabled()) { LOG.debug("closing log writer in " + this.dir.toString()); } this.writer.close(); @@ -319,16 +357,19 @@ public class HLog implements HConstants { * Append a set of edits to the log. Log edits are keyed by regionName, * rowname, and log-sequence-id. * - * Later, if we sort by these keys, we obtain all the relevant edits for - * a given key-range of the HRegion (TODO). Any edits that do not have a + * Later, if we sort by these keys, we obtain all the relevant edits for a + * given key-range of the HRegion (TODO). Any edits that do not have a * matching {@link HConstants#COMPLETE_CACHEFLUSH} message can be discarded. * - *

Logs cannot be restarted once closed, or once the HLog process dies. - * Each time the HLog starts, it must create a new log. This means that - * other systems should process the log appropriately upon each startup - * (and prior to initializing HLog). + *

+ * Logs cannot be restarted once closed, or once the HLog process dies. Each + * time the HLog starts, it must create a new log. This means that other + * systems should process the log appropriately upon each startup (and prior + * to initializing HLog). + * + * synchronized prevents appends during the completion of a cache flush or for + * the duration of a log roll. * - * We need to seize a lock on the writer so that writes are atomic. * @param regionName * @param tableName * @param row @@ -337,136 +378,121 @@ public class HLog implements HConstants { * @throws IOException */ synchronized void append(Text regionName, Text tableName, Text row, - TreeMap columns, long timestamp) - throws IOException { - if(closed) { + TreeMap columns, long timestamp) throws IOException { + if (closed) { throw new IOException("Cannot append; log is closed"); } - + long seqNum[] = obtainSeqNum(columns.size()); - // The 'regionToLastFlush' map holds the sequence id of the - // most recent flush for every regionName. However, for regions - // that don't have any flush yet, the relevant operation is the - // first one that's been added. - if (regionToLastFlush.get(regionName) == null) { - regionToLastFlush.put(regionName, seqNum[0]); - } + // The 'lastSeqWritten' map holds the sequence number of the most recent + // write for each region. When the cache is flushed, the entry for the + // region being flushed is removed if the sequence number of the flush + // is greater than or equal to the value in lastSeqWritten + + lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]); int counter = 0; - for (Map.Entry es: columns.entrySet()) { + for (Map.Entry es : columns.entrySet()) { HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum[counter++]); HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp); writer.append(logKey, logEdit); - numEntries.getAndIncrement(); + numEntries++; } } /** @return How many items have been added to the log */ int getNumEntries() { - return numEntries.get(); + return numEntries; } /** - * Obtain a log sequence number. This seizes the whole HLog - * lock, but it shouldn't last too long. + * Obtain a log sequence number. */ - synchronized long obtainSeqNum() { - return logSeqNum++; + private long obtainSeqNum() { + long value; + synchronized (sequenceLock) { + value = logSeqNum++; + } + return value; } - + /** * Obtain a specified number of sequence numbers - * - * @param num - number of sequence numbers to obtain - * @return - array of sequence numbers + * + * @param num number of sequence numbers to obtain + * @return array of sequence numbers */ - synchronized long[] obtainSeqNum(int num) { + private long[] obtainSeqNum(int num) { long[] results = new long[num]; - for (int i = 0; i < num; i++) { - results[i] = logSeqNum++; + synchronized (sequenceLock) { + for (int i = 0; i < num; i++) { + results[i] = logSeqNum++; + } } return results; } /** - * By acquiring a log sequence ID, we can allow log messages - * to continue while we flush the cache. + * By acquiring a log sequence ID, we can allow log messages to continue while + * we flush the cache. + * + * Acquire a lock so that we do not roll the log between the start and + * completion of a cache-flush. Otherwise the log-seq-id for the flush will + * not appear in the correct logfile. * - * Set a flag so that we do not roll the log between the start - * and complete of a cache-flush. Otherwise the log-seq-id for - * the flush will not appear in the correct logfile. 
* @return sequence ID to pass {@link #completeCacheFlush(Text, Text, long)} * @see #completeCacheFlush(Text, Text, long) * @see #abortCacheFlush() */ - synchronized long startCacheFlush() { - while (this.insideCacheFlush) { - try { - wait(); - } catch (InterruptedException ie) { - // continue - } - } - this.insideCacheFlush = true; - notifyAll(); + long startCacheFlush() { + cacheFlushLock.lock(); return obtainSeqNum(); } - /** Complete the cache flush + /** + * Complete the cache flush + * + * Protected by this and cacheFlushLock + * * @param regionName * @param tableName * @param logSeqId * @throws IOException */ synchronized void completeCacheFlush(final Text regionName, - final Text tableName, final long logSeqId) - throws IOException { - if(this.closed) { - return; - } - - if (!this.insideCacheFlush) { - throw new IOException("Impossible situation: inside " + - "completeCacheFlush(), but 'insideCacheFlush' flag is false"); - } - HLogKey key = new HLogKey(regionName, tableName, HLog.METAROW, logSeqId); - this.writer.append(key, - new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), - System.currentTimeMillis())); - this.numEntries.getAndIncrement(); + final Text tableName, final long logSeqId) throws IOException { - // Remember the most-recent flush for each region. - // This is used to delete obsolete log files. - this.regionToLastFlush.put(regionName, Long.valueOf(logSeqId)); + try { + if (this.closed) { + return; + } - cleanup(); + writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), + System.currentTimeMillis())); + + numEntries++; + Long seq = lastSeqWritten.get(regionName); + if (seq != null && logSeqId >= seq) { + lastSeqWritten.remove(regionName); + } + + } finally { + cacheFlushLock.unlock(); + notifyAll(); // wake up the log roller if it is waiting + } } - + /** - * Abort a cache flush. - * This method will clear waits on {@link #insideCacheFlush}. Call if the - * flush fails. Note that the only recovery for an aborted flush currently - * is a restart of the regionserver so the snapshot content dropped by the - * failure gets restored to the memcache. + * Abort a cache flush. This method will clear waits on + * {@link #insideCacheFlush}. Call if the flush fails. Note that the only + * recovery for an aborted flush currently is a restart of the regionserver so + * the snapshot content dropped by the failure gets restored to the memcache. */ synchronized void abortCacheFlush() { - cleanup(); - } - - private synchronized void cleanup() { - this.insideCacheFlush = false; - notifyAll(); - } - - /** - * Abort a cache flush. - * This method will clear waits on {@link #insideCacheFlush} but if this - * method is called, we are losing data. TODO: Fix. - */ - synchronized void abort() { - this.insideCacheFlush = false; + this.cacheFlushLock.unlock(); notifyAll(); } @@ -474,10 +500,11 @@ public class HLog implements HConstants { System.err.println("Usage: java org.apache.hbase.HLog" + " {--dump ... | --split ...}"); } - + /** * Pass one or more log file names and it will either dump out a text version * on stdout or split the specified log files. 
+ * * @param args * @throws IOException */ @@ -490,7 +517,7 @@ public class HLog implements HConstants { if (args[0].compareTo("--dump") != 0) { if (args[0].compareTo("--split") == 0) { dump = false; - + } else { usage(); System.exit(-1); @@ -499,7 +526,7 @@ public class HLog implements HConstants { Configuration conf = new HBaseConfiguration(); FileSystem fs = FileSystem.get(conf); Path baseDir = new Path(conf.get(HBASE_DIR, DEFAULT_HBASE_DIR)); - + for (int i = 1; i < args.length; i++) { Path logPath = new Path(args[i]); if (!fs.exists(logPath)) { @@ -513,7 +540,7 @@ public class HLog implements HConstants { try { HLogKey key = new HLogKey(); HLogEdit val = new HLogEdit(); - while(log.next(key, val)) { + while (log.next(key, val)) { System.out.println(key.toString() + " " + val.toString()); } } finally { diff --git a/src/java/org/apache/hadoop/hbase/HRegion.java b/src/java/org/apache/hadoop/hbase/HRegion.java index 0fbb32f1e17..60e5acfdb0b 100644 --- a/src/java/org/apache/hadoop/hbase/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/HRegion.java @@ -210,6 +210,7 @@ public class HRegion implements HConstants { final int memcacheFlushSize; final int blockingMemcacheSize; protected final long threadWakeFrequency; + protected final int optionalFlushCount; private final HLocking lock = new HLocking(); private long desiredMaxFileSize; private final long maxSequenceId; @@ -247,6 +248,8 @@ public class HRegion implements HConstants { this.regionInfo = regionInfo; this.memcache = new HMemcache(); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); + this.optionalFlushCount = + conf.getInt("hbase.hregion.memcache.optionalflushcount", 10); // Declare the regionName. This is a unique string for the region, used to // build a unique filename. @@ -728,11 +731,13 @@ public class HRegion implements HConstants { void optionallyFlush() throws IOException { if(this.memcache.getSize() > this.memcacheFlushSize) { flushcache(false); - } else if (this.memcache.getSize() > 0 && this.noFlushCount >= 10) { - LOG.info("Optional flush called " + this.noFlushCount + - " times when data present without flushing. Forcing one."); - flushcache(false); - if (this.memcache.getSize() > 0) { + } else if (this.memcache.getSize() > 0) { + if (this.noFlushCount >= this.optionalFlushCount) { + LOG.info("Optional flush called " + this.noFlushCount + + " times when data present without flushing. Forcing one."); + flushcache(false); + + } else { // Only increment if something in the cache. // Gets zero'd when a flushcache is called. this.noFlushCount++; @@ -864,25 +869,31 @@ public class HRegion implements HConstants { retval.memcacheSnapshot.size()); } - // A. Flush memcache to all the HStores. - // Keep running vector of all store files that includes both old and the - // just-made new flush store file. - for (HStore hstore: stores.values()) { - hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); + try { + // A. Flush memcache to all the HStores. + // Keep running vector of all store files that includes both old and the + // just-made new flush store file. + for (HStore hstore: stores.values()) { + hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId); + } + } catch (IOException e) { + // An exception here means that the snapshot was not persisted. + // The hlog needs to be replayed so its content is restored to memcache. + // Currently, only a server restart will do this. 
+ this.log.abortCacheFlush(); + throw new DroppedSnapshotException(e.getMessage()); } + // If we get to here, the HStores have been written. If we get an + // error in completeCacheFlush it will release the lock it is holding + // B. Write a FLUSHCACHE-COMPLETE message to the log. // This tells future readers that the HStores were emitted correctly, // and that all updates to the log for this regionName that have lower // log-sequence-ids can be safely ignored. this.log.completeCacheFlush(this.regionInfo.regionName, - regionInfo.tableDesc.getName(), logCacheFlushId); - } catch (IOException e) { - // An exception here means that the snapshot was not persisted. - // The hlog needs to be replayed so its content is restored to memcache. - // Currently, only a server restart will do this. - this.log.abortCacheFlush(); - throw new DroppedSnapshotException(e.getMessage()); + regionInfo.tableDesc.getName(), logCacheFlushId); + } finally { // C. Delete the now-irrelevant memcache snapshot; its contents have been // dumped to disk-based HStores or, if error, clear aborted snapshot. diff --git a/src/test/org/apache/hadoop/hbase/TestLogRolling.java b/src/test/org/apache/hadoop/hbase/TestLogRolling.java new file mode 100644 index 00000000000..25a691bc263 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/TestLogRolling.java @@ -0,0 +1,200 @@ +/** + * Copyright 2007 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; + +/** + * Test log deletion as logs are rolled. 
+ */ +public class TestLogRolling extends HBaseTestCase { + private static final Log LOG = LogFactory.getLog(TestLogRolling.class); + private MiniDFSCluster dfs; + private MiniHBaseCluster cluster; + private Path logdir; + private String tableName; + private byte[] value; + + /** + * constructor + * @throws Exception + */ + public TestLogRolling() throws Exception { + super(); + try { + this.dfs = null; + this.cluster = null; + this.logdir = null; + this.tableName = null; + this.value = null; + + // We roll the log after every 256 writes + conf.setInt("hbase.regionserver.maxlogentries", 256); + + // For less frequently updated regions flush after every 2 flushes + conf.setInt("hbase.hregion.memcache.optionalflushcount", 2); + + // We flush the cache after every 8192 bytes + conf.setInt("hbase.hregion.memcache.flush.size", 8192); + + // Make lease timeout longer, lease checks less frequent + conf.setInt("hbase.master.lease.period", 10 * 1000); + conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000); + + // Increase the amount of time between client retries + conf.setLong("hbase.client.pause", 15 * 1000); + + String className = this.getClass().getName(); + StringBuilder v = new StringBuilder(className); + while (v.length() < 1000) { + v.append(className); + } + value = v.toString().getBytes(HConstants.UTF8_ENCODING); + + } catch (Exception e) { + LOG.fatal("error in constructor", e); + throw e; + } + } + + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + try { + super.setUp(); + dfs = new MiniDFSCluster(conf, 2, true, (String[]) null); + } catch (Exception e) { + LOG.fatal("error during setUp: ", e); + throw e; + } + } + + /** {@inheritDoc} */ + @Override + public void tearDown() throws Exception { + try { + super.tearDown(); + + if (cluster != null) { // shutdown mini HBase cluster + cluster.shutdown(); + } + + if (dfs != null) { + FileSystem fs = dfs.getFileSystem(); + try { + dfs.shutdown(); + } finally { + if (fs != null) { + fs.close(); + } + } + } + } catch (Exception e) { + LOG.fatal("error in tearDown", e); + throw e; + } + } + + private void startAndWriteData() throws Exception { + cluster = new MiniHBaseCluster(conf, 1, dfs); + try { + Thread.sleep(10 * 1000); // Wait for region server to start + } catch (InterruptedException e) { + } + + logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir; + + // When the META table can be opened, the region servers are running + @SuppressWarnings("unused") + HTable meta = new HTable(conf, HConstants.META_TABLE_NAME); + + // Create the test table and open it + HTableDescriptor desc = new HTableDescriptor(tableName); + desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY.toString())); + HBaseAdmin admin = new HBaseAdmin(conf); + admin.createTable(desc); + HTable table = new HTable(conf, new Text(tableName)); + + for (int i = 1; i <= 2048; i++) { // 2048 writes should cause 8 log rolls + long lockid = + table.startUpdate(new Text("row" + String.format("%1$04d", i))); + table.put(lockid, HConstants.COLUMN_FAMILY, value); + table.commit(lockid); + + if (i % 256 == 0) { + // After every 256 writes sleep to let the log roller run + + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + } + } + } + } + + private int countLogFiles(boolean print) throws IOException { + Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir}); + if (print) { + for (int i = 0; i < logfiles.length; i++) { + if (LOG.isDebugEnabled()) { + LOG.debug("logfile: " + logfiles[i].toString()); + 
} + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("number of log files: " + logfiles.length); + } + return logfiles.length; + } + + /** + * Tests that logs are deleted + * + * @throws Exception + */ + public void testLogRolling() throws Exception { + tableName = getName(); + // Force a region split after every 768KB + conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); + try { + startAndWriteData(); + LOG.info("Finished writing. Sleeping to let cache flusher and log roller run"); + try { + // Wait for log roller and cache flusher to run a few times... + Thread.sleep(30L * 1000L); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + LOG.info("Wake from sleep"); + assertTrue(countLogFiles(true) <= 2); + } catch (Exception e) { + LOG.fatal("unexpected exception", e); + throw e; + } + } + +}
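The coordination this patch introduces between log rolling and cache flushing is easier to follow outside the diff. Below is a minimal, self-contained sketch of that locking scheme under hypothetical names (SketchLog, roll, startFlush, completeFlush); it is not the HLog API, only an illustration of how a separate ReentrantLock spans the two calls of a cache flush so that appends keep running while a log roll cannot begin.

import java.util.concurrent.locks.ReentrantLock;

/**
 * Sketch only: hypothetical names, not the HLog API. A cache flush holds
 * cacheFlushLock across startFlush()/completeFlush(); appends use the object
 * monitor and keep running; roll() needs both, so it cannot start while a
 * flush is in progress.
 */
class SketchLog {

  private final ReentrantLock cacheFlushLock = new ReentrantLock();
  private long logSeqNum = 0;

  /** Appends are short critical sections; the monitor alone is enough. */
  synchronized void append(String regionName, String edit) {
    long seq = logSeqNum++;
    // ... write (regionName, seq, edit) to the current log file ...
  }

  /** Flush phase 1: lock out log rolls, hand back a flush sequence id. */
  long startFlush() {
    cacheFlushLock.lock();
    synchronized (this) {
      return logSeqNum++;
    }
  }

  /** Flush phase 2: write the flush marker, then let rolls proceed again. */
  synchronized void completeFlush(String regionName, long flushSeqId) {
    try {
      // ... append a CACHEFLUSH-COMPLETE marker for (regionName, flushSeqId) ...
    } finally {
      cacheFlushLock.unlock();
      notifyAll(); // wake a roller parked in roll()
    }
  }

  /** Roll the log; must wait until no cache flush is in progress. */
  synchronized void roll() throws InterruptedException {
    while (!cacheFlushLock.tryLock()) {
      wait(1000); // the patch waits for the configured thread wake frequency here
    }
    try {
      // Both appends (monitor) and flushes (cacheFlushLock) are now locked out:
      // ... close the current writer, open a new one, delete obsolete files ...
    } finally {
      cacheFlushLock.unlock();
    }
  }
}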
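A second piece worth isolating is how the rewritten rollWriter() decides which old log files are obsolete: it takes the smallest sequence number any region still has unflushed (lastSeqWritten) and drops every file in outputfiles whose newest entry is older. The sketch below restates that selection with hypothetical names and plain collections, leaving out the filesystem calls.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Sketch only (hypothetical names): which rolled log files may be deleted. */
class LogPruneSketch {

  /**
   * outputFiles maps the highest sequence number written to each rolled file
   * to that file's name; lastSeqWritten holds, per region, the sequence number
   * of its most recent edit not yet covered by a completed cache flush.
   */
  static List<String> deletableLogs(TreeMap<Long, String> outputFiles,
      Map<String, Long> lastSeqWritten) {
    List<String> deletable = new ArrayList<String>();
    TreeSet<Long> pending = new TreeSet<Long>(lastSeqWritten.values());
    if (pending.isEmpty()) {
      // No region has an outstanding edit on record; the patch deletes nothing.
      return deletable;
    }
    long oldestOutstanding = pending.first();
    // headMap() is strictly-less-than: every file whose newest edit is older
    // than the oldest unflushed edit of any region is safe to delete.
    for (Long seq : new TreeSet<Long>(outputFiles.headMap(oldestOutstanding).keySet())) {
      deletable.add(outputFiles.remove(seq));
    }
    return deletable;
  }
}

In the patch this selection happens inside rollWriter(), at a point where, as the new code's comment notes, both appends and cache flushes are locked out, so lastSeqWritten and outputfiles can be read and pruned without further synchronization.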