diff --git a/CHANGES.txt b/CHANGES.txt
index c68fd702e6f..b13293fd8a6 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -164,8 +164,6 @@ Release 0.91.0 - Unreleased
    HBASE-4052  Enabling a table after master switch does not allow table scan,
                throwing NotServingRegionException (ramkrishna via Ted Yu)
    HBASE-4112  Creating table may throw NullPointerException (Jinchao via Ted Yu)
-   HBASE-4095  Hlog may not be rolled in a long time if checkLowReplication's
-               request of LogRoll is blocked (Jieshan via Ted Yu)
    HBASE-4093  When verifyAndAssignRoot throws exception, the deadServers state
                cannot be changed (fulin wang via Ted Yu)
 
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
index 7cd308cb5c7..ce7c46611a2 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/wal/HLog.java
@@ -39,7 +39,6 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
@@ -196,10 +195,6 @@ public class HLog implements Syncable {
   // synchronized is insufficient because a cache flush spans two method calls.
   private final Lock cacheFlushLock = new ReentrantLock();
 
-  // The waiting time for log-roller trying to get the lock of cacheFlushLock.
-  // If the actual waiting time is longer than it, skip the current log roll.
-  private final long cacheFlushLockWaitTime;
-
   // We synchronize on updateLock to prevent updates and to prevent a log roll
   // during an update
   // locked during appends
@@ -345,8 +340,6 @@ public class HLog implements Syncable {
     this.logrollsize = (long)(this.blocksize * multi);
     this.optionalFlushInterval =
       conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
-    this.cacheFlushLockWaitTime =
-      conf.getLong("hbase.regionserver.cacheFlushLock.waittime", 25000);
     if (failIfLogDirExists && fs.exists(dir)) {
       throw new IOException("Target HLog directory already exists: " + dir);
     }
@@ -487,82 +480,66 @@ public class HLog implements Syncable {
       return null;
     }
     byte [][] regionsToFlush = null;
+    this.cacheFlushLock.lock();
     try {
-      if (this.cacheFlushLock.tryLock(this.cacheFlushLockWaitTime,
-          TimeUnit.MILLISECONDS)) {
-        try {
-          if (closed) {
-            return regionsToFlush;
-          }
-          this.logRollRequested = true;
-          // Do all the preparation outside of the updateLock to block
-          // as less as possible the incoming writes
-          long currentFilenum = this.filenum;
-          this.filenum = System.currentTimeMillis();
-          Path newPath = computeFilename();
-          HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
-          int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
-
-          //This method get expect but not the actual replicas of the Hlog file
-          int nextExpectReplicas = fs.getFileStatus(newPath).getReplication();
-
-          //Get the current replicas of the Hlog file
-          int nextActualReplicas = -1;
-          // Can we get at the dfsclient outputstream? If an instance of
-          // SFLW, it'll have done the necessary reflection to get at the
-          // protected field name.
-          FSDataOutputStream nextHdfsOut = null;
-          if (nextWriter instanceof SequenceFileLogWriter) {
-            nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
-          }
-          // Tell our listeners that a new log was created
-          if (!this.listeners.isEmpty()) {
-            for (WALObserver i : this.listeners) {
-              i.logRolled(newPath);
-            }
-          }
-
-          synchronized (updateLock) {
-            // Clean up current writer.
-            Path oldFile = cleanupCurrentWriter(currentFilenum);
-            this.writer = nextWriter;
-            this.hdfs_out = nextHdfsOut;
-            this.initialReplication = nextInitialReplication;
-
-            LOG.info((oldFile != null?
-              "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
-              this.numEntries.get() +
-              ", filesize=" +
-              this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
-              "New hlog " + FSUtils.getPath(newPath));
-            this.numEntries.set(0);
-            this.logRollRequested = false;
-          }
-          // Can we delete any of the old log files?
-          if (this.outputfiles.size() > 0) {
-            if (this.lastSeqWritten.isEmpty()) {
-              LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
-              // If so, then no new writes have come in since all regions were
-              // flushed (and removed from the lastSeqWritten map). Means can
-              // remove all but currently open log file.
-              for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
-                archiveLogFile(e.getValue(), e.getKey());
-              }
-              this.outputfiles.clear();
-            } else {
-              regionsToFlush = cleanOldLogs();
-            }
-          }
-        } finally {
-          this.cacheFlushLock.unlock();
-        }
-      } else {
-        LOG.debug("Didn't obtain cacheFlushLock in time");
+      if (closed) {
+        return regionsToFlush;
       }
-    } catch (InterruptedException e) {
-      LOG.warn("Interrupted rollWriter", e);
-      Thread.currentThread().interrupt();
-    }
+      // Do all the preparation outside of the updateLock to block
+      // as less as possible the incoming writes
+      long currentFilenum = this.filenum;
+      this.filenum = System.currentTimeMillis();
+      Path newPath = computeFilename();
+      HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
+      int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
+      // Can we get at the dfsclient outputstream? If an instance of
+      // SFLW, it'll have done the necessary reflection to get at the
+      // protected field name.
+      FSDataOutputStream nextHdfsOut = null;
+      if (nextWriter instanceof SequenceFileLogWriter) {
+        nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
+      }
+      // Tell our listeners that a new log was created
+      if (!this.listeners.isEmpty()) {
+        for (WALObserver i : this.listeners) {
+          i.logRolled(newPath);
+        }
+      }
+
+      synchronized (updateLock) {
+        // Clean up current writer.
+        Path oldFile = cleanupCurrentWriter(currentFilenum);
+        this.writer = nextWriter;
+        this.initialReplication = nextInitialReplication;
+        this.hdfs_out = nextHdfsOut;
+
+        LOG.info((oldFile != null?
+          "Roll " + FSUtils.getPath(oldFile) + ", entries=" +
+          this.numEntries.get() +
+          ", filesize=" +
+          this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
+          "New hlog " + FSUtils.getPath(newPath));
+        this.numEntries.set(0);
+        this.logRollRequested = false;
+      }
+      // Can we delete any of the old log files?
+      if (this.outputfiles.size() > 0) {
+        if (this.lastSeqWritten.isEmpty()) {
+          LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
+          // If so, then no new writes have come in since all regions were
+          // flushed (and removed from the lastSeqWritten map). Means can
+          // remove all but currently open log file.
+          for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
+            archiveLogFile(e.getValue(), e.getKey());
+          }
+          this.outputfiles.clear();
+        } else {
+          regionsToFlush = cleanOldLogs();
+        }
+      }
+    } finally {
+      this.cacheFlushLock.unlock();
+    }
 
     return regionsToFlush;
   }
@@ -1027,6 +1004,7 @@ public class HLog implements Syncable {
             this.initialReplication + " replicas. " +
             " Requesting close of hlog.");
         requestLogRoll();
+        logRollRequested = true;
       }
     } catch (Exception e) {
       LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index 926b9fd7f4f..287f1fb9213 100644
--- a/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -314,8 +314,7 @@ public class TestLogRolling {
     writeData(table, 2);
     long newFilenum = log.getFilenum();
 
-    assertTrue("Missing datanode should've triggered a log roll: " + newFilenum
-        + " " + oldFilenum + " " + curTime,
+    assertTrue("Missing datanode should've triggered a log roll",
         newFilenum > oldFilenum && newFilenum > curTime);
 
     // write some more log data (this should use a new hdfs_out)
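For readers skimming the patch, the two behavioral changes are: (1) `rollWriter()` now takes `cacheFlushLock` with a plain blocking `lock()` instead of a timed `tryLock()`, so a roll waits for an in-flight cache flush rather than silently giving up, and (2) `logRollRequested` is set in `checkLowReplication()` immediately after `requestLogRoll()` (and is still cleared under `updateLock` once the roll completes), rather than being set inside `rollWriter()` itself. The sketch below contrasts the two locking strategies in isolation; it is a minimal illustration, not HBase code, and the class and method names (`RollLockSketch`, `rollWithTimeout`, `rollBlocking`) are invented for this example.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Minimal sketch (not HBase code): why a timed tryLock can starve a log
// roll, while a plain lock() merely delays it until the flush finishes.
public class RollLockSketch {
  private final Lock cacheFlushLock = new ReentrantLock();

  // Old behavior (removed by the patch): give up if the lock is busy.
  // If cache flushes keep holding the lock longer than waitMillis, the
  // roll is skipped every time and the log may not roll for a long time.
  boolean rollWithTimeout(long waitMillis) throws InterruptedException {
    if (!cacheFlushLock.tryLock(waitMillis, TimeUnit.MILLISECONDS)) {
      System.out.println("Didn't obtain cacheFlushLock in time; roll skipped");
      return false;
    }
    try {
      System.out.println("Rolling log (timed path)");
      return true;
    } finally {
      cacheFlushLock.unlock();
    }
  }

  // New behavior (restored by the patch): block until the flush finishes,
  // then roll unconditionally.
  void rollBlocking() {
    cacheFlushLock.lock();
    try {
      System.out.println("Rolling log (blocking path)");
    } finally {
      cacheFlushLock.unlock();
    }
  }

  public static void main(String[] args) throws Exception {
    RollLockSketch s = new RollLockSketch();
    // Simulate a cache flush holding the lock from another thread.
    Thread flusher = new Thread(() -> {
      s.cacheFlushLock.lock();
      try {
        Thread.sleep(200); // a "long" flush
      } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
      } finally {
        s.cacheFlushLock.unlock();
      }
    });
    flusher.start();
    Thread.sleep(50);      // let the flusher take the lock first
    s.rollWithTimeout(10); // old path: times out, log is not rolled
    s.rollBlocking();      // new path: waits ~150 ms, then rolls
    flusher.join();
  }
}
```

Blocking is a reasonable choice here because `rollWriter()` is presumably driven by the dedicated log-roller thread, so waiting on `cacheFlushLock` delays only the roll itself. The timed variant could skip the roll indefinitely under sustained flush activity, which is consistent with this patch backing the HBASE-4095 change out (note its removed CHANGES.txt entry above).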