HADOOP-1937 When the master times out a region server's lease, it is too aggressive in reclaiming the server's log.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@582165 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2007-10-05 09:39:32 +00:00
parent 1acbfbde1f
commit 6e4efdf697
3 changed files with 81 additions and 52 deletions

View File

@ -66,6 +66,8 @@ Trunk (unreleased changes)
HADOOP-1975 HBase tests failing with java.lang.NumberFormatException HADOOP-1975 HBase tests failing with java.lang.NumberFormatException
HADOOP-1990 Regression test instability affects nightly and patch builds HADOOP-1990 Regression test instability affects nightly and patch builds
HADOOP-1996 TestHStoreFile fails on windows if run multiple times HADOOP-1996 TestHStoreFile fails on windows if run multiple times
HADOOP-1937 When the master times out a region server's lease, it is too
aggressive in reclaiming the server's log.
IMPROVEMENTS IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable HADOOP-1737 Make HColumnDescriptor data publically members settable

View File

@ -35,6 +35,8 @@ import java.util.TimerTask;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
@ -99,8 +101,10 @@ HMasterRegionInterface {
int numRetries; int numRetries;
long maxRegionOpenTime; long maxRegionOpenTime;
DelayQueue<PendingServerShutdown> shutdownQueue;
BlockingQueue<PendingOperation> msgQueue; BlockingQueue<PendingOperation> msgQueue;
int leaseTimeout;
private Leases serverLeases; private Leases serverLeases;
private Server server; private Server server;
private HServerAddress address; private HServerAddress address;
@ -861,10 +865,11 @@ HMasterRegionInterface {
this.maxRegionOpenTime = this.maxRegionOpenTime =
conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000); conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
this.shutdownQueue = new DelayQueue<PendingServerShutdown>();
this.msgQueue = new LinkedBlockingQueue<PendingOperation>(); this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
this.serverLeases = new Leases( this.leaseTimeout = conf.getInt("hbase.master.lease.period", 30 * 1000);
conf.getInt("hbase.master.lease.period", 30 * 1000), this.serverLeases = new Leases(this.leaseTimeout,
conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000)); conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
this.server = RPC.getServer(this, address.getBindAddress(), this.server = RPC.getServer(this, address.getBindAddress(),
@ -966,10 +971,13 @@ HMasterRegionInterface {
*/ */
try { try {
for (PendingOperation op = null; !closed.get(); ) { for (PendingOperation op = null; !closed.get(); ) {
try { op = shutdownQueue.poll();
op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS); if (op == null ) {
} catch (InterruptedException e) { try {
// continue op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// continue
}
} }
if (op == null || closed.get()) { if (op == null || closed.get()) {
continue; continue;
@ -1117,6 +1125,7 @@ HMasterRegionInterface {
* HMasterRegionInterface * HMasterRegionInterface
*/ */
/** {@inheritDoc} */
@SuppressWarnings("unused") @SuppressWarnings("unused")
public MapWritable regionServerStartup(HServerInfo serverInfo) public MapWritable regionServerStartup(HServerInfo serverInfo)
throws IOException { throws IOException {
@ -1140,11 +1149,7 @@ HMasterRegionInterface {
serversToServerInfo.notifyAll(); serversToServerInfo.notifyAll();
} }
if (storedInfo != null && !closed.get()) { if (storedInfo != null && !closed.get()) {
try { shutdownQueue.put(new PendingServerShutdown(storedInfo));
msgQueue.put(new PendingServerShutdown(storedInfo));
} catch (InterruptedException e) {
throw new RuntimeException("Putting into msgQueue was interrupted.", e);
}
} }
// Either way, record the new server // Either way, record the new server
@ -1683,9 +1688,12 @@ HMasterRegionInterface {
* The region server's log file needs to be split up for each region it was * The region server's log file needs to be split up for each region it was
* serving, and the regions need to get reassigned. * serving, and the regions need to get reassigned.
*/ */
private class PendingServerShutdown extends PendingOperation { private class PendingServerShutdown extends PendingOperation
implements Delayed {
private long delay;
private HServerAddress deadServer; private HServerAddress deadServer;
private String deadServerName; private String deadServerName;
private Path oldLogDir;
private transient boolean logSplit; private transient boolean logSplit;
private transient boolean rootChecked; private transient boolean rootChecked;
private transient boolean rootRescanned; private transient boolean rootRescanned;
@ -1706,11 +1714,30 @@ HMasterRegionInterface {
PendingServerShutdown(HServerInfo serverInfo) { PendingServerShutdown(HServerInfo serverInfo) {
super(); super();
this.delay = leaseTimeout / 2;
this.deadServer = serverInfo.getServerAddress(); this.deadServer = serverInfo.getServerAddress();
this.deadServerName = this.deadServer.toString(); this.deadServerName = this.deadServer.toString();
this.logSplit = false; this.logSplit = false;
this.rootChecked = false; this.rootChecked = false;
this.rootRescanned = false; this.rootRescanned = false;
StringBuilder dirName = new StringBuilder("log_");
dirName.append(deadServer.getBindAddress());
dirName.append("_");
dirName.append(serverInfo.getStartCode());
dirName.append("_");
dirName.append(deadServer.getPort());
this.oldLogDir = new Path(dir, dirName.toString());
}
/** {@inheritDoc} */
public long getDelay(TimeUnit unit) {
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
/** {@inheritDoc} */
public int compareTo(Delayed o) {
return Long.valueOf(getDelay(TimeUnit.MILLISECONDS)
- o.getDelay(TimeUnit.MILLISECONDS)).intValue();
} }
/** {@inheritDoc} */ /** {@inheritDoc} */
@ -1875,17 +1902,12 @@ HMasterRegionInterface {
if (!logSplit) { if (!logSplit) {
// Process the old log file // Process the old log file
StringBuilder dirName = new StringBuilder("log_"); if (fs.exists(oldLogDir)) {
dirName.append(deadServer.getBindAddress());
dirName.append("_");
dirName.append(deadServer.getPort());
Path logdir = new Path(dir, dirName.toString());
if (fs.exists(logdir)) {
if (!splitLogLock.tryLock()) { if (!splitLogLock.tryLock()) {
return false; return false;
} }
try { try {
HLog.splitLog(dir, logdir, fs, conf); HLog.splitLog(dir, oldLogDir, fs, conf);
} finally { } finally {
splitLogLock.unlock(); splitLogLock.unlock();
} }
@ -2901,16 +2923,8 @@ HMasterRegionInterface {
// NOTE: If the server was serving the root region, we cannot reassign it // NOTE: If the server was serving the root region, we cannot reassign it
// here because the new server will start serving the root region before // here because the new server will start serving the root region before
// the PendingServerShutdown operation has a chance to split the log file. // the PendingServerShutdown operation has a chance to split the log file.
try { if (info != null) {
if (info != null) { shutdownQueue.put(new PendingServerShutdown(info));
msgQueue.put(new PendingServerShutdown(info));
}
} catch (InterruptedException e) {
// continue. We used to throw a RuntimeException here but on exit
// this put is often interrupted. For now, just log these iterrupts
// rather than throw an exception
LOG.debug("MsgQueue.put was interrupted (If we are exiting, this " +
"msg can be ignored)");
} }
} }
} }

View File

@ -96,7 +96,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Vector<HMsg> outboundMsgs = new Vector<HMsg>(); private final Vector<HMsg> outboundMsgs = new Vector<HMsg>();
int numRetries; final int numRetries;
protected final int threadWakeFrequency; protected final int threadWakeFrequency;
private final int msgInterval; private final int msgInterval;
private final int serverLeaseTimeout; private final int serverLeaseTimeout;
@ -472,19 +472,27 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
// get it when the master is panicing because for instance // get it when the master is panicing because for instance
// the HDFS has been yanked out from under it. Be wary of // the HDFS has been yanked out from under it. Be wary of
// this message. // this message.
try { if (checkFileSystem()) {
if (checkFileSystem()) { closeAllRegions();
closeAllRegions(); synchronized (logRollerLock) {
restart = true; try {
log.closeAndDelete();
serverInfo.setStartCode(rand.nextLong());
log = setupHLog();
} catch (IOException e) {
this.abortRequested = true;
this.stopRequested.set(true);
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal("error restarting server", e);
break;
}
} }
} catch (Exception e) { reportForDuty();
restart = true;
} else {
LOG.fatal("file system available check failed. " + LOG.fatal("file system available check failed. " +
"Shutting down server.", e); "Shutting down server.");
this.stopRequested.set(true);
this.fsOk = false;
this.abortRequested = true;
} }
break; break;
case HMsg.MSG_REGIONSERVER_STOP: case HMsg.MSG_REGIONSERVER_STOP:
@ -604,7 +612,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
* Run init. Sets up hlog and starts up all server threads. * Run init. Sets up hlog and starts up all server threads.
* @param c Extra configuration. * @param c Extra configuration.
*/ */
private void init(final MapWritable c) { private void init(final MapWritable c) throws IOException {
try { try {
for (Map.Entry<Writable, Writable> e: c.entrySet()) { for (Map.Entry<Writable, Writable> e: c.entrySet()) {
String key = e.getKey().toString(); String key = e.getKey().toString();
@ -618,18 +626,22 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
startServiceThreads(); startServiceThreads();
} catch (IOException e) { } catch (IOException e) {
this.stopRequested.set(true); this.stopRequested.set(true);
LOG.fatal("Failed init", e = RemoteExceptionHandler.checkIOException(e);
RemoteExceptionHandler.checkIOException(e)); LOG.fatal("Failed init", e);
IOException ex = new IOException("region server startup failed");
ex.initCause(e);
throw ex;
} }
} }
private HLog setupHLog() private HLog setupHLog() throws RegionServerRunningException,
throws RegionServerRunningException, IOException { IOException {
String rootDir = this.conf.get(HConstants.HBASE_DIR); String rootDir = this.conf.get(HConstants.HBASE_DIR);
LOG.info("Root dir: " + rootDir); LOG.info("Root dir: " + rootDir);
Path logdir = new Path(new Path(rootDir), Path logdir = new Path(new Path(rootDir), "log" + "_" + getThisIP() + "_" +
"log" + "_" + getThisIP() + "_" + this.serverInfo.getStartCode() + "_" +
this.serverInfo.getServerAddress().getPort()); this.serverInfo.getServerAddress().getPort());
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Log dir " + logdir); LOG.debug("Log dir " + logdir);
} }
@ -762,6 +774,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
} }
break; break;
} catch(IOException e) { } catch(IOException e) {
LOG.warn("error telling master we are up", e);
this.sleeper.sleep(lastMsg); this.sleeper.sleep(lastMsg);
continue; continue;
} }