HBASE-3010 Can't start/stop/start... cluster using new master

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@998380 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-09-18 00:51:05 +00:00
parent e532293310
commit d153ec95da
13 changed files with 206 additions and 256 deletions

View File

@ -526,6 +526,7 @@ Release 0.21.0 - Unreleased
HBASE-3006 Reading compressed HFile blocks causes way too many DFS RPC HBASE-3006 Reading compressed HFile blocks causes way too many DFS RPC
calls severely impacting performance calls severely impacting performance
(Kannan Muthukkaruppan via Stack) (Kannan Muthukkaruppan via Stack)
HBASE-3010 Can't start/stop/start... cluster using new master
IMPROVEMENTS IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable HBASE-1760 Cleanup TODOs in HTable

View File

@ -55,6 +55,8 @@ while kill -0 `cat $pid` > /dev/null 2>&1; do
echo -n "." echo -n "."
sleep 1; sleep 1;
done done
# Add a CR after we're done w/ dots.
echo
# distributed == false means that the HMaster will kill ZK when it exits # distributed == false means that the HMaster will kill ZK when it exits
distMode=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed` distMode=`$bin/hbase org.apache.hadoop.hbase.util.HBaseConfTool hbase.cluster.distributed`

View File

@ -114,9 +114,6 @@ public class CatalogTracker {
public void start() throws IOException, InterruptedException { public void start() throws IOException, InterruptedException {
this.rootRegionTracker.start(); this.rootRegionTracker.start();
this.metaNodeTracker.start(); this.metaNodeTracker.start();
// Determine meta assignment; may not work because root and meta not yet
// deployed. Calling the below will set {@link #metaLocation}.
getMetaServerConnection(true);
} }
/** /**
@ -205,7 +202,7 @@ public class CatalogTracker {
*/ */
private HRegionInterface getRootServerConnection() private HRegionInterface getRootServerConnection()
throws IOException, InterruptedException { throws IOException, InterruptedException {
HServerAddress address = rootRegionTracker.getRootRegionLocation(); HServerAddress address = this.rootRegionTracker.getRootRegionLocation();
if (address == null) { if (address == null) {
return null; return null;
} }

View File

@ -20,18 +20,6 @@
package org.apache.hadoop.hbase.ipc; package org.apache.hadoop.hbase.ipc;
import com.google.common.base.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.DataInputStream; import java.io.DataInputStream;
@ -64,6 +52,18 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Function;
/** An abstract IPC service. IPC calls take a single {@link Writable} as a /** An abstract IPC service. IPC calls take a single {@link Writable} as a
* parameter, and return a {@link Writable} as their value. A service runs on * parameter, and return a {@link Writable} as their value. A service runs on
* a port and is defined by a parameter class and a value class. * a port and is defined by a parameter class and a value class.
@ -151,7 +151,6 @@ public abstract class HBaseServer {
protected Configuration conf; protected Configuration conf;
@SuppressWarnings({"FieldCanBeLocal"})
private int maxQueueSize; private int maxQueueSize;
protected int socketSendBufferSize; protected int socketSendBufferSize;
protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm protected final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
@ -285,7 +284,6 @@ public abstract class HBaseServer {
this.readSelector = readSelector; this.readSelector = readSelector;
} }
public void run() { public void run() {
LOG.info("Starting SocketReader");
synchronized(this) { synchronized(this) {
while (running) { while (running) {
SelectionKey key = null; SelectionKey key = null;

View File

@ -95,7 +95,7 @@ class ActiveMasterManager extends ZooKeeperListener {
clusterHasActiveMaster.set(true); clusterHasActiveMaster.set(true);
} else { } else {
// Node is no longer there, cluster does not have an active master // Node is no longer there, cluster does not have an active master
LOG.debug("No master available. notifying waiting threads"); LOG.debug("No master available. Notifying waiting threads");
clusterHasActiveMaster.set(false); clusterHasActiveMaster.set(false);
// Notify any thread waiting to become the active master // Notify any thread waiting to become the active master
clusterHasActiveMaster.notifyAll(); clusterHasActiveMaster.notifyAll();
@ -114,46 +114,56 @@ class ActiveMasterManager extends ZooKeeperListener {
* *
* This also makes sure that we are watching the master znode so will be * This also makes sure that we are watching the master znode so will be
* notified if another master dies. * notified if another master dies.
* @return False if we did not start up this cluster, another * @return True if no issue becoming active master else false if another
* master did, or if a problem (zookeeper, stop flag has been set on this * master was running or if some other problem (zookeeper, stop flag has been
* Master) * set on this Master)
*/ */
boolean blockUntilBecomingActiveMaster() { boolean blockUntilBecomingActiveMaster() {
boolean thisMasterStartedCluster = true; boolean cleanSetOfActiveMaster = true;
// Try to become the active master, watch if there is another master // Try to become the active master, watch if there is another master
try { try {
if(ZKUtil.setAddressAndWatch(watcher, watcher.masterAddressZNode, if (ZKUtil.setAddressAndWatch(this.watcher,
address)) { this.watcher.masterAddressZNode, this.address)) {
// We are the master, return // We are the master, return
clusterHasActiveMaster.set(true); this.clusterHasActiveMaster.set(true);
return thisMasterStartedCluster; return cleanSetOfActiveMaster;
}
// There is another active master running elsewhere or this is a restart
// and the master ephemeral node has not expired yet.
this.clusterHasActiveMaster.set(true);
cleanSetOfActiveMaster = false;
HServerAddress currentMaster =
ZKUtil.getDataAsAddress(this.watcher, this.watcher.masterAddressZNode);
if (currentMaster != null && currentMaster.equals(this.address)) {
LOG.info("Current master has this master's address, " + currentMaster +
"; master was restarted? Waiting on znode to expire...");
// Hurry along the expiration of the znode.
ZKUtil.deleteNode(this.watcher, this.watcher.masterAddressZNode);
} else {
LOG.info("Another master is the active master, " + currentMaster +
"; waiting to become the next active master");
} }
} catch (KeeperException ke) { } catch (KeeperException ke) {
master.abort("Received an unexpected KeeperException, aborting", ke); master.abort("Received an unexpected KeeperException, aborting", ke);
return false; return false;
} }
// There is another active master, this is not a cluster startup synchronized (this.clusterHasActiveMaster) {
// and we must wait until the active master dies while (this.clusterHasActiveMaster.get() && !this.master.isStopped()) {
LOG.info("Another master is already the active master, waiting to become " +
"the next active master");
clusterHasActiveMaster.set(true);
thisMasterStartedCluster = false;
synchronized(clusterHasActiveMaster) {
while(clusterHasActiveMaster.get() && !master.isStopped()) {
try { try {
clusterHasActiveMaster.wait(); this.clusterHasActiveMaster.wait();
} catch (InterruptedException e) { } catch (InterruptedException e) {
// We expect to be interrupted when a master dies, will fall out if so // We expect to be interrupted when a master dies, will fall out if so
LOG.debug("Interrupted waiting for master to die", e); LOG.debug("Interrupted waiting for master to die", e);
} }
} }
if(master.isStopped()) { if (this.master.isStopped()) {
return thisMasterStartedCluster; return cleanSetOfActiveMaster;
} }
// Try to become active master again now that there is no active master // Try to become active master again now that there is no active master
blockUntilBecomingActiveMaster(); blockUntilBecomingActiveMaster();
} }
return thisMasterStartedCluster; return cleanSetOfActiveMaster;
} }
/** /**

View File

@ -157,8 +157,8 @@ public class AssignmentManager extends ZooKeeperListener {
void cleanoutUnassigned() throws IOException, KeeperException { void cleanoutUnassigned() throws IOException, KeeperException {
// Cleanup any existing ZK nodes and start watching // Cleanup any existing ZK nodes and start watching
ZKAssign.deleteAllNodes(watcher); ZKAssign.deleteAllNodes(watcher);
ZKUtil.listChildrenAndWatchForNewChildren(watcher, ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
watcher.assignmentZNode); this.watcher.assignmentZNode);
} }
/** /**
@ -545,7 +545,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (plan == null) { if (plan == null) {
LOG.debug("No previous transition plan for " + LOG.debug("No previous transition plan for " +
state.getRegion().getRegionNameAsString() + state.getRegion().getRegionNameAsString() +
" so generating a random one from " + serverManager.numServers() + " so generating a random one from " + serverManager.countOfRegionServers() +
" ( " + serverManager.getOnlineServers().size() + ") available servers"); " ( " + serverManager.getOnlineServers().size() + ") available servers");
plan = new RegionPlan(state.getRegion(), null, plan = new RegionPlan(state.getRegion(), null,
LoadBalancer.randomAssignment(serverManager.getOnlineServersList())); LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.UnknownRegionException;
import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.MetaScanner;
@ -145,12 +146,14 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
// Cluster status zk tracker and local setter // Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker; private ClusterStatusTracker clusterStatusTracker;
// True if this is the master that started the cluster. // True if this is a cluster startup where there are no already running servers
boolean clusterStarter; // as opposed to a master joining an already running cluster
boolean freshClusterStartup;
// This flag is for stopping this Master instance. // This flag is for stopping this Master instance. It's set when we are
private boolean stopped = false; // stopping or aborting
// Set on abort -- usually failure of our zk session private volatile boolean stopped = false;
// Set on abort -- usually failure of our zk session.
private volatile boolean abort = false; private volatile boolean abort = false;
// Instance of the hbase executor service. // Instance of the hbase executor service.
@ -178,17 +181,17 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.conf = conf; this.conf = conf;
/* /*
* 1. Determine address and initialize RPC server (but do not start). * 1. Determine address and initialize RPC server (but do not start).
* The RPC server ports can be ephemeral. * The RPC server ports can be ephemeral. Create a ZKW instance.
*/ */
HServerAddress a = new HServerAddress(getMyAddress(this.conf)); HServerAddress a = new HServerAddress(getMyAddress(this.conf));
int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10); int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
this.rpcServer = HBaseRPC.getServer(this, this.rpcServer = HBaseRPC.getServer(this,
new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class}, new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
a.getBindAddress(), a.getPort(), a.getBindAddress(), a.getPort(),
numHandlers, numHandlers,
0, // we dont use high priority handlers in master 0, // we dont use high priority handlers in master
false, conf, false, conf,
0); // this is a DNC w/o high priority handlers 0); // this is a DNC w/o high priority handlers
this.address = new HServerAddress(rpcServer.getListenerAddress()); this.address = new HServerAddress(rpcServer.getListenerAddress());
// set the thread name now we have an address // set the thread name now we have an address
@ -201,24 +204,11 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
"_" + System.currentTimeMillis()); "_" + System.currentTimeMillis());
} }
/*
* 2. Determine if this is a fresh cluster startup or failed over master.
* This is done by checking for the existence of any ephemeral
* RegionServer nodes in ZooKeeper. These nodes are created by RSs on
* their initialization but only after they find the primary master. As
* long as this check is done before we write our address into ZK, this
* will work. Note that multiple masters could find this to be true on
* startup (none have become active master yet), which is why there is an
* additional check if this master does not become primary on its first attempt.
*/
this.zooKeeper = this.zooKeeper =
new ZooKeeperWatcher(conf, MASTER + "-" + getMasterAddress(), this); new ZooKeeperWatcher(conf, MASTER + "-" + getMasterAddress(), this);
this.clusterStarter = 0 ==
ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
/* /*
* 3. Block on becoming the active master. * 2. Block on becoming the active master.
* We race with other masters to write our address into ZooKeeper. If we * We race with other masters to write our address into ZooKeeper. If we
* succeed, we are the primary/active master and finish initialization. * succeed, we are the primary/active master and finish initialization.
* *
@ -228,32 +218,25 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
*/ */
this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this); this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this);
this.zooKeeper.registerListener(activeMasterManager); this.zooKeeper.registerListener(activeMasterManager);
stallIfBackupMaster(this.conf, this.activeMasterManager);
activeMasterManager.blockUntilBecomingActiveMaster();
/*
// If we're a backup master, stall until a primary to writes his address * 3. Determine if this is a fresh cluster startup or failed over master.
if (conf.getBoolean(HConstants.MASTER_TYPE_BACKUP, * This is done by checking for the existence of any ephemeral
HConstants.DEFAULT_MASTER_TYPE_BACKUP)) { * RegionServer nodes in ZooKeeper. These nodes are created by RSs on
// This will only be a minute or so while the cluster starts up, * their initialization but initialization will not happen unless clusterup
// so don't worry about setting watches on the parent znode * flag is set -- see ClusterStatusTracker below.
while (!this.activeMasterManager.isActiveMaster()) {
try {
LOG.debug("Waiting for master address ZNode to be written " +
"(Also watching cluster state node)");
Thread.sleep(conf.getInt("zookeeper.session.timeout", 60 * 1000));
} catch (InterruptedException e) {
// interrupted = user wants to kill us. Don't continue
throw new IOException("Interrupted waiting for master address");
}
}
}
// Wait here until we are the active master
clusterStarter = activeMasterManager.blockUntilBecomingActiveMaster();
/**
* 4. We are active master now... go initialize components we need to run.
*/ */
// TODO: Do this using Dependency Injection, using PicoContainer or Spring. this.freshClusterStartup =
0 == ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
/*
* 4. We are active master now... go initialize components we need to run.
* Note, there may be dross in zk from previous runs; it'll get addressed
* when we enter {@link #run()} below.
*/
// TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
this.fileSystemManager = new MasterFileSystem(this); this.fileSystemManager = new MasterFileSystem(this);
this.connection = HConnectionManager.getConnection(conf); this.connection = HConnectionManager.getConnection(conf);
this.executorService = new ExecutorService(getServerName()); this.executorService = new ExecutorService(getServerName());
@ -270,18 +253,40 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.regionServerTracker = new RegionServerTracker(zooKeeper, this, this.regionServerTracker = new RegionServerTracker(zooKeeper, this,
this.serverManager); this.serverManager);
regionServerTracker.start(); this.regionServerTracker.start();
// Set the cluster as up. // Set the cluster as up. If new RSs, they'll be waiting on this before
// going ahead with their startup.
this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
this.clusterStatusTracker.setClusterUp(); this.clusterStatusTracker.setClusterUp();
this.clusterStatusTracker.start(); this.clusterStatusTracker.start();
LOG.info("Server active/primary master; " + this.address + LOG.info("Server active/primary master; " + this.address +
"; clusterStarter=" + this.clusterStarter + ", sessionid=0x" + "; freshClusterStart=" + this.freshClusterStartup + ", sessionid=0x" +
Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
} }
/*
* Stall startup if we are designated a backup master.
* Returns immediately unless the HConstants.MASTER_TYPE_BACKUP flag is set in
* the passed configuration (default HConstants.DEFAULT_MASTER_TYPE_BACKUP).
* Otherwise polls <code>amm.isActiveMaster()</code>, sleeping for the
* "zookeeper.session.timeout" interval (default 60 * 1000 ms) between checks,
* and returns once it reports true.
* @param c Configuration read for the backup-master flag and the sleep interval.
* @param amm Manager polled via isActiveMaster() to decide when to stop stalling.
* @throws InterruptedException If interrupted while sleeping between checks.
*/
private static void stallIfBackupMaster(final Configuration c,
final ActiveMasterManager amm)
throws InterruptedException {
// If we're a backup master, stall until a primary writes its address
// Not flagged as a backup master: nothing to wait for.
if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
HConstants.DEFAULT_MASTER_TYPE_BACKUP)) return;
// This will only be a minute or so while the cluster starts up,
// so don't worry about setting watches on the parent znode
// NOTE(review): presumably isActiveMaster() turns true once some master has
// registered its address in ZooKeeper -- confirm against ActiveMasterManager.
while (!amm.isActiveMaster()) {
LOG.debug("Waiting for master address ZNode to be written " +
"(Also watching cluster state node)");
// Simple poll: sleep one session-timeout interval, then check again.
Thread.sleep(c.getInt("zookeeper.session.timeout", 60 * 1000));
}
}
/** /**
* Main processing loop for the HMaster. * Main processing loop for the HMaster.
* 1. Handle both fresh cluster start as well as failed over initialization of * 1. Handle both fresh cluster start as well as failed over initialization of
@ -295,22 +300,24 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
try { try {
// start up all service threads. // start up all service threads.
startServiceThreads(); startServiceThreads();
// wait for minimum number of region servers to be up // Wait for minimum number of region servers to report in
this.serverManager.waitForMinServers(); this.serverManager.waitForRegionServers();
// start assignment of user regions, startup or failure
if (this.clusterStarter) { // Start assignment of user regions, startup or failure
clusterStarterInitializations(this.fileSystemManager, if (!this.stopped) {
if (this.freshClusterStartup) {
clusterStarterInitializations(this.fileSystemManager,
this.serverManager, this.catalogTracker, this.assignmentManager); this.serverManager, this.catalogTracker, this.assignmentManager);
} else { } else {
// Process existing unassigned nodes in ZK, read all regions from META, // Process existing unassigned nodes in ZK, read all regions from META,
// rebuild in-memory state. // rebuild in-memory state.
this.assignmentManager.processFailover(); this.assignmentManager.processFailover();
}
} }
// Check if we should stop every second. // Check if we should stop every second.
Sleeper sleeper = new Sleeper(1000, this); Sleeper sleeper = new Sleeper(1000, this);
while (!this.stopped && !this.abort) { while (!this.stopped) sleeper.sleep();
sleeper.sleep();
}
} catch (Throwable t) { } catch (Throwable t) {
abort("Unhandled exception. Starting shutdown.", t); abort("Unhandled exception. Starting shutdown.", t);
} }
@ -341,22 +348,17 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
/* /*
* Initializations we need to do if we are cluster starter. * Initializations we need to do if we are cluster starter.
* @param starter
* @param mfs * @param mfs
* @param sm
* @param ct
* @param am
* @throws IOException * @throws IOException
*/ */
private static void clusterStarterInitializations(final MasterFileSystem mfs, private static void clusterStarterInitializations(final MasterFileSystem mfs,
final ServerManager sm, final CatalogTracker ct, final AssignmentManager am) final ServerManager sm, final CatalogTracker ct, final AssignmentManager am)
throws IOException, InterruptedException, KeeperException { throws IOException, InterruptedException, KeeperException {
// This master is starting the cluster (its not a preexisting cluster // Check filesystem has required basics
// that this master is joining).
// Initialize the filesystem, which does the following:
// - Creates the root hbase directory in the FS if DNE
// - If fresh start, create first ROOT and META regions (bootstrap)
// - Checks the FS to make sure the root directory is readable
// - Creates the archive directory for logs
mfs.initialize(); mfs.initialize();
// Do any log splitting necessary
// TODO: Should do this in background rather than block master startup // TODO: Should do this in background rather than block master startup
// TODO: Do we want to do this before/while/after RSs check in? // TODO: Do we want to do this before/while/after RSs check in?
// It seems that this method looks at active RSs but happens // It seems that this method looks at active RSs but happens
@ -795,6 +797,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
if (t != null) LOG.fatal(msg, t); if (t != null) LOG.fatal(msg, t);
else LOG.fatal(msg); else LOG.fatal(msg);
this.abort = true; this.abort = true;
stop("Aborting");
} }
@Override @Override

View File

@ -19,8 +19,8 @@
*/ */
package org.apache.hadoop.hbase.master; package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
@ -30,11 +30,10 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.LocalHBaseCluster; import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.ServerCommandLine; import org.apache.hadoop.hbase.util.ServerCommandLine;

View File

@ -83,8 +83,11 @@ public class MasterFileSystem {
} }
/** /**
* Create initial layout in filesystem.
* <ol> * <ol>
* <li>Check if the root region exists and is readable, if not create it</li> * <li>Check if the root region exists and is readable, if not create it.
* Create hbase.version and the -ROOT- directory if they do not already exist.
* </li>
* <li>Create a log archive directory for RS to put archived logs</li> * <li>Create a log archive directory for RS to put archived logs</li>
* </ol> * </ol>
*/ */

View File

@ -86,8 +86,6 @@ public class ServerManager {
private final ServerMonitor serverMonitorThread; private final ServerMonitor serverMonitorThread;
private int minimumServerCount;
private final LogCleaner logCleaner; private final LogCleaner logCleaner;
// Reporting to track master metrics. // Reporting to track master metrics.
@ -106,7 +104,7 @@ public class ServerManager {
@Override @Override
protected void chore() { protected void chore() {
int numServers = numServers(); int numServers = countOfRegionServers();
int numDeadServers = deadservers.size(); int numDeadServers = deadservers.size();
double averageLoad = getAverageLoad(); double averageLoad = getAverageLoad();
String deadServersList = deadservers.toString(); String deadServersList = deadservers.toString();
@ -127,7 +125,6 @@ public class ServerManager {
this.services = services; this.services = services;
Configuration c = master.getConfiguration(); Configuration c = master.getConfiguration();
int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000); int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000);
this.minimumServerCount = c.getInt("hbase.regions.server.count.min", 1);
this.metrics = new MasterMetrics(master.getServerName()); this.metrics = new MasterMetrics(master.getServerName());
this.serverMonitorThread = new ServerMonitor(monitorInterval, master); this.serverMonitorThread = new ServerMonitor(monitorInterval, master);
String n = Thread.currentThread().getName(); String n = Thread.currentThread().getName();
@ -220,8 +217,8 @@ public class ServerManager {
info.setLoad(load); info.setLoad(load);
// TODO: Why did we update the RS location ourself? Shouldn't RS do this? // TODO: Why did we update the RS location ourself? Shouldn't RS do this?
// masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
onlineServers.put(serverName, info); this.onlineServers.put(serverName, info);
if(hri == null) { if (hri == null) {
serverConnections.remove(serverName); serverConnections.remove(serverName);
} else { } else {
serverConnections.put(serverName, hri); serverConnections.put(serverName, hri);
@ -277,7 +274,7 @@ public class ServerManager {
} }
HMsg [] reply = null; HMsg [] reply = null;
int numservers = numServers(); int numservers = countOfRegionServers();
if (this.clusterShutdown) { if (this.clusterShutdown) {
if (numservers <= 2) { if (numservers <= 2) {
// Shutdown needs to be staggered; the meta regions need to close last // Shutdown needs to be staggered; the meta regions need to close last
@ -362,14 +359,10 @@ public class ServerManager {
return averageLoad; return averageLoad;
} }
/** @return the number of active servers */ /** @return the count of active regionservers */
public int numServers() { int countOfRegionServers() {
int num = -1; // Presumes onlineServers is a concurrent map
// This synchronized seems gratuitous. return this.onlineServers.size();
synchronized (this.onlineServers) {
num = this.onlineServers.size();
}
return num;
} }
/** /**
@ -476,17 +469,6 @@ public class ServerManager {
" to dead servers, submitted shutdown handler to be executed"); " to dead servers, submitted shutdown handler to be executed");
} }
public boolean canAssignUserRegions() {
if (minimumServerCount == 0) {
return true;
}
return (numServers() >= minimumServerCount);
}
public void setMinimumServerCount(int minimumServerCount) {
this.minimumServerCount = minimumServerCount;
}
// RPC methods to region servers // RPC methods to region servers
/** /**
@ -546,18 +528,25 @@ public class ServerManager {
} }
/** /**
* Waits for the minimum number of servers to be running. * Waits for the regionservers to report in.
* @throws InterruptedException
*/ */
public void waitForMinServers() { public void waitForRegionServers()
while(numServers() < minimumServerCount) { throws InterruptedException {
// !masterStatus.getShutdownRequested().get()) { long interval = this.master.getConfiguration().
LOG.info("Waiting for enough servers to check in. Currently have " + getLong("hbase.master.wait.on.regionservers.interval", 3000);
numServers() + " but need at least " + minimumServerCount); // So, number of regionservers > 0 and its been n since last check in, break,
try { // else just stall here
Thread.sleep(1000); for (int oldcount = countOfRegionServers(); !this.master.isStopped();) {
} catch (InterruptedException e) { Thread.sleep(interval);
LOG.warn("Got interrupted waiting for servers to check in, looping"); int count = countOfRegionServers();
if (count == oldcount && count > 0) break;
if (count == 0) {
LOG.info("Waiting on regionserver(s) to checkin");
} else {
LOG.info("Waiting on regionserver(s) count to settle; currently=" + count);
} }
oldcount = count;
} }
} }
@ -571,8 +560,8 @@ public class ServerManager {
} }
public void shutdownCluster() { public void shutdownCluster() {
LOG.info("Cluster shutdown requested");
this.clusterShutdown = true; this.clusterShutdown = true;
this.master.stop("Cluster shutdown requested");
} }
public boolean isClusterShutdown() { public boolean isClusterShutdown() {

View File

@ -265,7 +265,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
* @throws InterruptedException * @throws InterruptedException
*/ */
public HRegionServer(Configuration conf) throws IOException, InterruptedException { public HRegionServer(Configuration conf) throws IOException, InterruptedException {
machineName = DNS.getDefaultHost(conf.get( this.machineName = DNS.getDefaultHost(conf.get(
"hbase.regionserver.dns.interface", "default"), conf.get( "hbase.regionserver.dns.interface", "default"), conf.get(
"hbase.regionserver.dns.nameserver", "default")); "hbase.regionserver.dns.nameserver", "default"));
String addressStr = machineName String addressStr = machineName
@ -434,18 +434,18 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + "-" zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + "-"
+ serverInfo.getServerName(), this); + serverInfo.getServerName(), this);
this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
this.clusterStatusTracker.start();
this.clusterStatusTracker.blockUntilAvailable();
// create the master address manager, register with zk, and start it // create the master address manager, register with zk, and start it
masterAddressManager = new MasterAddressTracker(zooKeeper, this); masterAddressManager = new MasterAddressTracker(zooKeeper, this);
masterAddressManager.start(); masterAddressManager.start();
// create the catalog tracker and start it // Create the catalog tracker and start it;
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start(); catalogTracker.start();
this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
this.clusterStatusTracker.start();
this.clusterStatusTracker.blockUntilAvailable();
} }
/** /**

View File

@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -57,6 +58,39 @@ public class TestActiveMasterManager {
TEST_UTIL.shutdownMiniZKCluster(); TEST_UTIL.shutdownMiniZKCluster();
} }
/**
 * Simulates a master restart against a single ZooKeeper watcher.
 * A first {@link ActiveMasterManager} becomes the active master for a dummy
 * address; a second manager created with the same address then has to become
 * the active master as well, and the master znode must still carry that
 * address afterwards.
 *
 * @throws IOException propagated from the setup calls below
 * @throws KeeperException on an unexpected ZooKeeper failure
 */
@Test public void testRestartMaster() throws IOException, KeeperException {
  ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
    "testActiveMasterManagerFromZK", null);
  ZKUtil.createAndFailSilent(zk, zk.baseZNode);
  try {
    ZKUtil.deleteNode(zk, zk.masterAddressZNode);
  } catch (KeeperException.NoNodeException ignored) {
    // Best-effort cleanup: there was no master znode to remove.
  }

  // Create the master node with a dummy address
  HServerAddress master = new HServerAddress("localhost", 1);
  // Should not have a master yet
  DummyMaster dummyMaster = new DummyMaster();
  ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
    master, dummyMaster);
  zk.registerListener(activeMasterManager);
  assertFalse(activeMasterManager.clusterHasActiveMaster.get());

  // First test becoming the active master uninterrupted
  activeMasterManager.blockUntilBecomingActiveMaster();
  assertTrue(activeMasterManager.clusterHasActiveMaster.get());
  assertMaster(zk, master);

  // Now pretend master restart: a fresh manager using the very same address.
  DummyMaster secondDummyMaster = new DummyMaster();
  ActiveMasterManager secondActiveMasterManager = new ActiveMasterManager(zk,
    master, secondDummyMaster);
  zk.registerListener(secondActiveMasterManager);
  assertFalse(secondActiveMasterManager.clusterHasActiveMaster.get());
  // Drive the restarted manager itself; re-running the first manager here
  // would leave the restart path untested.
  secondActiveMasterManager.blockUntilBecomingActiveMaster();
  assertTrue(secondActiveMasterManager.clusterHasActiveMaster.get());
  assertMaster(zk, master);
}
/** /**
* Unit test that uses ZooKeeper but does not use the master-side methods * Unit test that uses ZooKeeper but does not use the master-side methods
* but rather acts directly on ZK. * but rather acts directly on ZK.
@ -64,22 +98,21 @@ public class TestActiveMasterManager {
*/ */
@Test @Test
public void testActiveMasterManagerFromZK() throws Exception { public void testActiveMasterManagerFromZK() throws Exception {
ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(), ZooKeeperWatcher zk = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"testActiveMasterManagerFromZK", null); "testActiveMasterManagerFromZK", null);
ZKUtil.createAndFailSilent(zk, zk.baseZNode); ZKUtil.createAndFailSilent(zk, zk.baseZNode);
try { try {
ZKUtil.deleteNode(zk, zk.masterAddressZNode); ZKUtil.deleteNode(zk, zk.masterAddressZNode);
} catch(KeeperException.NoNodeException nne) {} } catch(KeeperException.NoNodeException nne) {}
// Create the master node with a dummy address // Create the master node with a dummy address
HServerAddress firstMasterAddress = new HServerAddress("firstMaster", 1234); HServerAddress firstMasterAddress = new HServerAddress("localhost", 1);
HServerAddress secondMasterAddress = new HServerAddress("secondMaster", 1234); HServerAddress secondMasterAddress = new HServerAddress("localhost", 2);
// Should not have a master yet // Should not have a master yet
DummyMaster ms1 = new DummyMaster(); DummyMaster ms1 = new DummyMaster();
ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk, ActiveMasterManager activeMasterManager = new ActiveMasterManager(zk,
firstMasterAddress, ms1); firstMasterAddress, ms1);
zk.registerListener(activeMasterManager); zk.registerListener(activeMasterManager);
assertFalse(activeMasterManager.clusterHasActiveMaster.get()); assertFalse(activeMasterManager.clusterHasActiveMaster.get());
@ -132,6 +165,9 @@ public class TestActiveMasterManager {
assertTrue(t.manager.clusterHasActiveMaster.get()); assertTrue(t.manager.clusterHasActiveMaster.get());
assertTrue(t.isActiveMaster); assertTrue(t.isActiveMaster);
LOG.info("Deleting master node");
ZKUtil.deleteNode(zk, zk.masterAddressZNode);
} }
/** /**

View File

@ -1,88 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
/**
 * Cluster test around the master's minimum server count.
 * The cluster starts with one region server, the minimum server count is then
 * raised to two, and the test asserts that a re-enabled table only becomes
 * available after a second region server has been started.
 */
public class TestMinimumServerCount extends HBaseClusterTestCase {

  /** Name of the table created, disabled and re-enabled by the test. */
  static final String TABLE_NAME = "TestTable";

  public TestMinimumServerCount() {
    // start cluster with one region server only
    super(1, true);
  }

  /**
   * Scans the catalog family of <code>.META.</code> and reports whether every
   * row carries a server value.
   *
   * @param tableName not consulted by this method; every row returned by the
   *   scan is checked regardless of which table it belongs to
   * @return <code>false</code> as soon as a row without a
   *   {@link HConstants#SERVER_QUALIFIER} value is found, else
   *   <code>true</code>
   * @throws IOException if opening or scanning <code>.META.</code> fails
   */
  boolean isTableAvailable(String tableName) throws IOException {
    HTable meta = new HTable(conf, ".META.");
    ResultScanner scanner = meta.getScanner(HConstants.CATALOG_FAMILY);
    try {
      Result result;
      while ((result = scanner.next()) != null) {
        // a region with no assigned server means "not available"
        byte[] value = result.getValue(HConstants.CATALOG_FAMILY,
          HConstants.SERVER_QUALIFIER);
        if (value == null) {
          return false;
        }
      }
      return true;
    } finally {
      // Release the scanner on every exit path, including early return and
      // exceptions thrown while scanning.
      scanner.close();
    }
  }

  /**
   * Raises the minimum server count to two on a one-server cluster, tries to
   * enable a disabled table, and checks the table is unavailable until a
   * second region server joins.
   *
   * @throws Exception on any unexpected failure
   */
  public void testMinimumServerCount() throws Exception {
    HBaseAdmin admin = new HBaseAdmin(conf);

    // create and disable table
    admin.createTable(createTableDescriptor(TABLE_NAME));
    admin.disableTable(TABLE_NAME);
    assertFalse(admin.isTableEnabled(TABLE_NAME));

    // reach in and set minimum server count
    cluster.hbaseCluster.getMaster().getServerManager()
      .setMinimumServerCount(2);

    // now try to enable the table
    try {
      admin.enableTable(TABLE_NAME);
    } catch (IOException ex) {
      // ignore
      // NOTE(review): presumably the enable call may fail while the minimum
      // server count is unmet -- confirm.
    }

    // Fixed wait before checking that nothing was assigned.
    Thread.sleep(10 * 1000);
    assertFalse(admin.isTableAvailable(TABLE_NAME));

    // now start another region server
    cluster.startRegionServer();

    // sleep a bit for assignment
    Thread.sleep(10 * 1000);
    assertTrue(admin.isTableAvailable(TABLE_NAME));
  }
}