HBASE-1146 Replace the HRS leases with Zookeeper

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@739612 13f79535-47bb-0310-9956-ffa450edef68
Author: Jean-Daniel Cryans, 2009-01-31 21:05:35 +00:00
parent 0ce43735d1
commit a143336404
10 changed files with 162 additions and 133 deletions

File: CHANGES.txt

@@ -4,6 +4,7 @@ Release 0.20.0 - Unreleased
HBASE-1147 Modify the scripts to use Zookeeper
HBASE-1144 Store the ROOT region location in Zookeeper
(Nitay Joffe via Stack)
HBASE-1146 Replace the HRS leases with Zookeeper
BUG FIXES
HBASE-1140 "ant clean test" fails (Nitay Joffe via Stack)

File: HConstants.java

@@ -110,6 +110,11 @@ public interface HConstants {
static final String ZOOKEEPER_SAFE_MODE_ZNODE = "zookeeper.znode.safemode";
/** Default ZooKeeper ZNode storing safe mode. */
static final String DEFAULT_ZOOKEEPER_SAFE_MODE_ZNODE = "safe-mode";
/** Parameter name for ZooKeeper ZNode storing region server info. */
static final String ZOOKEEPER_RS_ZNODE = "zookeeper.znode.rs";
/** Default ZooKeeper ZNode storing region server info. */
static final String DEFAULT_ZOOKEEPER_RS_ZNODE = "rs";
/** Parameter name for hbase.regionserver address. */
static final String REGIONSERVER_ADDRESS = "hbase.regionserver";

File: HMaster.java

@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.util.InfoServer;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Sleeper;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.io.MapWritable;
@@ -114,6 +115,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
final int numRetries;
final long maxRegionOpenTime;
final int leaseTimeout;
private final ZooKeeperWrapper zooKeeperWrapper;
volatile DelayQueue<RegionServerOperation> delayedToDoQueue =
new DelayQueue<RegionServerOperation>();
@@ -239,7 +241,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
conf.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
zooKeeperWrapper = new ZooKeeperWrapper(conf);
serverManager = new ServerManager(this);
regionManager = new RegionManager(this);
@@ -396,7 +399,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
server.stop(); // Stop server
serverManager.stop();
regionManager.stop();
// Join up with all threads
@@ -498,7 +500,6 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
this.metrics = new MasterMetrics();
try {
regionManager.start();
serverManager.start();
// Put up info server.
int port = this.conf.getInt("hbase.master.info.port", 60010);
if (port >= 0) {
@@ -926,6 +927,14 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}
}
}
/**
* Get the ZK wrapper object
* @return the ZooKeeperWrapper shared by this master's components
*/
public ZooKeeperWrapper getZooKeeperWrapper() {
return zooKeeperWrapper;
}
/*
* Main program

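The accessor above is what lets the master's components share a single ZooKeeper session instead of each opening their own, as the RegionManager and ServerManager hunks below show. A minimal sketch of the intent (SomeMasterComponent is a made-up name, not part of this patch):

// Sketch only: one ZooKeeper session per master process. Watches and
// ephemeral znodes are tied to a session, so master-side components share
// the master's wrapper rather than calling new ZooKeeperWrapper(conf).
class SomeMasterComponent {
  private final ZooKeeperWrapper zooKeeperWrapper;
  SomeMasterComponent(HMaster master) {
    this.zooKeeperWrapper = master.getZooKeeperWrapper(); // shared instance
  }
}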
File: RegionManager.java

@@ -146,7 +146,7 @@ class RegionManager implements HConstants {
// Scans the meta table
metaScannerThread = new MetaScanner(master);
zooKeeperWrapper = new ZooKeeperWrapper(conf);
zooKeeperWrapper = master.getZooKeeperWrapper();
zooKeeperNumRetries = conf.getInt(ZOOKEEPER_RETRIES, DEFAULT_ZOOKEEPER_RETRIES);
zooKeeperPause = conf.getInt(ZOOKEEPER_PAUSE, DEFAULT_ZOOKEEPER_PAUSE);

File: ServerManager.java

@@ -39,12 +39,13 @@ import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.LeaseException;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.LeaseListener;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWrapper;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
/**
* The ServerManager class manages info about region servers - HServerInfo,
@@ -62,13 +63,14 @@ class ServerManager implements HConstants {
private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg[0];
private final AtomicInteger quiescedServers = new AtomicInteger(0);
private final ZooKeeperWrapper zooKeeperWrapper;
/** The map of known server names to server info */
final Map<String, HServerInfo> serversToServerInfo =
new ConcurrentHashMap<String, HServerInfo>();
/**
* Set of known dead servers. On lease expiration, servers are added here.
* Set of known dead servers. On znode expiration, servers are added here.
* Boolean holds whether its logs have been split or not. Initially set to
* false.
*/
@@ -84,7 +86,6 @@ class ServerManager implements HConstants {
new ConcurrentHashMap<String, HServerLoad>();
private HMaster master;
private final Leases serverLeases;
// Last time we logged average load.
private volatile long lastLogOfAverageLaod = 0;
@@ -100,9 +101,7 @@
*/
public ServerManager(HMaster master) {
this.master = master;
serverLeases = new Leases(master.leaseTimeout,
master.getConfiguration().getInt("hbase.master.lease.thread.wakefrequency",
15 * 1000));
zooKeeperWrapper = master.getZooKeeperWrapper();
this.loggingPeriodForAverageLoad = master.getConfiguration().
getLong("hbase.master.avgload.logging.period", 60000);
this.nobalancingCount = master.getConfiguration().
@@ -111,8 +110,7 @@
/**
* Look to see if we have ghost references to this regionserver such as
* still-existing leases or if regionserver is on the dead servers list
* getting its logs processed.
* if regionserver is on the dead servers list getting its logs processed.
* @param serverInfo
* @return True if still ghost references and we have not been able to clear
* them or the server is shutting down.
@@ -120,7 +118,6 @@
private boolean checkForGhostReferences(final HServerInfo serverInfo) {
String s = serverInfo.getServerAddress().toString().trim();
boolean result = false;
boolean lease = false;
for (long sleepTime = -1; !master.closed.get() && !result;) {
if (sleepTime != -1) {
try {
@@ -129,28 +126,12 @@
// Continue
}
}
if (!lease) {
try {
this.serverLeases.createLease(s, new ServerExpirer(s));
} catch (Leases.LeaseStillHeldException e) {
LOG.debug("Waiting on current lease to expire for " + e.getName());
sleepTime = this.master.leaseTimeout / 4;
continue;
}
lease = true;
}
// May be on list of dead servers. If so, wait till we've cleared it.
String addr = serverInfo.getServerAddress().toString();
if (isDead(addr)) {
LOG.debug("Waiting on " + addr + " removal from dead list before " +
"processing report-for-duty request");
sleepTime = this.master.threadWakeFrequency;
try {
// Keep up lease. May be here > lease expiration.
this.serverLeases.renewLease(s);
} catch (LeaseException e) {
LOG.warn("Failed renewal. Retrying.", e);
}
continue;
}
result = true;
@@ -164,6 +145,10 @@
*/
public void regionServerStartup(final HServerInfo serverInfo) {
String s = serverInfo.getServerAddress().toString().trim();
Watcher watcher = new ServerExpirer(s);
zooKeeperWrapper.updateRSLocationGetWatch(serverInfo, watcher);
LOG.info("Received start message from: " + s);
if (!checkForGhostReferences(serverInfo)) {
return;
@@ -291,7 +276,7 @@ }
}
synchronized (serversToServerInfo) {
cancelLease(serverName);
removeServerInfo(serverName);
serversToServerInfo.notifyAll();
}
@@ -306,14 +291,11 @@
private void processRegionServerExit(String serverName, HMsg[] msgs) {
synchronized (serversToServerInfo) {
try {
// HRegionServer is shutting down. Cancel the server's lease.
// Note that canceling the server's lease takes care of updating
// serversToServerInfo, etc.
if (cancelLease(serverName)) {
// Only process the exit message if the server still has a lease.
// HRegionServer is shutting down.
if (removeServerInfo(serverName)) {
// Only process the exit message if the server still has registered info.
// Otherwise we could end up processing the server exit twice.
LOG.info("Region server " + serverName +
": MSG_REPORT_EXITING -- lease cancelled");
LOG.info("Region server " + serverName + ": MSG_REPORT_EXITING");
// Get all the regions the server was serving reassigned
// (if we are not shutting down).
if (!master.closed.get()) {
@@ -357,10 +339,6 @@
private HMsg[] processRegionServerAllsWell(String serverName,
HServerInfo serverInfo, HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
throws IOException {
// All's well. Renew the server's lease.
// This will always succeed; otherwise, the fetch of serversToServerInfo
// would have failed above.
serverLeases.renewLease(serverName);
// Refresh the info object and the load information
serversToServerInfo.put(serverName, serverInfo);
@@ -608,27 +586,19 @@ }
}
}
/** Cancel a server's lease and update its load information */
private boolean cancelLease(final String serverName) {
boolean leaseCancelled = false;
/** Update a server's load information because it is shutting down */
private boolean removeServerInfo(final String serverName) {
boolean infoUpdated = false;
HServerInfo info = serversToServerInfo.remove(serverName);
// Only cancel lease and update load information once.
// Only update load information once.
// This method can be called a couple of times during shutdown.
if (info != null) {
LOG.info("Cancelling lease for " + serverName);
LOG.info("Removing server's info " + serverName);
if (master.getRootRegionLocation() != null &&
info.getServerAddress().equals(master.getRootRegionLocation())) {
master.regionManager.unsetRootRegion();
}
try {
serverLeases.cancelLease(serverName);
} catch (LeaseException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cancelling " + serverName + " got " + e.getMessage() +
"...continuing");
}
}
leaseCancelled = true;
infoUpdated = true;
// update load information
HServerLoad load = serversToLoad.remove(serverName);
@@ -642,7 +612,7 @@ }
}
}
}
return leaseCancelled;
return infoUpdated;
}
/**
@@ -726,8 +696,7 @@
* Wait on regionservers to report in
* with {@link #regionServerReport(HServerInfo, HMsg[])} so they get notice
* the master is going down. Waits until all region servers come back with
* a MSG_REGIONSERVER_STOP which will cancel their lease or until leases held
* by remote region servers have expired.
* a MSG_REGIONSERVER_STOP.
*/
void letRegionServersShutdown() {
if (!master.fsOk) {
@@ -737,8 +706,7 @@
}
synchronized (serversToServerInfo) {
while (serversToServerInfo.size() > 0) {
LOG.info("Waiting on following regionserver(s) to go down (or " +
"region server lease expiration, whichever happens first): " +
LOG.info("Waiting on following regionserver(s) to go down " +
serversToServerInfo.values());
try {
serversToServerInfo.wait(master.threadWakeFrequency);
@@ -749,65 +717,55 @@
}
}
/** Instantiated to monitor the health of a region server */
private class ServerExpirer implements LeaseListener {
/** Watcher triggered when a RS znode is deleted */
private class ServerExpirer implements Watcher {
private String server;
ServerExpirer(String server) {
this.server = server;
}
public void leaseExpired() {
LOG.info(server + " lease expired");
// Remove the server from the known servers list and update load info
HServerInfo info = serversToServerInfo.remove(server);
boolean rootServer = false;
if (info != null) {
HServerAddress root = master.getRootRegionLocation();
if (root != null && root.equals(info.getServerAddress())) {
// NOTE: If the server was serving the root region, we cannot reassign
// it here because the new server will start serving the root region
// before ProcessServerShutdown has a chance to split the log file.
master.regionManager.unsetRootRegion();
rootServer = true;
}
String serverName = info.getServerAddress().toString();
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
synchronized (loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers != null) {
servers.remove(serverName);
loadToServers.put(load, servers);
public void process(WatchedEvent event) {
if(event.getType().equals(EventType.NodeDeleted)) {
LOG.info(server + " znode expired");
// Remove the server from the known servers list and update load info
HServerInfo info = serversToServerInfo.remove(server);
boolean rootServer = false;
if (info != null) {
HServerAddress root = master.getRootRegionLocation();
if (root != null && root.equals(info.getServerAddress())) {
// NOTE: If the server was serving the root region, we cannot reassign
// it here because the new server will start serving the root region
// before ProcessServerShutdown has a chance to split the log file.
master.regionManager.unsetRootRegion();
rootServer = true;
}
String serverName = info.getServerAddress().toString();
HServerLoad load = serversToLoad.remove(serverName);
if (load != null) {
synchronized (loadToServers) {
Set<String> servers = loadToServers.get(load);
if (servers != null) {
servers.remove(serverName);
loadToServers.put(load, servers);
}
}
}
deadServers.put(server, Boolean.FALSE);
try {
master.toDoQueue.put(new ProcessServerShutdown(master, info,
rootServer));
} catch (InterruptedException e) {
LOG.error("insert into toDoQueue was interrupted", e);
}
}
deadServers.put(server, Boolean.FALSE);
try {
master.toDoQueue.put(
new ProcessServerShutdown(master, info, rootServer));
} catch (InterruptedException e) {
LOG.error("insert into toDoQueue was interrupted", e);
synchronized (serversToServerInfo) {
serversToServerInfo.notifyAll();
}
}
synchronized (serversToServerInfo) {
serversToServerInfo.notifyAll();
}
}
}
/** Start up the server manager */
public void start() {
// Leases are not the same as Chore threads. Set name differently.
this.serverLeases.setName("ServerManager.leaseChecker");
this.serverLeases.start();
}
/** Shut down the server manager */
public void stop() {
// stop monitor lease monitor
serverLeases.close();
}
/**
* @param serverName

File: HRegionServer.java

@@ -284,6 +284,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throw new NullPointerException("Server address cannot be null; " +
"hbase-958 debugging");
}
this.zooKeeperWrapper = new ZooKeeperWrapper(conf);
boolean startCodeOk = false;
while(!startCodeOk) {
serverInfo.setStartCode(System.currentTimeMillis());
startCodeOk = zooKeeperWrapper.writeRSLocation(serverInfo);
if(!startCodeOk) {
LOG.debug("Start code already taken, trying another one");
}
}
this.numRegionsToReport =
conf.getInt("hbase.regionserver.numregionstoreport", 10);
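The loop above is needed because the registration znode's name is derived from the start code (see writeRSLocation() later in this commit), so two servers registering in the same millisecond would collide. A hedged condensation of that failure path: the real method returns false on any KeeperException, and the variant below only separates the cases to make the collision explicit:

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

class StartCodeRegistration {
  // Hypothetical variant of writeRSLocation()'s create call, for
  // illustration only.
  static boolean tryRegister(ZooKeeper zk, String rsZNode, long startCode,
      byte[] address) {
    String znode = rsZNode + "/" + startCode;
    try {
      zk.create(znode, address, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      return true;
    } catch (KeeperException.NodeExistsException e) {
      return false; // start code already taken: caller picks another
    } catch (KeeperException e) {
      return false; // transient failure: caller ends up retrying as well
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}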
@@ -295,8 +305,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
for(int i = 0; i < nbBlocks; i++) {
reservedSpace.add(new byte[DEFAULT_SIZE_RESERVATION_BLOCK]);
}
this.zooKeeperWrapper = new ZooKeeperWrapper(conf);
}
/**

File: ZooKeeperWrapper.java

@@ -31,6 +31,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -64,6 +65,7 @@ public class ZooKeeperWrapper implements HConstants {
private final String parentZNode;
private final String rootRegionZNode;
private final String outOfSafeModeZNode;
private final String rsZNode;
/**
* Create a ZooKeeperWrapper.
@@ -103,20 +105,14 @@
String rootServerZNodeName = conf.get(ZOOKEEPER_ROOT_SERVER_ZNODE,
DEFAULT_ZOOKEEPER_ROOT_SERVER_ZNODE);
if (rootServerZNodeName.startsWith(ZNODE_PATH_SEPARATOR)) {
rootRegionZNode = rootServerZNodeName;
} else {
rootRegionZNode = parentZNode + ZNODE_PATH_SEPARATOR + rootServerZNodeName;
}
String outOfSafeModeZNodeName = conf.get(ZOOKEEPER_SAFE_MODE_ZNODE,
DEFAULT_ZOOKEEPER_SAFE_MODE_ZNODE);
if (outOfSafeModeZNodeName.startsWith(ZNODE_PATH_SEPARATOR)) {
outOfSafeModeZNode = outOfSafeModeZNodeName;
} else {
outOfSafeModeZNode = parentZNode + ZNODE_PATH_SEPARATOR +
outOfSafeModeZNodeName;
}
DEFAULT_ZOOKEEPER_SAFE_MODE_ZNODE);
String rsZNodeName = conf.get(ZOOKEEPER_RS_ZNODE,
DEFAULT_ZOOKEEPER_RS_ZNODE);
rootRegionZNode = getZNode(rootServerZNodeName);
outOfSafeModeZNode = getZNode(outOfSafeModeZNodeName);
rsZNode = getZNode(rsZNodeName);
}
/**
@@ -218,11 +214,11 @@
return address;
}
private boolean ensureParentZNodeExists() {
private boolean ensureZNodeExists(String path) {
try {
zooKeeper.create(parentZNode, new byte[0],
zooKeeper.create(path, new byte[0],
Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
LOG.debug("Created ZNode " + parentZNode);
LOG.debug("Created ZNode " + path);
return true;
} catch (KeeperException.NodeExistsException e) {
return true; // ok, move on.
@@ -240,7 +236,7 @@
* @return true if operation succeeded, false otherwise.
*/
public boolean deleteRootRegionLocation() {
if (!ensureParentZNodeExists()) {
if (!ensureZNodeExists(parentZNode)) {
return false;
}
@@ -301,7 +297,7 @@
return deleteRootRegionLocation();
}
if (!ensureParentZNodeExists()) {
if (!ensureZNodeExists(parentZNode)) {
return false;
}
@@ -320,7 +316,7 @@
* @return true if we're out of safe mode, false otherwise.
*/
public boolean checkOutOfSafeMode() {
if (!ensureParentZNodeExists()) {
if (!ensureZNodeExists(parentZNode)) {
return false;
}
@@ -332,7 +328,7 @@
* @return true if ephemeral ZNode created successfully, false otherwise.
*/
public boolean writeOutOfSafeMode() {
if (!ensureParentZNodeExists()) {
if (!ensureZNodeExists(parentZNode)) {
return false;
}
@@ -349,7 +345,55 @@
return false;
}
/**
* Write this RS's start code and address in ZK.
* Ensures that the full path exists.
* @param info The RS info
* @return true if the location was written, false if it failed
*/
public boolean writeRSLocation(HServerInfo info) {
ensureZNodeExists(parentZNode);
ensureZNodeExists(rsZNode);
byte[] data = Bytes.toBytes(info.getServerAddress().getBindAddress());
String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getStartCode();
try {
zooKeeper.create(znode, data, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
LOG.debug("Created ZNode " + znode
+ " with data " + info.getServerAddress().getBindAddress());
return true;
} catch (KeeperException e) {
LOG.warn("Failed to create " + znode + " znode in ZooKeeper: " + e);
} catch (InterruptedException e) {
LOG.warn("Failed to create " + znode + " znode in ZooKeeper: " + e);
}
return false;
}
/**
* Update the RS address and set a watcher on the znode
* @param info The RS info
* @param watcher The watcher to put on the znode
* @return true if the update is done, false if it failed
*/
public boolean updateRSLocationGetWatch(HServerInfo info, Watcher watcher) {
byte[] data = Bytes.toBytes(info.getServerAddress().getBindAddress());
String znode = rsZNode + ZNODE_PATH_SEPARATOR + info.getStartCode();
try {
zooKeeper.setData(znode, data, -1);
LOG.debug("Updated ZNode " + znode
+ " with data " + info.getServerAddress().getBindAddress());
zooKeeper.getData(znode, watcher, null);
return true;
} catch (KeeperException e) {
LOG.warn("Failed to update " + znode + " znode in ZooKeeper: " + e);
} catch (InterruptedException e) {
LOG.warn("Failed to update " + znode + " znode in ZooKeeper: " + e);
}
return false;
}
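One caveat: ZooKeeper watches are one-shot, so the watch set by getData() above fires at most once. That is sufficient here, since NodeDeleted is terminal for an ephemeral registration, but a watcher that must keep observing a znode has to re-register itself each time it fires. A sketch of that idiom (not part of this patch):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

class SelfRearmingWatcher implements Watcher {
  private final ZooKeeper zk;
  private final String path;

  SelfRearmingWatcher(ZooKeeper zk, String path) {
    this.zk = zk;
    this.path = path;
  }

  public void process(WatchedEvent event) {
    System.out.println("Saw " + event.getType() + " on " + path);
    try {
      zk.exists(path, this); // re-arm; works whether or not the node exists
    } catch (KeeperException e) {
      // session trouble; stop re-arming
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}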
private boolean checkExistenceOf(String path) {
Stat stat = null;
try {
@@ -374,4 +418,10 @@
LOG.warn("Failed to close connection with ZooKeeper");
}
}
private String getZNode(String znodeName) {
return znodeName.startsWith(ZNODE_PATH_SEPARATOR) ?
znodeName :
parentZNode + ZNODE_PATH_SEPARATOR + znodeName;
}
}
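For reference, getZNode() resolves configured znode names against the parent znode. Assuming the parent znode is configured as "/hbase" (an assumption; the parent default is not shown in this diff), the new constants resolve like this:

// getZNode("rs")         -> "/hbase/rs"   (relative name, parent prepended)
// getZNode("/custom/rs") -> "/custom/rs"  (absolute name, used verbatim)
// So by default each region server registers at /hbase/rs/<startCode>.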

File: TestHBaseCluster.java

@@ -55,7 +55,6 @@ public class TestHBaseCluster extends HBaseClusterTestCase {
// Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.period", 10 * 1000);
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
// Increase the amount of time between client retries
conf.setLong("hbase.client.pause", 15 * 1000);

File: TestRowFilterAfterWrite.java

@@ -81,7 +81,6 @@ public class TestRowFilterAfterWrite extends HBaseClusterTestCase {
// Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.period", 10 * 1000);
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
// For debugging
conf.setInt("hbase.regionserver.lease.period", 20 * 60 * 1000);

File: TestLogRolling.java

@@ -88,7 +88,6 @@ public class TestLogRolling extends HBaseClusterTestCase {
// Make lease timeout longer, lease checks less frequent
conf.setInt("hbase.master.lease.period", 10 * 1000);
conf.setInt("hbase.master.lease.thread.wakefrequency", 5 * 1000);
// Increase the amount of time between client retries
conf.setLong("hbase.client.pause", 15 * 1000);