HBASE-4095 revert, wait for further investigation

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1148715 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-07-20 12:15:03 +00:00
parent 77b2ebb530
commit 399462d5d7
3 changed files with 60 additions and 85 deletions

View File

@ -164,8 +164,6 @@ Release 0.91.0 - Unreleased
HBASE-4052 Enabling a table after master switch does not allow table scan, HBASE-4052 Enabling a table after master switch does not allow table scan,
throwing NotServingRegionException (ramkrishna via Ted Yu) throwing NotServingRegionException (ramkrishna via Ted Yu)
HBASE-4112 Creating table may throw NullPointerException (Jinchao via Ted Yu) HBASE-4112 Creating table may throw NullPointerException (Jinchao via Ted Yu)
HBASE-4095 Hlog may not be rolled in a long time if checkLowReplication's
request of LogRoll is blocked (Jieshan via Ted Yu)
HBASE-4093 When verifyAndAssignRoot throws exception, the deadServers state HBASE-4093 When verifyAndAssignRoot throws exception, the deadServers state
cannot be changed (fulin wang via Ted Yu) cannot be changed (fulin wang via Ted Yu)

View File

@ -39,7 +39,6 @@ import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
@ -196,10 +195,6 @@ public class HLog implements Syncable {
// synchronized is insufficient because a cache flush spans two method calls. // synchronized is insufficient because a cache flush spans two method calls.
private final Lock cacheFlushLock = new ReentrantLock(); private final Lock cacheFlushLock = new ReentrantLock();
// The waiting time for log-roller trying to get the lock of cacheFlushLock.
// If the actual waiting time is longer than it, skip the current log roll.
private final long cacheFlushLockWaitTime;
// We synchronize on updateLock to prevent updates and to prevent a log roll // We synchronize on updateLock to prevent updates and to prevent a log roll
// during an update // during an update
// locked during appends // locked during appends
@ -345,8 +340,6 @@ public class HLog implements Syncable {
this.logrollsize = (long)(this.blocksize * multi); this.logrollsize = (long)(this.blocksize * multi);
this.optionalFlushInterval = this.optionalFlushInterval =
conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000); conf.getLong("hbase.regionserver.optionallogflushinterval", 1 * 1000);
this.cacheFlushLockWaitTime =
conf.getLong("hbase.regionserver.cacheFlushLock.waittime", 25000);
if (failIfLogDirExists && fs.exists(dir)) { if (failIfLogDirExists && fs.exists(dir)) {
throw new IOException("Target HLog directory already exists: " + dir); throw new IOException("Target HLog directory already exists: " + dir);
} }
@ -487,81 +480,65 @@ public class HLog implements Syncable {
return null; return null;
} }
byte [][] regionsToFlush = null; byte [][] regionsToFlush = null;
this.cacheFlushLock.lock();
try { try {
if (this.cacheFlushLock.tryLock(this.cacheFlushLockWaitTime, if (closed) {
TimeUnit.MILLISECONDS)) { return regionsToFlush;
try {
if (closed) {
return regionsToFlush;
}
this.logRollRequested = true;
// Do all the preparation outside of the updateLock to block
// as less as possible the incoming writes
long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
//This method get expect but not the actual replicas of the Hlog file
int nextExpectReplicas = fs.getFileStatus(newPath).getReplication();
//Get the current replicas of the Hlog file
int nextActualReplicas = -1;
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALObserver i : this.listeners) {
i.logRolled(newPath);
}
}
synchronized (updateLock) {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
this.hdfs_out = nextHdfsOut;
this.initialReplication = nextInitialReplication;
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0);
this.logRollRequested = false;
}
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {
LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
// If so, then no new writes have come in since all regions were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
archiveLogFile(e.getValue(), e.getKey());
}
this.outputfiles.clear();
} else {
regionsToFlush = cleanOldLogs();
}
}
} finally {
this.cacheFlushLock.unlock();
}
} else {
LOG.debug("Didn't obtain cacheFlushLock in time");
} }
} catch (InterruptedException e) { // Do all the preparation outside of the updateLock to block
LOG.warn("Interrupted rollWriter", e); // as less as possible the incoming writes
Thread.currentThread().interrupt(); long currentFilenum = this.filenum;
this.filenum = System.currentTimeMillis();
Path newPath = computeFilename();
HLog.Writer nextWriter = this.createWriterInstance(fs, newPath, conf);
int nextInitialReplication = fs.getFileStatus(newPath).getReplication();
// Can we get at the dfsclient outputstream? If an instance of
// SFLW, it'll have done the necessary reflection to get at the
// protected field name.
FSDataOutputStream nextHdfsOut = null;
if (nextWriter instanceof SequenceFileLogWriter) {
nextHdfsOut = ((SequenceFileLogWriter)nextWriter).getWriterFSDataOutputStream();
}
// Tell our listeners that a new log was created
if (!this.listeners.isEmpty()) {
for (WALObserver i : this.listeners) {
i.logRolled(newPath);
}
}
synchronized (updateLock) {
// Clean up current writer.
Path oldFile = cleanupCurrentWriter(currentFilenum);
this.writer = nextWriter;
this.initialReplication = nextInitialReplication;
this.hdfs_out = nextHdfsOut;
LOG.info((oldFile != null?
"Roll " + FSUtils.getPath(oldFile) + ", entries=" +
this.numEntries.get() +
", filesize=" +
this.fs.getFileStatus(oldFile).getLen() + ". ": "") +
"New hlog " + FSUtils.getPath(newPath));
this.numEntries.set(0);
this.logRollRequested = false;
}
// Can we delete any of the old log files?
if (this.outputfiles.size() > 0) {
if (this.lastSeqWritten.isEmpty()) {
LOG.debug("Last sequenceid written is empty. Deleting all old hlogs");
// If so, then no new writes have come in since all regions were
// flushed (and removed from the lastSeqWritten map). Means can
// remove all but currently open log file.
for (Map.Entry<Long, Path> e : this.outputfiles.entrySet()) {
archiveLogFile(e.getValue(), e.getKey());
}
this.outputfiles.clear();
} else {
regionsToFlush = cleanOldLogs();
}
}
} finally {
this.cacheFlushLock.unlock();
} }
return regionsToFlush; return regionsToFlush;
} }
@ -1027,6 +1004,7 @@ public class HLog implements Syncable {
this.initialReplication + " replicas. " + this.initialReplication + " replicas. " +
" Requesting close of hlog."); " Requesting close of hlog.");
requestLogRoll(); requestLogRoll();
logRollRequested = true;
} }
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e + LOG.warn("Unable to invoke DFSOutputStream.getNumCurrentReplicas" + e +

View File

@ -314,8 +314,7 @@ public class TestLogRolling {
writeData(table, 2); writeData(table, 2);
long newFilenum = log.getFilenum(); long newFilenum = log.getFilenum();
assertTrue("Missing datanode should've triggered a log roll: " + newFilenum assertTrue("Missing datanode should've triggered a log roll",
+ " " + oldFilenum + " " + curTime,
newFilenum > oldFilenum && newFilenum > curTime); newFilenum > oldFilenum && newFilenum > curTime);
// write some more log data (this should use a new hdfs_out) // write some more log data (this should use a new hdfs_out)