diff --git a/CHANGES.txt b/CHANGES.txt index 80a9ae42d1f..4c4c4a1c7f3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -164,6 +164,8 @@ Release 0.91.0 - Unreleased HBASE-4052 Enabling a table after master switch does not allow table scan, throwing NotServingRegionException (ramkrishna via Ted Yu) HBASE-4112 Creating table may throw NullPointerException (Jinchao via Ted Yu) + HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's + request of LogRoll is blocked (Jieshan via Ted Yu) IMPROVEMENTS HBASE-3290 Max Compaction Size (Nicolas Spiegelberg via Stack) diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java index ce7c46611a2..5addd87c2df 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java @@ -39,6 +39,7 @@ import java.util.TreeMap; import java.util.TreeSet; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; @@ -195,6 +196,10 @@ public class HLog implements Syncable { // synchronized is insufficient because a cache flush spans two method calls. private final Lock cacheFlushLock = new ReentrantLock(); + // The waiting time for log-roller trying to get the lock of cacheFlushLock. + // If the actual waiting time is longer than it, skip the current log roll. 
+ private final long cacheFlushLockWaitTime; + // We synchronize on updateLock to prevent updates and to prevent a log roll // during an update // locked during appends @@ -340,6 +345,8 @@ public class HLog implements Syncable { this.logrollsize = (long)(this.blocksize * multi); this.optionalFlushInterval = conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); + this.cacheFlushLockWaitTime = + conf.getLong("hbase.regionserver.cacheFlushLock.waittime", 5000); if (failIfLogDirExists && fs.exists(dir)) { throw new IOException("Target HLog directory already exists: " + dir); } @@ -480,66 +487,86 @@ public class HLog implements Syncable { return null; } byte [][] regionsToFlush = null; - this.cacheFlushLock.lock(); try { - if (closed) { - return regionsToFlush; - } - // Do all the preparation outside of the updateLock to block - // as less as possible the incoming writes - long currentFilenum = this.filenum; - this.filenum = System.currentTimeMillis(); - Path newPath = computeFilename(); - HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); - int nextInitialReplication = fs.getFileStatus(newPath).getReplication(); - // Can we get at the dfsclient outputstream? If an instance of - // SFLW, it'll have done the necessary reflection to get at the - // protected field name. - FSDataOutputStream nextHdfsOut = null; - if (nextWriter instanceof SequenceFileLogWriter) { - nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream(); - } - // Tell our listeners that a new log was created - if (!this.listeners.isEmpty()) { - for (WALObserver i : this.listeners) { - i.logRolled(newPath); - } - } - - synchronized (updateLock) { - // Clean up current writer. - Path oldFile = cleanupCurrentWriter(currentFilenum); - this.writer = nextWriter; - this.initialReplication = nextInitialReplication; - this.hdfs_out = nextHdfsOut; - - LOG.info((oldFile != null? 
- "Roll " + FSUtils.getPath(oldFile) + ", entries=" + - this.numEntries.get() + - ", filesize=" + - this.fs.getFileStatus(oldFile).getLen() + ". ": "") + - "New hlog " + FSUtils.getPath(newPath)); - this.numEntries.set(0); - this.logRollRequested = false; - } - // Can we delete any of the old log files? - if (this.outputfiles.size() > 0) { - if (this.lastSeqWritten.isEmpty()) { - LOG.debug("Last sequenceid written is empty. Deleting all old hlogs"); - // If so, then no new writes have come in since all regions were - // flushed (and removed from the lastSeqWritten map). Means can - // remove all but currently open log file. - for (Map.Entry e : this.outputfiles.entrySet()) { - archiveLogFile(e.getValue(), e.getKey()); + if (this.cacheFlushLock.tryLock(this.cacheFlushLockWaitTime, + TimeUnit.MILLISECONDS)) { + try { + if (closed) { + return regionsToFlush; } - this.outputfiles.clear(); - } else { - regionsToFlush = cleanOldLogs(); + this.logRollRequested = true; + // Do all the preparation outside of the updateLock to block + // incoming writes as little as possible + long currentFilenum = this.filenum; + this.filenum = System.currentTimeMillis(); + Path newPath = computeFilename(); + HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf); + // This returns the expected, not the actual, number of replicas of the HLog file + int nextExpectReplicas = fs.getFileStatus(newPath).getReplication(); + + // Get the current number of replicas of the HLog file + int nextActualReplicas = -1; + try + { + nextActualReplicas = getLogReplication(); + } catch (Exception e) { + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + + " still proceeding ahead..."); + } + // Can we get at the dfsclient outputstream? If an instance of + // SFLW, it'll have done the necessary reflection to get at the + // protected field name. 
+ FSDataOutputStream nextHdfsOut = null; + if (nextWriter instanceof SequenceFileLogWriter) { + nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream(); + } + // Tell our listeners that a new log was created + if (!this.listeners.isEmpty()) { + for (WALObserver i : this.listeners) { + i.logRolled(newPath); + } + } + + synchronized (updateLock) { + // Clean up current writer. + Path oldFile = cleanupCurrentWriter(currentFilenum); + this.writer = nextWriter; + this.initialReplication = nextActualReplicas == -1 ? + nextExpectReplicas : nextActualReplicas; + this.hdfs_out = nextHdfsOut; + + LOG.info((oldFile != null? + "Roll " + FSUtils.getPath(oldFile) + ", entries=" + + this.numEntries.get() + + ", filesize=" + + this.fs.getFileStatus(oldFile).getLen() + ". ": "") + + "New hlog " + FSUtils.getPath(newPath)); + this.numEntries.set(0); + this.logRollRequested = false; + } + // Can we delete any of the old log files? + if (this.outputfiles.size() > 0) { + if (this.lastSeqWritten.isEmpty()) { + LOG.debug("Last sequenceid written is empty. Deleting all old hlogs"); + // If so, then no new writes have come in since all regions were + // flushed (and removed from the lastSeqWritten map). Means can + // remove all but currently open log file. + for (Map.Entry e : this.outputfiles.entrySet()) { + archiveLogFile(e.getValue(), e.getKey()); + } + this.outputfiles.clear(); + } else { + regionsToFlush = cleanOldLogs(); + } + } + } finally { + this.cacheFlushLock.unlock(); } - } - } finally { - this.cacheFlushLock.unlock(); - } + } + } catch (InterruptedException e) { + LOG.warn("Interrupted rollWriter", e); + Thread.currentThread().interrupt(); + } return regionsToFlush; } @@ -1004,7 +1031,6 @@ public class HLog implements Syncable { this.initialReplication + " replicas. " + " Requesting close of hlog."); requestLogRoll(); - logRollRequested = true; } } catch (Exception e) { LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +