From 642dd4294eea44b6f04160f2d7ea85b4298f729a Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Thu, 11 Oct 2007 14:45:36 +0000 Subject: [PATCH] HADOOP-2029 TestLogRolling fails too often in patch and nightlies git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@583839 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + src/java/org/apache/hadoop/hbase/HLog.java | 170 ++++++++++-------- .../apache/hadoop/hbase/TestLogRolling.java | 29 +-- 3 files changed, 108 insertions(+), 92 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 02f0e9dd0d6..e920e785793 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -74,6 +74,7 @@ Trunk (unreleased changes) daemon scripts HADOOP-2017 TestRegionServerAbort failure in patch build #903 and nightly #266 + HADOOP-2029 TestLogRolling fails too often in patch and nightlies IMPROVEMENTS HADOOP-1737 Make HColumnDescriptor data publically members settable diff --git a/src/java/org/apache/hadoop/hbase/HLog.java b/src/java/org/apache/hadoop/hbase/HLog.java index bd2c50694f8..7d8532da1e1 100644 --- a/src/java/org/apache/hadoop/hbase/HLog.java +++ b/src/java/org/apache/hadoop/hbase/HLog.java @@ -83,26 +83,28 @@ import org.apache.hadoop.io.SequenceFile.Reader; */ public class HLog implements HConstants { private static final Log LOG = LogFactory.getLog(HLog.class); - - static final String HLOG_DATFILE = "hlog.dat."; - + private static final String HLOG_DATFILE = "hlog.dat."; static final Text METACOLUMN = new Text("METACOLUMN:"); - static final Text METAROW = new Text("METAROW"); - - FileSystem fs; - - Path dir; - - Configuration conf; - + final FileSystem fs; + final Path dir; + final Configuration conf; final long threadWakeFrequency; + /* + * Current log file. + */ SequenceFile.Writer writer; - TreeMap outputfiles = new TreeMap(); + /* + * Map of all log files but the current one. + */ + final TreeMap outputfiles = new TreeMap(); - HashMap lastSeqWritten = new HashMap(); + /* + * Map of region to last sequence/edit id. + */ + final Map lastSeqWritten = new HashMap(); volatile boolean closed = false; @@ -129,11 +131,12 @@ public class HLog implements HConstants { * @throws IOException */ static void splitLog(Path rootDir, Path srcDir, FileSystem fs, - Configuration conf) throws IOException { + Configuration conf) + throws IOException { Path logfiles[] = fs.listPaths(new Path[] { srcDir }); LOG.info("splitting " + logfiles.length + " log(s) in " + srcDir.toString()); - HashMap logWriters = + Map logWriters = new HashMap(); try { for (int i = 0; i < logfiles.length; i++) { @@ -156,12 +159,12 @@ public class HLog implements HConstants { SequenceFile.Writer w = logWriters.get(regionName); if (w == null) { Path logfile = new Path(HRegion.getRegionDir(rootDir, - regionName), HREGION_OLDLOGFILE_NAME); + regionName), HREGION_OLDLOGFILE_NAME); if (LOG.isDebugEnabled()) { LOG.debug("getting new log file writer for path " + logfile); } w = SequenceFile.createWriter(fs, conf, logfile, HLogKey.class, - HLogEdit.class); + HLogEdit.class); logWriters.put(regionName, w); } if (LOG.isDebugEnabled()) { @@ -202,12 +205,12 @@ public class HLog implements HConstants { * @param conf * @throws IOException */ - HLog(FileSystem fs, Path dir, Configuration conf) throws IOException { + HLog(final FileSystem fs, final Path dir, final Configuration conf) + throws IOException { this.fs = fs; this.dir = dir; this.conf = conf; this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); - if (fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); } @@ -242,7 +245,7 @@ public class HLog implements HConstants { * flush cannot start when the log is being rolled and the log cannot be * rolled during a cache flush. * - * Note that this method cannot be synchronized because it is possible that + *

Note that this method cannot be synchronized because it is possible that * startCacheFlush runs, obtaining the cacheFlushLock, then this method could * start which would obtain the lock on this but block on obtaining the * cacheFlushLock and then completeCacheFlush could be called which would wait @@ -253,81 +256,94 @@ public class HLog implements HConstants { synchronized void rollWriter() throws IOException { boolean locked = false; while (!locked && !closed) { - if (cacheFlushLock.tryLock()) { + if (this.cacheFlushLock.tryLock()) { locked = true; break; } try { this.wait(threadWakeFrequency); } catch (InterruptedException e) { + // continue } } if (closed) { if (locked) { - cacheFlushLock.unlock(); + this.cacheFlushLock.unlock(); } throw new IOException("Cannot roll log; log is closed"); } // If we get here we have locked out both cache flushes and appends - try { - if (writer != null) { + if (this.writer != null) { // Close the current writer, get a new one. - writer.close(); + this.writer.close(); Path p = computeFilename(filenum - 1); if (LOG.isDebugEnabled()) { LOG.debug("Closing current log writer " + p.toString() + - " to get a new one"); + " to get a new one"); } if (filenum > 0) { - synchronized (sequenceLock) { - outputfiles.put(logSeqNum - 1, p); + synchronized (this.sequenceLock) { + this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p); } } } Path newPath = computeFilename(filenum++); - this.writer = SequenceFile.createWriter(fs, conf, newPath, + this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath, HLogKey.class, HLogEdit.class); - LOG.info("new log writer created at " + newPath); // Can we delete any of the old log files? - - TreeSet sequenceNumbers = - new TreeSet(lastSeqWritten.values()); - - if (sequenceNumbers.size() > 0) { - long oldestOutstandingSeqNum = sequenceNumbers.first(); - - // Get the set of all log files whose final ID is older than the oldest - // pending region operation - - sequenceNumbers.clear(); - sequenceNumbers.addAll(outputfiles.headMap( - oldestOutstandingSeqNum).keySet()); - - // Now remove old log files (if any) - - for (Long seq : sequenceNumbers) { - Path p = outputfiles.remove(seq); - LOG.info("removing old log file " + p.toString()); - fs.delete(p); + if (this.outputfiles.size() > 0) { + if (this.lastSeqWritten.size() <= 0) { + LOG.debug("Last sequence written is empty. Deleting all old hlogs"); + // If so, then no new writes have come in since all regions were + // flushed (and removed from the lastSeqWritten map). Means can + // remove all but currently open log file. + for (Map.Entry e : this.outputfiles.entrySet()) { + deleteLogFile(e.getValue(), e.getKey()); + } + this.outputfiles.clear(); + } else { + // Get oldest edit/sequence id. If logs are older than this id, + // then safe to remove. + TreeSet sequenceNumbers = + new TreeSet(this.lastSeqWritten.values()); + long oldestOutstandingSeqNum = sequenceNumbers.first().longValue(); + // Get the set of all log files whose final ID is older than the + // oldest pending region operation + sequenceNumbers.clear(); + sequenceNumbers.addAll(this.outputfiles.headMap( + Long.valueOf(oldestOutstandingSeqNum)).keySet()); + // Now remove old log files (if any) + LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " + + "using oldest outstanding seqnum of " + oldestOutstandingSeqNum); + for (Long seq : sequenceNumbers) { + deleteLogFile(this.outputfiles.remove(seq), seq); + } } } this.numEntries = 0; - } finally { - cacheFlushLock.unlock(); + this.cacheFlushLock.unlock(); } } + + private void deleteLogFile(final Path p, final Long seqno) + throws IOException { + LOG.info("removing old log file " + p.toString() + + " whose highest sequence/edit id is " + seqno); + this.fs.delete(p); + } /** * This is a convenience method that computes a new filename with a given * file-number. */ Path computeFilename(final long fn) { - return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn)); + return new Path(dir, + HLOG_DATFILE + String.format("%1$03d", Long.valueOf(fn))); } /** @@ -378,27 +394,26 @@ public class HLog implements HConstants { * @throws IOException */ synchronized void append(Text regionName, Text tableName, Text row, - TreeMap columns, long timestamp) throws IOException { + TreeMap columns, long timestamp) + throws IOException { if (closed) { throw new IOException("Cannot append; log is closed"); } - long seqNum[] = obtainSeqNum(columns.size()); - - // The 'lastSeqWritten' map holds the sequence number of the most recent + // The 'lastSeqWritten' map holds the sequence number of the oldest // write for each region. When the cache is flushed, the entry for the // region being flushed is removed if the sequence number of the flush - // is greater than or equal to the value in lastSeqWritten - - lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]); - + // is greater than or equal to the value in lastSeqWritten. + if (!this.lastSeqWritten.containsKey(regionName)) { + this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0])); + } int counter = 0; for (Map.Entry es : columns.entrySet()) { HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum[counter++]); HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp); - writer.append(logKey, logEdit); - numEntries++; + this.writer.append(logKey, logEdit); + this.numEntries++; } } @@ -426,9 +441,9 @@ public class HLog implements HConstants { */ private long[] obtainSeqNum(int num) { long[] results = new long[num]; - synchronized (sequenceLock) { + synchronized (this.sequenceLock) { for (int i = 0; i < num; i++) { - results[i] = logSeqNum++; + results[i] = this.logSeqNum++; } } return results; @@ -447,7 +462,7 @@ public class HLog implements HConstants { * @see #abortCacheFlush() */ long startCacheFlush() { - cacheFlushLock.lock(); + this.cacheFlushLock.lock(); return obtainSeqNum(); } @@ -462,25 +477,22 @@ public class HLog implements HConstants { * @throws IOException */ synchronized void completeCacheFlush(final Text regionName, - final Text tableName, final long logSeqId) throws IOException { - + final Text tableName, final long logSeqId) + throws IOException { try { if (this.closed) { return; } - - writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), - new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), - System.currentTimeMillis())); - - numEntries++; - Long seq = lastSeqWritten.get(regionName); - if (seq != null && logSeqId >= seq) { - lastSeqWritten.remove(regionName); + this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId), + new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), + System.currentTimeMillis())); + this.numEntries++; + Long seq = this.lastSeqWritten.get(regionName); + if (seq != null && logSeqId >= seq.longValue()) { + this.lastSeqWritten.remove(regionName); } - } finally { - cacheFlushLock.unlock(); + this.cacheFlushLock.unlock(); notifyAll(); // wake up the log roller if it is waiting } } diff --git a/src/test/org/apache/hadoop/hbase/TestLogRolling.java b/src/test/org/apache/hadoop/hbase/TestLogRolling.java index d902d0dd22f..7436d5581ea 100644 --- a/src/test/org/apache/hadoop/hbase/TestLogRolling.java +++ b/src/test/org/apache/hadoop/hbase/TestLogRolling.java @@ -19,8 +19,6 @@ */ package org.apache.hadoop.hbase; -import java.io.IOException; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.dfs.MiniDFSCluster; @@ -128,9 +126,10 @@ public class TestLogRolling extends HBaseTestCase { try { Thread.sleep(10 * 1000); // Wait for region server to start } catch (InterruptedException e) { + // continue } - logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir; + this.logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir; // When the META table can be opened, the region servers are running @SuppressWarnings("unused") @@ -155,13 +154,14 @@ public class TestLogRolling extends HBaseTestCase { try { Thread.sleep(2000); } catch (InterruptedException e) { + // continue } } } } - private int countLogFiles(boolean print) throws IOException { - Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir}); + private int countLogFiles(final boolean print) throws Exception { + Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir}); if (print) { for (int i = 0; i < logfiles.length; i++) { if (LOG.isDebugEnabled()) { @@ -186,15 +186,18 @@ public class TestLogRolling extends HBaseTestCase { conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); try { startAndWriteData(); - LOG.info("Finished writing. Sleeping to let cache flusher and log roller run"); - try { - // Wait for log roller and cache flusher to run a few times... - Thread.sleep(30L * 1000L); - } catch (InterruptedException e) { - LOG.info("Sleep interrupted", e); + int count = countLogFiles(true); + LOG.info("Finished writing. There are " + count + " log files. " + + "Sleeping to let cache flusher and log roller run"); + while (count > 2) { + try { + Thread.sleep(1000L); + } catch (InterruptedException e) { + LOG.info("Sleep interrupted", e); + } + count = countLogFiles(true); } - LOG.info("Wake from sleep"); - assertTrue(countLogFiles(true) <= 2); + assertTrue(count <= 2); } catch (Exception e) { LOG.fatal("unexpected exception", e); throw e;