HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's request of LogRoll is blocked (Jieshan via Ted Yu)


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1148169 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-07-19 05:11:34 +00:00
parent d757784cbf
commit 70d8db3673
2 changed files with 86 additions and 58 deletions

View File

@ -164,6 +164,8 @@ Release 0.91.0 - Unreleased
HBASE-4052 Enabling a table after master switch does not allow table scan, HBASE-4052 Enabling a table after master switch does not allow table scan,
throwing NotServingRegionException (ramkrishna via Ted Yu) throwing NotServingRegionException (ramkrishna via Ted Yu)
HBASE-4112 Creating table may throw NullPointerException (Jinchao via Ted Yu) HBASE-4112 Creating table may throw NullPointerException (Jinchao via Ted Yu)
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's
request of LogRoll is blocked (Jieshan via Ted Yu)
IMPROVEMENTS IMPROVEMENTS
HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack)

View File

@ -39,6 +39,7 @@ import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -195,6 +196,10 @@ public class HLog implements Syncable {
// synchronized is insufficient because a cache flush spans two method calls. // synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock(); private final Lock cacheFlushLock = new ReentrantLock();
// Maximum time, in milliseconds, that the log roller waits to acquire cacheFlushLock.
// If the lock cannot be acquired within this time, the current log roll is skipped.
private final long cacheFlushLockWaitTime;
// We synchronize on updateLock to prevent updates and to prevent a log roll // We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update // during an update
// locked during appends // locked during appends
@ -340,6 +345,8 @@ public class HLog implements Syncable {
this.logrollsize = (long)(this.blocksize * multi); this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval = this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
this.cacheFlushLockWaitTime =
conf.getLong("hbase.regionserver.cacheFlushLock.waittime", 5000);
if (failIfLogDirExists && fs.exists(dir)) { if (failIfLogDirExists && fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir); throw new IOException("Target HLog directory already exists: " + dir);
} }
@ -480,65 +487,85 @@ public class HLog implements Syncable {
return null; return null;
} }
byte [][] regionsToFlush = null; byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
try { try {
if (closed) { if (this.cacheFlushLock.tryLock(this.cacheFlushLockWaitTime,
return regionsToFlush; TimeUnit.MILLISECONDS)) {
} try {
// Do all the preparation outside of the updateLock to block if (closed) {
// as less as possible the incoming writes return regionsToFlush;
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALObserver i : this.listeners) {
i.logRolled(newPath);
}
}
synchronized (updateLock) {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
this.initialReplication = nextInitialReplication;
this.hdfs_out = nextHdfsOut;
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0);
this.logRollRequested = false;
}
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {
LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
// If so, then no new writes have come in since all regions were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
archiveLogFile(e.getValue(), e.getKey());
} }
this.outputfiles.clear(); this.logRollRequested = true;
} else { // Do all the preparation outside of the updateLock to block
regionsToFlush = cleanOldLogs(); // as less as possible the incoming writes
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
// This call returns the expected replication of the HLog file, not the actual number of replicas
int nextExpectReplicas = fs.getFileStatus(newPath).getReplication();
// Get the current number of replicas of the HLog file
int nextActualReplicas = -1;
try
{
nextActualReplicas = getLogReplication();
} catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
" still proceeding ahead...");
}
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALObserver i : this.listeners) {
i.logRolled(newPath);
}
}
synchronized (updateLock) {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
this.initialReplication = nextActualReplicas == -1 ?
nextExpectReplicas : nextActualReplicas;
this.hdfs_out = nextHdfsOut;
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0);
this.logRollRequested = false;
}
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {
LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
// If so, then no new writes have come in since all regions were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
archiveLogFile(e.getValue(), e.getKey());
}
this.outputfiles.clear();
} else {
regionsToFlush = cleanOldLogs();
}
}
} finally {
this.cacheFlushLock.unlock();
} }
} }
} finally { } catch (InterruptedException e) {
this.cacheFlushLock.unlock(); LOG.warn("Interrupted rollWriter", e);
Thread.currentThread().interrupt();
} }
return regionsToFlush; return regionsToFlush;
} }
@ -1004,7 +1031,6 @@ public class HLog implements Syncable {
this.initialReplication + " replicas. " + this.initialReplication + " replicas. " +
" Requesting close of hlog."); " Requesting close of hlog.");
requestLogRoll(); requestLogRoll();
logRollRequested = true;
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +