HADOOP-2029 TestLogRolling fails too often in patch and nightlies

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@583839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-10-11 14:45:36 +00:00
parent c9f105acd5
commit 642dd4294e
3 changed files with 108 additions and 92 deletions

View File

@ -74,6 +74,7 @@ Trunk (unreleased changes)
daemon scripts daemon scripts
HADOOP-2017 TestRegionServerAbort failure in patch build #903 and HADOOP-2017 TestRegionServerAbort failure in patch build #903 and
nightly #266 nightly #266
HADOOP-2029 TestLogRolling fails too often in patch and nightlies
IMPROVEMENTS IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable HADOOP-1737 Make HColumnDescriptor data publically members settable

View File

@ -83,26 +83,28 @@ import org.apache.hadoop.io.SequenceFile.Reader;
*/ */
public class HLog implements HConstants { public class HLog implements HConstants {
private static final Log LOG = LogFactory.getLog(HLog.class); private static final Log LOG = LogFactory.getLog(HLog.class);
private static final String HLOG_DATFILE = "hlog.dat.";
static final String HLOG_DATFILE = "hlog.dat.";
static final Text METACOLUMN = new Text("METACOLUMN:"); static final Text METACOLUMN = new Text("METACOLUMN:");
static final Text METAROW = new Text("METAROW"); static final Text METAROW = new Text("METAROW");
final FileSystem fs;
FileSystem fs; final Path dir;
final Configuration conf;
Path dir;
Configuration conf;
final long threadWakeFrequency; final long threadWakeFrequency;
/*
* Current log file.
*/
SequenceFile.Writer writer; SequenceFile.Writer writer;
TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>(); /*
* Map of all log files but the current one.
*/
final TreeMap<Long, Path> outputfiles = new TreeMap<Long, Path>();
HashMap<Text, Long> lastSeqWritten = new HashMap<Text, Long>(); /*
* Map of region to last sequence/edit id.
*/
final Map<Text, Long> lastSeqWritten = new HashMap<Text, Long>();
volatile boolean closed = false; volatile boolean closed = false;
@ -129,11 +131,12 @@ public class HLog implements HConstants {
* @throws IOException * @throws IOException
*/ */
static void splitLog(Path rootDir, Path srcDir, FileSystem fs, static void splitLog(Path rootDir, Path srcDir, FileSystem fs,
Configuration conf) throws IOException { Configuration conf)
throws IOException {
Path logfiles[] = fs.listPaths(new Path[] { srcDir }); Path logfiles[] = fs.listPaths(new Path[] { srcDir });
LOG.info("splitting " + logfiles.length + " log(s) in " + LOG.info("splitting " + logfiles.length + " log(s) in " +
srcDir.toString()); srcDir.toString());
HashMap<Text, SequenceFile.Writer> logWriters = Map<Text, SequenceFile.Writer> logWriters =
new HashMap<Text, SequenceFile.Writer>(); new HashMap<Text, SequenceFile.Writer>();
try { try {
for (int i = 0; i < logfiles.length; i++) { for (int i = 0; i < logfiles.length; i++) {
@ -202,12 +205,12 @@ public class HLog implements HConstants {
* @param conf * @param conf
* @throws IOException * @throws IOException
*/ */
HLog(FileSystem fs, Path dir, Configuration conf) throws IOException { HLog(final FileSystem fs, final Path dir, final Configuration conf)
throws IOException {
this.fs = fs; this.fs = fs;
this.dir = dir; this.dir = dir;
this.conf = conf; this.conf = conf;
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
if (fs.exists(dir)) { if (fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir); throw new IOException("Target HLog directory already exists: " + dir);
} }
@ -242,7 +245,7 @@ public class HLog implements HConstants {
* flush cannot start when the log is being rolled and the log cannot be * flush cannot start when the log is being rolled and the log cannot be
* rolled during a cache flush. * rolled during a cache flush.
* *
* Note that this method cannot be synchronized because it is possible that * <p>Note that this method cannot be synchronized because it is possible that
* startCacheFlush runs, obtaining the cacheFlushLock, then this method could * startCacheFlush runs, obtaining the cacheFlushLock, then this method could
* start which would obtain the lock on this but block on obtaining the * start which would obtain the lock on this but block on obtaining the
* cacheFlushLock and then completeCacheFlush could be called which would wait * cacheFlushLock and then completeCacheFlush could be called which would wait
@ -253,81 +256,94 @@ public class HLog implements HConstants {
synchronized void rollWriter() throws IOException { synchronized void rollWriter() throws IOException {
boolean locked = false; boolean locked = false;
while (!locked && !closed) { while (!locked && !closed) {
if (cacheFlushLock.tryLock()) { if (this.cacheFlushLock.tryLock()) {
locked = true; locked = true;
break; break;
} }
try { try {
this.wait(threadWakeFrequency); this.wait(threadWakeFrequency);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// continue
} }
} }
if (closed) { if (closed) {
if (locked) { if (locked) {
cacheFlushLock.unlock(); this.cacheFlushLock.unlock();
} }
throw new IOException("Cannot roll log; log is closed"); throw new IOException("Cannot roll log; log is closed");
} }
// If we get here we have locked out both cache flushes and appends // If we get here we have locked out both cache flushes and appends
try { try {
if (writer != null) { if (this.writer != null) {
// Close the current writer, get a new one. // Close the current writer, get a new one.
writer.close(); this.writer.close();
Path p = computeFilename(filenum - 1); Path p = computeFilename(filenum - 1);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Closing current log writer " + p.toString() + LOG.debug("Closing current log writer " + p.toString() +
" to get a new one"); " to get a new one");
} }
if (filenum > 0) { if (filenum > 0) {
synchronized (sequenceLock) { synchronized (this.sequenceLock) {
outputfiles.put(logSeqNum - 1, p); this.outputfiles.put(Long.valueOf(this.logSeqNum - 1), p);
} }
} }
} }
Path newPath = computeFilename(filenum++); Path newPath = computeFilename(filenum++);
this.writer = SequenceFile.createWriter(fs, conf, newPath, this.writer = SequenceFile.createWriter(this.fs, this.conf, newPath,
HLogKey.class, HLogEdit.class); HLogKey.class, HLogEdit.class);
LOG.info("new log writer created at " + newPath); LOG.info("new log writer created at " + newPath);
// Can we delete any of the old log files? // Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.size() <= 0) {
LOG.debug("Last sequence written is empty. Deleting all old hlogs");
// If so, then no new writes have come in since all regions were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
deleteLogFile(e.getValue(), e.getKey());
}
this.outputfiles.clear();
} else {
// Get oldest edit/sequence id. If logs are older than this id,
// then safe to remove.
TreeSet<Long> sequenceNumbers = TreeSet<Long> sequenceNumbers =
new TreeSet<Long>(lastSeqWritten.values()); new TreeSet<Long>(this.lastSeqWritten.values());
long oldestOutstandingSeqNum = sequenceNumbers.first().longValue();
if (sequenceNumbers.size() > 0) { // Get the set of all log files whose final ID is older than the
long oldestOutstandingSeqNum = sequenceNumbers.first(); // oldest pending region operation
// Get the set of all log files whose final ID is older than the oldest
// pending region operation
sequenceNumbers.clear(); sequenceNumbers.clear();
sequenceNumbers.addAll(outputfiles.headMap( sequenceNumbers.addAll(this.outputfiles.headMap(
oldestOutstandingSeqNum).keySet()); Long.valueOf(oldestOutstandingSeqNum)).keySet());
// Now remove old log files (if any) // Now remove old log files (if any)
LOG.debug("Found " + sequenceNumbers.size() + " logs to remove " +
"using oldest outstanding seqnum of " + oldestOutstandingSeqNum);
for (Long seq : sequenceNumbers) { for (Long seq : sequenceNumbers) {
Path p = outputfiles.remove(seq); deleteLogFile(this.outputfiles.remove(seq), seq);
LOG.info("removing old log file " + p.toString()); }
fs.delete(p);
} }
} }
this.numEntries = 0; this.numEntries = 0;
} finally { } finally {
cacheFlushLock.unlock(); this.cacheFlushLock.unlock();
} }
} }
private void deleteLogFile(final Path p, final Long seqno)
throws IOException {
LOG.info("removing old log file " + p.toString() +
" whose highest sequence/edit id is " + seqno);
this.fs.delete(p);
}
/** /**
* This is a convenience method that computes a new filename with a given * This is a convenience method that computes a new filename with a given
* file-number. * file-number.
*/ */
Path computeFilename(final long fn) { Path computeFilename(final long fn) {
return new Path(dir, HLOG_DATFILE + String.format("%1$03d", fn)); return new Path(dir,
HLOG_DATFILE + String.format("%1$03d", Long.valueOf(fn)));
} }
/** /**
@ -378,27 +394,26 @@ public class HLog implements HConstants {
* @throws IOException * @throws IOException
*/ */
synchronized void append(Text regionName, Text tableName, Text row, synchronized void append(Text regionName, Text tableName, Text row,
TreeMap<Text, byte[]> columns, long timestamp) throws IOException { TreeMap<Text, byte[]> columns, long timestamp)
throws IOException {
if (closed) { if (closed) {
throw new IOException("Cannot append; log is closed"); throw new IOException("Cannot append; log is closed");
} }
long seqNum[] = obtainSeqNum(columns.size()); long seqNum[] = obtainSeqNum(columns.size());
// The 'lastSeqWritten' map holds the sequence number of the oldest
// The 'lastSeqWritten' map holds the sequence number of the most recent
// write for each region. When the cache is flushed, the entry for the // write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush // region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten // is greater than or equal to the value in lastSeqWritten.
if (!this.lastSeqWritten.containsKey(regionName)) {
lastSeqWritten.put(regionName, seqNum[seqNum.length - 1]); this.lastSeqWritten.put(regionName, Long.valueOf(seqNum[0]));
}
int counter = 0; int counter = 0;
for (Map.Entry<Text, byte[]> es : columns.entrySet()) { for (Map.Entry<Text, byte[]> es : columns.entrySet()) {
HLogKey logKey = HLogKey logKey =
new HLogKey(regionName, tableName, row, seqNum[counter++]); new HLogKey(regionName, tableName, row, seqNum[counter++]);
HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp); HLogEdit logEdit = new HLogEdit(es.getKey(), es.getValue(), timestamp);
writer.append(logKey, logEdit); this.writer.append(logKey, logEdit);
numEntries++; this.numEntries++;
} }
} }
@ -426,9 +441,9 @@ public class HLog implements HConstants {
*/ */
private long[] obtainSeqNum(int num) { private long[] obtainSeqNum(int num) {
long[] results = new long[num]; long[] results = new long[num];
synchronized (sequenceLock) { synchronized (this.sequenceLock) {
for (int i = 0; i < num; i++) { for (int i = 0; i < num; i++) {
results[i] = logSeqNum++; results[i] = this.logSeqNum++;
} }
} }
return results; return results;
@ -447,7 +462,7 @@ public class HLog implements HConstants {
* @see #abortCacheFlush() * @see #abortCacheFlush()
*/ */
long startCacheFlush() { long startCacheFlush() {
cacheFlushLock.lock(); this.cacheFlushLock.lock();
return obtainSeqNum(); return obtainSeqNum();
} }
@ -462,25 +477,22 @@ public class HLog implements HConstants {
* @throws IOException * @throws IOException
*/ */
synchronized void completeCacheFlush(final Text regionName, synchronized void completeCacheFlush(final Text regionName,
final Text tableName, final long logSeqId) throws IOException { final Text tableName, final long logSeqId)
throws IOException {
try { try {
if (this.closed) { if (this.closed) {
return; return;
} }
this.writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
writer.append(new HLogKey(regionName, tableName, HLog.METAROW, logSeqId),
new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(), new HLogEdit(HLog.METACOLUMN, HGlobals.completeCacheFlush.get(),
System.currentTimeMillis())); System.currentTimeMillis()));
this.numEntries++;
numEntries++; Long seq = this.lastSeqWritten.get(regionName);
Long seq = lastSeqWritten.get(regionName); if (seq != null && logSeqId >= seq.longValue()) {
if (seq != null && logSeqId >= seq) { this.lastSeqWritten.remove(regionName);
lastSeqWritten.remove(regionName);
} }
} finally { } finally {
cacheFlushLock.unlock(); this.cacheFlushLock.unlock();
notifyAll(); // wake up the log roller if it is waiting notifyAll(); // wake up the log roller if it is waiting
} }
} }

View File

@ -19,8 +19,6 @@
*/ */
package org.apache.hadoop.hbase; package org.apache.hadoop.hbase;
import java.io.IOException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.dfs.MiniDFSCluster; import org.apache.hadoop.dfs.MiniDFSCluster;
@ -128,9 +126,10 @@ public class TestLogRolling extends HBaseTestCase {
try { try {
Thread.sleep(10 * 1000); // Wait for region server to start Thread.sleep(10 * 1000); // Wait for region server to start
} catch (InterruptedException e) { } catch (InterruptedException e) {
// continue
} }
logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir; this.logdir = cluster.regionThreads.get(0).getRegionServer().getLog().dir;
// When the META table can be opened, the region servers are running // When the META table can be opened, the region servers are running
@SuppressWarnings("unused") @SuppressWarnings("unused")
@ -155,13 +154,14 @@ public class TestLogRolling extends HBaseTestCase {
try { try {
Thread.sleep(2000); Thread.sleep(2000);
} catch (InterruptedException e) { } catch (InterruptedException e) {
// continue
} }
} }
} }
} }
private int countLogFiles(boolean print) throws IOException { private int countLogFiles(final boolean print) throws Exception {
Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {logdir}); Path[] logfiles = dfs.getFileSystem().listPaths(new Path[] {this.logdir});
if (print) { if (print) {
for (int i = 0; i < logfiles.length; i++) { for (int i = 0; i < logfiles.length; i++) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
@ -186,15 +186,18 @@ public class TestLogRolling extends HBaseTestCase {
conf.setLong("hbase.hregion.max.filesize", 768L * 1024L); conf.setLong("hbase.hregion.max.filesize", 768L * 1024L);
try { try {
startAndWriteData(); startAndWriteData();
LOG.info("Finished writing. Sleeping to let cache flusher and log roller run"); int count = countLogFiles(true);
LOG.info("Finished writing. There are " + count + " log files. " +
"Sleeping to let cache flusher and log roller run");
while (count > 2) {
try { try {
// Wait for log roller and cache flusher to run a few times... Thread.sleep(1000L);
Thread.sleep(30L * 1000L);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.info("Sleep interrupted", e); LOG.info("Sleep interrupted", e);
} }
LOG.info("Wake from sleep"); count = countLogFiles(true);
assertTrue(countLogFiles(true) <= 2); }
assertTrue(count <= 2);
} catch (Exception e) { } catch (Exception e) {
LOG.fatal("unexpected exception", e); LOG.fatal("unexpected exception", e);
throw e; throw e;