HBASE-3053 Add ability to have multiple Masters LocalHBaseCluster for test writing
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1005263 13f79535-47bb-0310-9956-ffa450edef68
parent 1e14da3d78
commit b2bff276ab
CHANGES.txt
@@ -1028,6 +1028,8 @@ Release 0.21.0 - Unreleased
              that are in progress
              (Nicolas Spiegelberg via Stack)
   HBASE-3073  New APIs for Result, faster implementation for some calls
   HBASE-3053  Add ability to have multiple Masters LocalHBaseCluster for
               test writing

OPTIMIZATIONS
   HBASE-410   [testing] Speed up the test suite

HConstants.java
@@ -129,6 +129,9 @@ public final class HConstants {
  /** Default region server interface class name. */
  public static final String DEFAULT_REGION_SERVER_CLASS = HRegionInterface.class.getName();

  /** Parameter name for what master implementation to use. */
  public static final String MASTER_IMPL = "hbase.master.impl";

  /** Parameter name for how often threads should wake up */
  public static final String THREAD_WAKE_FREQUENCY = "hbase.server.thread.wakefrequency";

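The new hbase.master.impl knob mirrors the existing regionserver knob: a test can point the cluster at an HMaster subclass. A hedged sketch of setting it (the subclass name is hypothetical, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class CustomMasterConfig {
  public static Configuration withCustomMaster() {
    Configuration conf = HBaseConfiguration.create();
    // LocalHBaseCluster reads this key via
    // conf.getClass(HConstants.MASTER_IMPL, HMaster.class).
    conf.set(HConstants.MASTER_IMPL,
        "org.example.InstrumentedHMaster"); // hypothetical test-only subclass
    return conf;
  }
}
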
LocalHBaseCluster.java
@@ -54,14 +54,17 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil;
 */
public class LocalHBaseCluster {
  static final Log LOG = LogFactory.getLog(LocalHBaseCluster.class);
  private final HMaster master;
  private final List<JVMClusterUtil.RegionServerThread> regionThreads;
  private final List<JVMClusterUtil.MasterThread> masterThreads =
    new CopyOnWriteArrayList<JVMClusterUtil.MasterThread>();
  private final List<JVMClusterUtil.RegionServerThread> regionThreads =
    new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
  private final static int DEFAULT_NO = 1;
  /** local mode */
  public static final String LOCAL = "local";
  /** 'local:' */
  public static final String LOCAL_COLON = LOCAL + ":";
  private final Configuration conf;
  private final Class<? extends HMaster> masterClass;
  private final Class<? extends HRegionServer> regionServerClass;

  /**
@@ -83,7 +86,23 @@ public class LocalHBaseCluster {
   */
  public LocalHBaseCluster(final Configuration conf, final int noRegionServers)
  throws IOException {
    this(conf, noRegionServers, HMaster.class, getRegionServerImplementation(conf));
    this(conf, 1, noRegionServers, getMasterImplementation(conf),
        getRegionServerImplementation(conf));
  }

  /**
   * Constructor.
   * @param conf Configuration to use. Post construction has the active master
   * address.
   * @param noMasters Count of masters to start.
   * @param noRegionServers Count of regionservers to start.
   * @throws IOException
   */
  public LocalHBaseCluster(final Configuration conf, final int noMasters,
      final int noRegionServers)
  throws IOException {
    this(conf, noMasters, noRegionServers, getMasterImplementation(conf),
        getRegionServerImplementation(conf));
  }

  @SuppressWarnings("unchecked")
@@ -92,28 +111,40 @@ public class LocalHBaseCluster {
        HRegionServer.class);
  }

  @SuppressWarnings("unchecked")
  private static Class<? extends HMaster> getMasterImplementation(final Configuration conf) {
    return (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
        HMaster.class);
  }

  /**
   * Constructor.
   * @param conf Configuration to use. Post construction has the master's
   * address.
   * @param noMasters Count of masters to start.
   * @param noRegionServers Count of regionservers to start.
   * @param masterClass
   * @param regionServerClass
   * @throws IOException
   */
  @SuppressWarnings("unchecked")
  public LocalHBaseCluster(final Configuration conf,
  public LocalHBaseCluster(final Configuration conf, final int noMasters,
      final int noRegionServers, final Class<? extends HMaster> masterClass,
      final Class<? extends HRegionServer> regionServerClass)
  throws IOException {
    this.conf = conf;
    // Create the master
    this.master = HMaster.constructMaster(masterClass, conf);
    // Start the HRegionServers.  Always have region servers come up on
    // port '0' so there won't be clashes over default port as unit tests
    // start/stop ports at different times during the life of the test.
    // Always have masters and regionservers come up on port '0' so we don't
    // clash over default ports.
    conf.set(HConstants.MASTER_PORT, "0");
    conf.set(HConstants.REGIONSERVER_PORT, "0");
    this.regionThreads =
      new CopyOnWriteArrayList<JVMClusterUtil.RegionServerThread>();
    // Start the HMasters.
    this.masterClass =
      (Class<? extends HMaster>)conf.getClass(HConstants.MASTER_IMPL,
          masterClass);
    for (int i = 0; i < noMasters; i++) {
      addMaster(i);
    }
    // Start the HRegionServers.
    this.regionServerClass =
      (Class<? extends HRegionServer>)conf.getClass(HConstants.REGION_SERVER_IMPL,
          regionServerClass);
@@ -138,6 +169,22 @@ public class LocalHBaseCluster {
    return rst;
  }

  public JVMClusterUtil.MasterThread addMaster() throws IOException {
    return addMaster(this.masterThreads.size());
  }

  public JVMClusterUtil.MasterThread addMaster(final int index)
  throws IOException {
    // Create each master with its own Configuration instance so each has
    // its HConnection instance rather than share (see HBASE_INSTANCES down in
    // the guts of HConnectionManager.
    JVMClusterUtil.MasterThread mt =
      JVMClusterUtil.createMasterThread(new Configuration(this.conf),
          this.masterClass, index);
    this.masterThreads.add(mt);
    return mt;
  }

  /**
   * @param serverNumber
   * @return region server
@@ -146,13 +193,6 @@ public class LocalHBaseCluster {
    return regionThreads.get(serverNumber).getRegionServer();
  }

  /**
   * @return the HMaster thread
   */
  public HMaster getMaster() {
    return this.master;
  }

  /**
   * @return Read-only list of region server threads.
   */
@@ -198,6 +238,73 @@ public class LocalHBaseCluster {
    return regionServerThread.getName();
  }

  /**
   * @param serverNumber
   * @return the HMaster thread
   */
  public HMaster getMaster(int serverNumber) {
    return masterThreads.get(serverNumber).getMaster();
  }

  /**
   * Gets the current active master, if available.  If no active master, returns
   * null.
   * @return the HMaster for the active master
   */
  public HMaster getActiveMaster() {
    for (JVMClusterUtil.MasterThread mt : masterThreads) {
      if (mt.getMaster().isActiveMaster()) {
        return mt.getMaster();
      }
    }
    return null;
  }

  /**
   * @return Read-only list of master threads.
   */
  public List<JVMClusterUtil.MasterThread> getMasters() {
    return Collections.unmodifiableList(this.masterThreads);
  }

  /**
   * @return List of running master servers (Some servers may have been killed
   * or aborted during lifetime of cluster; these servers are not included in
   * this list).
   */
  public List<JVMClusterUtil.MasterThread> getLiveMasters() {
    List<JVMClusterUtil.MasterThread> liveServers =
      new ArrayList<JVMClusterUtil.MasterThread>();
    List<JVMClusterUtil.MasterThread> list = getMasters();
    for (JVMClusterUtil.MasterThread mt: list) {
      if (mt.isAlive()) {
        liveServers.add(mt);
      }
    }
    return liveServers;
  }

  /**
   * Wait for the specified master to stop
   * Removes this thread from list of running threads.
   * @param serverNumber
   * @return Name of master that just went down.
   */
  public String waitOnMaster(int serverNumber) {
    JVMClusterUtil.MasterThread masterThread =
      this.masterThreads.remove(serverNumber);
    while (masterThread.isAlive()) {
      try {
        LOG.info("Waiting on " +
            masterThread.getMaster().getServerName().toString());
        masterThread.join();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }
    return masterThread.getName();
  }

  /**
   * Wait for Mini HBase Cluster to shut down.
   * Presumes you've already called {@link #shutdown()}.
@@ -214,11 +321,15 @@ public class LocalHBaseCluster {
        }
      }
    }
    if (this.master != null && this.master.isAlive()) {
      try {
        this.master.join();
      } catch(InterruptedException e) {
        // continue
    if (this.masterThreads != null) {
      for (Thread t : this.masterThreads) {
        if (t.isAlive()) {
          try {
            t.join();
          } catch (InterruptedException e) {
            // continue
          }
        }
      }
    }
  }
@@ -227,14 +338,14 @@ public class LocalHBaseCluster {
   * Start the cluster.
   */
  public void startup() {
    JVMClusterUtil.startup(this.master, this.regionThreads);
    JVMClusterUtil.startup(this.masterThreads, this.regionThreads);
  }

  /**
   * Shut down the mini HBase cluster
   */
  public void shutdown() {
    JVMClusterUtil.shutdown(this.master, this.regionThreads);
    JVMClusterUtil.shutdown(this.masterThreads, this.regionThreads);
  }

  /**

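Taken together, the widened constructors plus getActiveMaster() let a test stand up several masters in one JVM. A minimal sketch of the intended usage (counts are illustrative, and a ZooKeeper quorum is assumed to already be running, since LocalHBaseCluster does not manage one):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.LocalHBaseCluster;

public class MultiMasterExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Two masters, two regionservers, all in-process. LocalHBaseCluster
    // forces master and regionserver ports to '0' so nothing collides.
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 2, 2);
    cluster.startup();
    try {
      // Exactly one master wins the ZooKeeper race; the other parks as backup.
      System.out.println("Active master: " + cluster.getActiveMaster().getServerName());
    } finally {
      cluster.shutdown();
      cluster.join();
    }
  }
}
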
HMaster.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotDisabledException;
@@ -80,7 +79,6 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
@@ -131,17 +129,17 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
  // Address of the HMaster
  private final HServerAddress address;
  // file system manager for the master FS operations
  private final MasterFileSystem fileSystemManager;
  private MasterFileSystem fileSystemManager;

  private final HConnection connection;
  private HConnection connection;

  // server manager to deal with region server info
  private final ServerManager serverManager;
  private ServerManager serverManager;

  // manager of assignment nodes in zookeeper
  final AssignmentManager assignmentManager;
  AssignmentManager assignmentManager;
  // manager of catalog regions
  private final CatalogTracker catalogTracker;
  private CatalogTracker catalogTracker;
  // Cluster status zk tracker and local setter
  private ClusterStatusTracker clusterStatusTracker;

@@ -150,6 +148,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
  private volatile boolean stopped = false;
  // Set on abort -- usually failure of our zk session.
  private volatile boolean abort = false;
  // flag set after we become the active master (used for testing)
  protected volatile boolean isActiveMaster = false;
  // flag set after we complete initialization once active (used for testing)
  protected volatile boolean isInitialized = false;

  // Instance of the hbase executor service.
  ExecutorService executorService;
@@ -163,21 +165,22 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {

  /**
   * Initializes the HMaster. The steps are as follows:
   *
   * <p>
   * <ol>
   * <li>Initialize HMaster RPC and address
   * <li>Connect to ZooKeeper.  Get count of regionservers still up.
   * <li>Block until becoming active master
   * <li>Initialize master components - server manager, region manager,
   *     region server queue, file system manager, etc
   * <li>Connect to ZooKeeper.
   * </ol>
   * <p>
   * Remaining steps of initialization occur in {@link #run()} so that they
   * run in their own thread rather than within the context of the constructor.
   * @throws InterruptedException
   */
  public HMaster(final Configuration conf)
  throws IOException, KeeperException, InterruptedException {
    this.conf = conf;

    /*
     * 1. Determine address and initialize RPC server (but do not start).
     * Determine address and initialize RPC server (but do not start).
     * The RPC server ports can be ephemeral. Create a ZKW instance.
     */
    HServerAddress a = new HServerAddress(getMyAddress(this.conf));
@@ -201,32 +204,136 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
        "_" + System.currentTimeMillis());
    }

    this.zooKeeper = new ZooKeeperWatcher(conf, MASTER, this);
    this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" +
        address.getPort(), this);
  }

  /**
   * Stall startup if we are designated a backup master; i.e. we want someone
   * else to become the master before proceeding.
   * @param c
   * @param amm
   * @throws InterruptedException
   */
  private static void stallIfBackupMaster(final Configuration c,
      final ActiveMasterManager amm)
  throws InterruptedException {
    // If we're a backup master, stall until a primary to writes his address
    if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
        HConstants.DEFAULT_MASTER_TYPE_BACKUP)) {
      return;
    }
    // This will only be a minute or so while the cluster starts up,
    // so don't worry about setting watches on the parent znode
    while (!amm.isActiveMaster()) {
      LOG.debug("Waiting for master address ZNode to be written " +
          "(Also watching cluster state node)");
      Thread.sleep(c.getInt("zookeeper.session.timeout", 60 * 1000));
    }
  }

  /**
   * Main processing loop for the HMaster.
   * <ol>
   * <li>Block until becoming active master
   * <li>Finish initialization via {@link #finishInitialization()}
   * <li>Enter loop until we are stopped
   * <li>Stop services and perform cleanup once stopped
   * </ol>
   */
  @Override
  public void run() {
    try {
      /*
       * Block on becoming the active master.
       *
       * We race with other masters to write our address into ZooKeeper.  If we
       * succeed, we are the primary/active master and finish initialization.
       *
       * If we do not succeed, there is another active master and we should
       * now wait until it dies to try and become the next active master.  If we
       * do not succeed on our first attempt, this is no longer a cluster startup.
       */
      this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this);
      this.zooKeeper.registerListener(activeMasterManager);
      stallIfBackupMaster(this.conf, this.activeMasterManager);
      activeMasterManager.blockUntilBecomingActiveMaster();

      // We are either the active master or we were asked to shutdown

      if (!this.stopped) {
        // We are active master.  Finish init and loop until we are closed.
        finishInitialization();
        loop();
        // Once we break out of here, we are being shutdown

        // Stop balancer and meta catalog janitor
        if (this.balancerChore != null) {
          this.balancerChore.interrupt();
        }
        if (this.catalogJanitorChore != null) {
          this.catalogJanitorChore.interrupt();
        }

        // Wait for all the remaining region servers to report in IFF we were
        // running a cluster shutdown AND we were NOT aborting.
        if (!this.abort && this.serverManager.isClusterShutdown()) {
          this.serverManager.letRegionServersShutdown();
        }
        stopServiceThreads();
      }

      // Handle either a backup or active master being stopped

      // Stop services started for both backup and active masters
      this.activeMasterManager.stop();
      HConnectionManager.deleteConnection(this.conf, true);
      this.zooKeeper.close();
      LOG.info("HMaster main thread exiting");

    } catch (Throwable t) {
      abort("Unhandled exception. Starting shutdown.", t);
    }
  }

  private void loop() {
    // Check if we should stop every second.
    Sleeper sleeper = new Sleeper(1000, this);
    while (!this.stopped) {
      sleeper.sleep();
    }
  }

  /**
   * Finish initialization of HMaster after becoming the primary master.
   *
   * <ol>
   * <li>Initialize master components - file system manager, server manager,
   *     assignment manager, region server tracker, catalog tracker, etc</li>
   * <li>Start necessary service threads - rpc server, info server,
   *     executor services, etc</li>
   * <li>Set cluster as UP in ZooKeeper</li>
   * <li>Wait for RegionServers to check-in</li>
   * <li>Split logs and perform data recovery, if necessary</li>
   * <li>Ensure assignment of root and meta regions<li>
   * <li>Handle either fresh cluster start or master failover</li>
   * </ol>
   *
   * @throws IOException
   * @throws InterruptedException
   * @throws KeeperException
   */
  private void finishInitialization()
  throws IOException, InterruptedException, KeeperException {

    isActiveMaster = true;

    /*
     * 2. Count of regoinservers that are up.
     */
    int count = ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);

    /*
     * 3. Block on becoming the active master.
     * We race with other masters to write our address into ZooKeeper.  If we
     * succeed, we are the primary/active master and finish initialization.
     *
     * If we do not succeed, there is another active master and we should
     * now wait until it dies to try and become the next active master.  If we
     * do not succeed on our first attempt, this is no longer a cluster startup.
     */
    this.activeMasterManager = new ActiveMasterManager(zooKeeper, address, this);
    this.zooKeeper.registerListener(activeMasterManager);
    stallIfBackupMaster(this.conf, this.activeMasterManager);
    activeMasterManager.blockUntilBecomingActiveMaster();

    /*
     * 4. We are active master now... go initialize components we need to run.
     * We are active master now... go initialize components we need to run.
     * Note, there may be dross in zk from previous runs; it'll get addressed
     * when we enter {@link #run()} below.
     * below after we determine if cluster startup or failover.
     */

    // TODO: Do this using Dependency Injection, using PicoContainer, Guice or Spring.
    this.fileSystemManager = new MasterFileSystem(this);
    this.connection = HConnectionManager.getConnection(conf);
@@ -249,107 +356,49 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
    // Set the cluster as up.  If new RSs, they'll be waiting on this before
    // going ahead with their startup.
    this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
    this.clusterStatusTracker.start();
    boolean wasUp = this.clusterStatusTracker.isClusterUp();
    if (!wasUp) this.clusterStatusTracker.setClusterUp();
    this.clusterStatusTracker.start();

    LOG.info("Server active/primary master; " + this.address +
        ", sessionid=0x" +
        Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) +
        ", ephemeral nodes still up in zk=" + count +
        ", cluster-up flag was=" + wasUp);
  }

  /*
   * Stall startup if we are designated a backup master; i.e. we want someone
   * else to become the master before proceeding.
   * @param c
   * @param amm
   * @throws InterruptedException
   */
  private static void stallIfBackupMaster(final Configuration c,
      final ActiveMasterManager amm)
  throws InterruptedException {
    // If we're a backup master, stall until a primary to writes his address
    if (!c.getBoolean(HConstants.MASTER_TYPE_BACKUP,
        HConstants.DEFAULT_MASTER_TYPE_BACKUP)) return;
    // This will only be a minute or so while the cluster starts up,
    // so don't worry about setting watches on the parent znode
    while (!amm.isActiveMaster()) {
      LOG.debug("Waiting for master address ZNode to be written " +
          "(Also watching cluster state node)");
      Thread.sleep(c.getInt("zookeeper.session.timeout", 60 * 1000));
    // start up all service threads.
    startServiceThreads();

    // Wait for region servers to report in.  Returns count of regions.
    int regionCount = this.serverManager.waitForRegionServers();

    // TODO: Should do this in background rather than block master startup
    this.fileSystemManager.
      splitLogAfterStartup(this.serverManager.getOnlineServers());

    // Make sure root and meta assigned before proceeding.
    assignRootAndMeta();

    // Is this fresh start with no regions assigned or are we a master joining
    // an already-running cluster?  If regionsCount == 0, then for sure a
    // fresh start.  TOOD: Be fancier.  If regionsCount == 2, perhaps the
    // 2 are .META. and -ROOT- and we should fall into the fresh startup
    // branch below.  For now, do processFailover.
    if (regionCount == 0) {
      LOG.info("Master startup proceeding: cluster startup");
      this.assignmentManager.cleanoutUnassigned();
      this.assignmentManager.assignAllUserRegions();
    } else {
      LOG.info("Master startup proceeding: master failover");
      this.assignmentManager.processFailover();
    }
  }

  /**
   * Main processing loop for the HMaster.
   * <ol>
   * <li>Handle both fresh cluster start as well as failed over initialization of
   *     the HMaster</li>
   * <li>Start the necessary services</li>
   * <li>Reassign the root region</li>
   * <li>The master is no longer closed - set "closed" to false</li>
   * </ol>
   */
  @Override
  public void run() {
    try {
      // start up all service threads.
      startServiceThreads();
      // Start balancer and meta catalog janitor after meta and regions have
      // been assigned.
      this.balancerChore = getAndStartBalancerChore(this);
      this.catalogJanitorChore =
        Threads.setDaemonThreadRunning(new CatalogJanitor(this, this));

      // Wait for region servers to report in.  Returns count of regions.
      int regionCount = this.serverManager.waitForRegionServers();

      // TODO: Should do this in background rather than block master startup
      // TODO: Do we want to do this before/while/after RSs check in?
      // It seems that this method looks at active RSs but happens concurrently
      // with when we expect them to be checking in
      this.fileSystemManager.
        splitLogAfterStartup(this.serverManager.getOnlineServers());

      // Make sure root and meta assigned before proceeding.
      assignRootAndMeta();

      // Is this fresh start with no regions assigned or are we a master joining
      // an already-running cluster?  If regionsCount == 0, then for sure a
      // fresh start.  TOOD: Be fancier.  If regionsCount == 2, perhaps the
      // 2 are .META. and -ROOT- and we should fall into the fresh startup
      // branch below.  For now, do processFailover.
      if (regionCount == 0) {
        this.assignmentManager.cleanoutUnassigned();
        this.assignmentManager.assignAllUserRegions();
      } else {
        this.assignmentManager.processFailover();
      }

      // Start balancer and meta catalog janitor after meta and regions have
      // been assigned.
      this.balancerChore = getAndStartBalancerChore(this);
      this.catalogJanitorChore =
        Threads.setDaemonThreadRunning(new CatalogJanitor(this, this));

      // Check if we should stop every second.
      Sleeper sleeper = new Sleeper(1000, this);
      while (!this.stopped) sleeper.sleep();
    } catch (Throwable t) {
      abort("Unhandled exception. Starting shutdown.", t);
    }
    // Stop balancer and meta catalog janitor
    if (this.balancerChore != null) this.balancerChore.interrupt();
    if (this.catalogJanitorChore != null) this.catalogJanitorChore.interrupt();

    // Wait for all the remaining region servers to report in IFF we were
    // running a cluster shutdown AND we were NOT aborting.
    if (!this.abort && this.serverManager.isClusterShutdown()) {
      this.serverManager.letRegionServersShutdown();
    }
    stopServiceThreads();
    // Stop services started up in the constructor.
    this.activeMasterManager.stop();
    HConnectionManager.deleteConnection(this.conf, true);
    this.zooKeeper.close();
    LOG.info("HMaster main thread exiting");
    isInitialized = true;
  }

  /**
@@ -374,7 +423,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
      assigned++;
    }
    LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit);

    // Work on meta region
    rit = this.assignmentManager.
      processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
@@ -482,11 +531,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
      }
    } catch (IOException e) {
      if (e instanceof RemoteException) {
        try {
          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
        } catch (IOException ex) {
          LOG.warn("thread start", ex);
        }
        e = ((RemoteException)e).unwrapRemoteException();
      }
      // Something happened during startup.  Shut things down.
      abort("Failed startup", e);
@@ -631,8 +676,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
  throws UnknownRegionException {
    Pair<HRegionInfo, HServerInfo> p =
      this.assignmentManager.getAssignment(encodedRegionName);
    if (p == null) throw new UnknownRegionException(Bytes.toString(encodedRegionName));
    HServerInfo dest = this.serverManager.getServerInfo(new String(destServerName));
    if (p == null)
      throw new UnknownRegionException(Bytes.toString(encodedRegionName));
    HServerInfo dest =
      this.serverManager.getServerInfo(new String(destServerName));
    RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
    this.assignmentManager.balance(rp);
  }
@@ -865,6 +912,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
  public void stop(final String why) {
    LOG.info(why);
    this.stopped = true;
    // If we are a backup master, we need to interrupt wait
    synchronized (this.activeMasterManager.clusterHasActiveMaster) {
      this.activeMasterManager.clusterHasActiveMaster.notifyAll();
    }
  }

  @Override
@@ -872,6 +923,31 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
    return this.stopped;
  }

  /**
   * Report whether this master is currently the active master or not.
   * If not active master, we are parked on ZK waiting to become active.
   *
   * This method is used for testing.
   *
   * @return true if active master, false if not.
   */
  public boolean isActiveMaster() {
    return isActiveMaster;
  }

  /**
   * Report whether this master has completed with its initialization and is
   * ready.  If ready, the master is also the active master.  A standby master
   * is never ready.
   *
   * This method is used for testing.
   *
   * @return true if master is ready to go, false if not.
   */
  public boolean isInitialized() {
    return isInitialized;
  }

  public void assignRegion(HRegionInfo hri) {
    assignmentManager.assign(hri);
  }

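The run() rewrite above centers on one idea: every master constructs fully, then races to create the master znode in ZooKeeper; losers park as backups until the winner dies. Stripped of HBase specifics, the election primitive looks roughly like this sketch (znode path and types are illustrative; a real caller would also set a watch to learn when the leader's ephemeral node vanishes):

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class LeaderElectionSketch {
  /**
   * Try to become leader by creating an ephemeral znode. First creator
   * wins; the node disappears with its session, freeing the slot if the
   * leader crashes.
   */
  public static boolean tryBecomeLeader(ZooKeeper zk, String znode, byte[] id)
      throws KeeperException, InterruptedException {
    try {
      zk.create(znode, id, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
      return true;  // we are the active master
    } catch (KeeperException.NodeExistsException e) {
      return false; // another master is active; wait as a backup
    }
  }
}
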
HMasterCommandLine.java
@@ -135,9 +135,9 @@ public class HMasterCommandLine extends ServerCommandLine {
        Integer.toString(clientPort));
    // Need to have the zk cluster shutdown when master is shutdown.
    // Run a subclass that does the zk cluster shutdown on its way out.
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 1,
    LocalHBaseCluster cluster = new LocalHBaseCluster(conf, 1, 1,
        LocalHMaster.class, HRegionServer.class);
    ((LocalHMaster)cluster.getMaster()).setZKCluster(zooKeeperCluster);
    ((LocalHMaster)cluster.getMaster(0)).setZKCluster(zooKeeperCluster);
    cluster.startup();
  } else {
    HMaster master = HMaster.constructMaster(masterClass, conf);

HRegionServer.java
@@ -255,7 +255,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   *
   * @param conf
   * @throws IOException
   * @throws InterruptedException
   * @throws InterruptedException
   */
  public HRegionServer(Configuration conf) throws IOException, InterruptedException {
    this.fsOk = true;
@@ -282,7 +282,36 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);

    initialize();
    this.abortRequested = false;
    this.stopped = false;

    // Server to handle client requests
    String machineName = DNS.getDefaultHost(conf.get(
        "hbase.regionserver.dns.interface", "default"), conf.get(
        "hbase.regionserver.dns.nameserver", "default"));
    String addressStr = machineName + ":" +
      conf.get(HConstants.REGIONSERVER_PORT,
          Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
    HServerAddress address = new HServerAddress(addressStr);
    this.server = HBaseRPC.getServer(this,
        new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
            OnlineRegions.class},
        address.getBindAddress(),
        address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
        conf.getInt("hbase.regionserver.metahandler.count", 10),
        false, conf, QOS_THRESHOLD);
    this.server.setErrorHandler(this);
    this.server.setQosFunction(new QosFunction());

    // HServerInfo can be amended by master.  See below in reportForDuty.
    this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(
        address.getBindAddress(), this.server.getListenerAddress().getPort())),
        System.currentTimeMillis(), this.conf.getInt(
        "hbase.regionserver.info.port", 60030), machineName);
    if (this.serverInfo.getServerAddress() == null) {
      throw new NullPointerException("Server address cannot be null; "
          + "hbase-958 debugging");
    }
  }

  private static final int NORMAL_QOS = 0;
@@ -370,39 +399,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   * call it.
   *
   * @throws IOException
   * @throws InterruptedException
   * @throws InterruptedException
   */
  private void initialize() throws IOException, InterruptedException {
    this.abortRequested = false;
    this.stopped = false;

    // Server to handle client requests
    String machineName = DNS.getDefaultHost(conf.get(
        "hbase.regionserver.dns.interface", "default"), conf.get(
        "hbase.regionserver.dns.nameserver", "default"));
    String addressStr = machineName + ":" +
      conf.get(HConstants.REGIONSERVER_PORT,
          Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
    HServerAddress address = new HServerAddress(addressStr);
    this.server = HBaseRPC.getServer(this,
        new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
            OnlineRegions.class},
        address.getBindAddress(),
        address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
        conf.getInt("hbase.regionserver.metahandler.count", 10),
        false, conf, QOS_THRESHOLD);
    this.server.setErrorHandler(this);
    this.server.setQosFunction(new QosFunction());

    // HServerInfo can be amended by master.  See below in reportForDuty.
    this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(
        address.getBindAddress(), this.server.getListenerAddress().getPort())),
        System.currentTimeMillis(), this.conf.getInt(
        "hbase.regionserver.info.port", 60030), machineName);
    if (this.serverInfo.getServerAddress() == null) {
      throw new NullPointerException("Server address cannot be null; "
          + "hbase-958 debugging");
    }
    initializeZooKeeper();
    initializeThreads();
    int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
@@ -421,7 +420,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   */
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // Open connection to zookeeper and set primary watcher
    zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER +
    zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + ":" +
        serverInfo.getServerAddress().getPort(), this);

    // Create the master address manager, register with zk, and start it.  Then
@@ -437,7 +436,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
    this.clusterStatusTracker.start();
    this.clusterStatusTracker.blockUntilAvailable();

    // Create the catalog tracker and start it;
    // Create the catalog tracker and start it;
    this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
        this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
    catalogTracker.start();
@@ -477,6 +476,14 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   * load/unload instructions.
   */
  public void run() {

    try {
      // Initialize threads and wait for a master
      initialize();
    } catch (Exception e) {
      abort("Fatal exception during initialization", e);
    }

    this.regionServerThread = Thread.currentThread();
    boolean calledCloseUserRegions = false;
    try {
@@ -622,9 +629,19 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
    this.serverInfo.setLoad(buildServerLoad());
    this.requestCount.set(0);
    addOutboundMsgs(outboundMessages);
    HMsg [] msgs = this.hbaseMaster.regionServerReport(this.serverInfo,
        outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
        getMostLoadedRegions());
    HMsg [] msgs = null;
    while (!this.stopped) {
      try {
        msgs = this.hbaseMaster.regionServerReport(this.serverInfo,
            outboundMessages.toArray(HMsg.EMPTY_HMSG_ARRAY),
            getMostLoadedRegions());
        break;
      } catch (IOException ioe) {
        // Couldn't connect to the master, get location from zk and reconnect
        // Method blocks until new master is found or we are stopped
        getMaster();
      }
    }
    updateOutboundMsgs(outboundMessages);
    outboundMessages.clear();

@@ -754,7 +771,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
    // hack! Maps DFSClient => RegionServer for logs.  HDFS made this
    // config param for task trackers, but we can piggyback off of it.
    if (this.conf.get("mapred.task.id") == null) {
      this.conf.set("mapred.task.id",
      this.conf.set("mapred.task.id",
          "hb_rs_" + this.serverInfo.getServerName() + "_" +
          System.currentTimeMillis());
    }
@@ -1297,18 +1314,17 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   * Method will block until a master is available.  You can break from this
   * block by requesting the server stop.
   *
   * @return
   * @return master address, or null if server has been stopped
   */
  private boolean getMaster() {
  private HServerAddress getMaster() {
    HServerAddress masterAddress = null;
    while ((masterAddress = masterAddressManager.getMasterAddress()) == null) {
      if (stopped) {
        return false;
        return null;
      }
      LOG.debug("No master found, will retry");
      sleeper.sleep();
    }
    LOG.info("Telling master at " + masterAddress + " that we are up");
    HMasterRegionInterface master = null;
    while (!stopped && master == null) {
      try {
@@ -1323,8 +1339,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
        sleeper.sleep();
      }
    }
    LOG.info("Connected to master at " + masterAddress);
    this.hbaseMaster = master;
    return true;
    return masterAddress;
  }

  /**
@@ -1347,7 +1364,8 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   * us by the master.
   */
  private MapWritable reportForDuty() {
    while (!stopped && !getMaster()) {
    HServerAddress masterAddress = null;
    while (!stopped && (masterAddress = getMaster()) == null) {
      sleeper.sleep();
      LOG.warn("Unable to get master for initialization");
    }
@@ -1362,6 +1380,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
        ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)),
        this.serverInfo.getServerAddress());
        this.serverInfo.setLoad(buildServerLoad());
        LOG.info("Telling master at " + masterAddress + " that we are up");
        result = this.hbaseMaster.regionServerStartup(this.serverInfo);
        break;
      } catch (IOException e) {
@@ -2373,7 +2392,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
  public CompactionRequestor getCompactionRequester() {
    return this.compactSplitThread;
  }

  //
  // Main program and support routines
  //

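The report-loop change above boils down to a reusable shape: on an IOException, rediscover the endpoint and retry instead of failing the cycle. A standalone sketch of that shape (the Master and MasterFinder types are stand-ins for illustration, not HBase types):

import java.io.IOException;

public class ReportLoopSketch {
  interface Master { void report(String status) throws IOException; }
  interface MasterFinder { Master get(); } // e.g. re-resolve from ZooKeeper

  private volatile boolean stopped = false;

  /** Keep trying to deliver one report until it lands or we are stopped. */
  void reportOnce(MasterFinder findMaster, String status) {
    while (!stopped) {
      try {
        findMaster.get().report(status); // may now point at a new master
        return;                          // delivered; done for this cycle
      } catch (IOException ioe) {
        // Master died or moved; the next pass re-resolves and reconnects.
      }
    }
  }
}
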
JVMClusterUtil.java
@@ -76,12 +76,13 @@ public class JVMClusterUtil {
   * Call 'start' on the returned thread to make it run.
   * @param c Configuration to use.
   * @param hrsc Class to create.
   * @param index Used distingushing the object returned.
   * @param index Used distinguishing the object returned.
   * @throws IOException
   * @return Region server added.
   */
  public static JVMClusterUtil.RegionServerThread createRegionServerThread(final Configuration c,
      final Class<? extends HRegionServer> hrsc, final int index)
  public static JVMClusterUtil.RegionServerThread createRegionServerThread(
      final Configuration c, final Class<? extends HRegionServer> hrsc,
      final int index)
  throws IOException {
    HRegionServer server;
    try {
@@ -99,32 +100,121 @@ public class JVMClusterUtil {
    return new JVMClusterUtil.RegionServerThread(server, index);
  }

  /**
   * Start the cluster.
   * Datastructure to hold Master Thread and Master instance
   */
  public static class MasterThread extends Thread {
    private final HMaster master;

    public MasterThread(final HMaster m, final int index) {
      super(m, "Master:" + index + ";" + m.getServerName());
      this.master = m;
    }

    /** @return the master */
    public HMaster getMaster() {
      return this.master;
    }

    /**
     * Block until the master has come online, indicating it is ready
     * to be used.
     */
    public void waitForServerOnline() {
      // The server is marked online after init begins but before race to become
      // the active master.
      while (!this.master.isAlive() && !this.master.isStopped()) {
        try {
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          // continue waiting
        }
      }
    }
  }

  /**
   * Creates a {@link MasterThread}.
   * Call 'start' on the returned thread to make it run.
   * @param c Configuration to use.
   * @param hmc Class to create.
   * @param index Used distinguishing the object returned.
   * @throws IOException
   * @return Master added.
   */
  public static JVMClusterUtil.MasterThread createMasterThread(
      final Configuration c, final Class<? extends HMaster> hmc,
      final int index)
  throws IOException {
    HMaster server;
    try {
      server = hmc.getConstructor(Configuration.class).newInstance(c);
    } catch (InvocationTargetException ite) {
      Throwable target = ite.getTargetException();
      throw new RuntimeException("Failed construction of RegionServer: " +
          hmc.toString() + ((target.getCause() != null)?
          target.getCause().getMessage(): ""), target);
    } catch (Exception e) {
      IOException ioe = new IOException();
      ioe.initCause(e);
      throw ioe;
    }
    return new JVMClusterUtil.MasterThread(server, index);
  }

  /**
   * Start the cluster.  Waits until there is a primary master and returns its
   * address.
   * @param m
   * @param regionServers
   * @return Address to use contacting master.
   * @return Address to use contacting primary master.
   */
  public static String startup(final HMaster m,
  public static String startup(final List<JVMClusterUtil.MasterThread> masters,
      final List<JVMClusterUtil.RegionServerThread> regionservers) {
    if (m != null) m.start();
    if (masters != null) {
      for (JVMClusterUtil.MasterThread t : masters) {
        t.start();
      }
    }
    if (regionservers != null) {
      for (JVMClusterUtil.RegionServerThread t: regionservers) {
        t.start();
      }
    }
    return m == null? null: m.getMasterAddress().toString();
    if (masters == null || masters.isEmpty()) {
      return null;
    }
    // Wait for an active master
    while (true) {
      for (JVMClusterUtil.MasterThread t : masters) {
        if (t.master.isActiveMaster()) {
          return t.master.getMasterAddress().toString();
        }
      }
      try {
        Thread.sleep(1000);
      } catch(InterruptedException e) {
        // Keep waiting
      }
    }
  }

  /**
   * @param master
   * @param regionservers
   */
  public static void shutdown(final HMaster master,
  public static void shutdown(final List<MasterThread> masters,
      final List<RegionServerThread> regionservers) {
    LOG.debug("Shutting down HBase Cluster");
    if (master != null) {
      master.shutdown();
    if (masters != null) {
      for (JVMClusterUtil.MasterThread t : masters) {
        if (t.master.isActiveMaster()) {
          t.master.shutdown();
        } else {
          t.master.stopMaster();
        }
      }
    }
    // regionServerThreads can never be null because they are initialized when
    // the class is constructed.
@@ -137,20 +227,23 @@ public class JVMClusterUtil {
        }
      }
    }
    if (master != null) {
      while (master.isAlive()) {
        try {
          // The below has been replaced to debug sometime hangs on end of
          // tests.
          // this.master.join():
          Threads.threadDumpingIsAlive(master);
        } catch(InterruptedException e) {
          // continue
    if (masters != null) {
      for (JVMClusterUtil.MasterThread t : masters) {
        while (t.master.isAlive()) {
          try {
            // The below has been replaced to debug sometime hangs on end of
            // tests.
            // this.master.join():
            Threads.threadDumpingIsAlive(t.master);
          } catch(InterruptedException e) {
            // continue
          }
        }
      }
    }
    LOG.info("Shutdown " +
        ((regionservers != null)? master.getName(): "0 masters") +
        " " + regionservers.size() + " region server(s)");
    LOG.info("Shutdown of " +
        ((masters != null) ? masters.size() : "0") + " master(s) and " +
        ((regionservers != null) ? regionservers.size() : "0") +
        " regionserver(s) complete");
  }
}

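JVMClusterUtil.startup() polls indefinitely for an active master; a test utility may want the same loop with a deadline. A hedged sketch of that variant, using only methods this patch defines (not itself part of the patch):

import java.util.List;
import org.apache.hadoop.hbase.util.JVMClusterUtil;

public final class ActiveMasterWait {
  /**
   * Waits up to timeoutMs for one of the master threads to win the
   * election; returns its address, or null on timeout. Same polling
   * loop as JVMClusterUtil.startup(), plus a deadline.
   */
  public static String waitForActive(
      List<JVMClusterUtil.MasterThread> masters, long timeoutMs)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (System.currentTimeMillis() < deadline) {
      for (JVMClusterUtil.MasterThread t : masters) {
        if (t.getMaster().isActiveMaster()) {
          return t.getMaster().getMasterAddress().toString();
        }
      }
      Thread.sleep(100);
    }
    return null; // no active master within the timeout
  }
}
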
ZooKeeperWatcher.java
@@ -239,9 +239,9 @@ public class ZooKeeperWatcher implements Watcher {
    switch(event.getState()) {
      case SyncConnected:
        // Update our identifier.  Otherwise ignore.
        LOG.info(this.identifier + " connected");
        this.identifier = this.identifier + "-0x" +
          Long.toHexString(this.zooKeeper.getSessionId());
        LOG.info(this.identifier + " connected");
        break;

      // Abort the server if Disconnected or Expired

HBaseTestingUtility.java
@@ -70,8 +70,6 @@ import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.security.UnixUserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.zookeeper.ZooKeeper;
import org.eclipse.jdt.core.dom.ThisExpression;

import com.google.common.base.Preconditions;

/**
@@ -268,7 +266,7 @@ public class HBaseTestingUtility {
   * @see {@link #shutdownMiniDFSCluster()}
   */
  public MiniHBaseCluster startMiniCluster() throws Exception {
    return startMiniCluster(1);
    return startMiniCluster(1, 1);
  }

  /**
@@ -276,17 +274,40 @@ public class HBaseTestingUtility {
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
   * @param servers Number of servers to start up.  We'll start this many
   * datanodes and regionservers.  If servers is > 1, then make sure
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * bind errors.
   * @throws Exception
   * @see {@link #shutdownMiniCluster()}
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int servers)
  public MiniHBaseCluster startMiniCluster(final int numSlaves)
  throws Exception {
    LOG.info("Starting up minicluster");
    return startMiniCluster(1, numSlaves);
  }

  /**
   * Start up a minicluster of hbase, optionally dfs, and zookeeper.
   * Modifies Configuration.  Homes the cluster data directory under a random
   * subdirectory in a directory under System property test.build.data.
   * Directory is cleaned up on exit.
   * @param numMasters Number of masters to start up.  We'll start this many
   * hbase masters.  If numMasters > 1, you can find the active/primary master
   * with {@link MiniHBaseCluster#getMaster()}.
   * @param numSlaves Number of slaves to start up.  We'll start this many
   * datanodes and regionservers.  If numSlaves is > 1, then make sure
   * hbase.regionserver.info.port is -1 (i.e. no ui per regionserver) otherwise
   * bind errors.
   * @throws Exception
   * @see {@link #shutdownMiniCluster()}
   * @return Mini hbase cluster instance created.
   */
  public MiniHBaseCluster startMiniCluster(final int numMasters,
      final int numSlaves)
  throws Exception {
    LOG.info("Starting up minicluster with " + numMasters + " master(s) and " +
        numSlaves + " regionserver(s) and datanode(s)");
    // If we already put up a cluster, fail.
    String testBuildPath = conf.get(TEST_DIRECTORY_KEY, null);
    isRunningCluster(testBuildPath);
@@ -300,7 +321,7 @@ public class HBaseTestingUtility {
    System.setProperty(TEST_DIRECTORY_KEY, this.clusterTestBuildDir.getPath());
    // Bring up mini dfs cluster. This spews a bunch of warnings about missing
    // scheme. Complaints are 'Scheme is undefined for build/test/data/dfs/name1'.
    startMiniDFSCluster(servers, this.clusterTestBuildDir);
    startMiniDFSCluster(numSlaves, this.clusterTestBuildDir);

    // Mangle conf so fs parameter points to minidfs we just started up
    FileSystem fs = this.dfsCluster.getFileSystem();
@@ -319,7 +340,7 @@ public class HBaseTestingUtility {
    this.conf.set(HConstants.HBASE_DIR, hbaseRootdir.toString());
    fs.mkdirs(hbaseRootdir);
    FSUtils.setVersion(fs, hbaseRootdir);
    this.hbaseCluster = new MiniHBaseCluster(this.conf, servers);
    this.hbaseCluster = new MiniHBaseCluster(this.conf, numMasters, numSlaves);
    // Don't leave here till we've done a successful scan of the .META.
    HTable t = new HTable(this.conf, HConstants.META_TABLE_NAME);
    ResultScanner s = t.getScanner(new Scan());
@@ -853,7 +874,7 @@ public class HBaseTestingUtility {
   * Returns a HBaseAdmin instance.
   *
   * @return The HBaseAdmin instance.
   * @throws IOException
   * @throws IOException
   */
  public HBaseAdmin getHBaseAdmin()
  throws IOException {
@@ -943,7 +964,7 @@ public class HBaseTestingUtility {
  /**
   * @param dir Directory to delete
   * @return True if we deleted it.
   * @throws IOException
   * @throws IOException
   */
  public boolean deleteDir(final Path dir) throws IOException {
    FileSystem fs = getTestFileSystem();

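With the overload in place, a multi-master test boots the full stack in a couple of lines. A minimal sketch (counts are illustrative; the info-port setting follows the startMiniCluster javadoc's own warning about bind errors):

import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;

public class TwoMasterSmoke {
  public static void main(String[] args) throws Exception {
    HBaseTestingUtility util = new HBaseTestingUtility();
    // No per-regionserver UI, per the startMiniCluster javadoc above.
    util.getConfiguration().setInt("hbase.regionserver.info.port", -1);
    MiniHBaseCluster cluster = util.startMiniCluster(2, 3); // 2 masters, 3 slaves
    try {
      // getMaster() now resolves to whichever master won the election.
      System.out.println("Active master: " + cluster.getMaster());
    } finally {
      util.shutdownMiniCluster();
    }
  }
}
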
MiniHBaseCluster.java
@@ -66,10 +66,23 @@ public class MiniHBaseCluster {
   * @throws IOException
   */
  public MiniHBaseCluster(Configuration conf, int numRegionServers)
  throws IOException {
    this(conf, 1, numRegionServers);
  }

  /**
   * Start a MiniHBaseCluster.
   * @param conf Configuration to be used for cluster
   * @param numMasters initial number of masters to start.
   * @param numRegionServers initial number of region servers to start.
   * @throws IOException
   */
  public MiniHBaseCluster(Configuration conf, int numMasters,
      int numRegionServers)
  throws IOException {
    this.conf = conf;
    conf.set(HConstants.MASTER_PORT, "0");
    init(numRegionServers);
    init(numMasters, numRegionServers);
  }

  /**
@@ -203,6 +216,7 @@ public class MiniHBaseCluster {
      }
    }

    @Override
    public void kill() {
      super.kill();
    }
@@ -231,10 +245,11 @@ public class MiniHBaseCluster {
    }
  }

  private void init(final int nRegionNodes) throws IOException {
  private void init(final int nMasterNodes, final int nRegionNodes)
  throws IOException {
    try {
      // start up a LocalHBaseCluster
      hbaseCluster = new LocalHBaseCluster(conf, nRegionNodes,
      hbaseCluster = new LocalHBaseCluster(conf, nMasterNodes, nRegionNodes,
          MiniHBaseCluster.MiniHBaseClusterMaster.class,
          MiniHBaseCluster.MiniHBaseClusterRegionServer.class);
      hbaseCluster.startup();
@@ -257,21 +272,6 @@ public class MiniHBaseCluster {
    return t;
  }

  /**
   * @return Returns the rpc address actually used by the master server, because
   * the supplied port is not necessarily the actual port used.
   */
  public HServerAddress getHMasterAddress() {
    return this.hbaseCluster.getMaster().getMasterAddress();
  }

  /**
   * @return the HMaster
   */
  public HMaster getMaster() {
    return this.hbaseCluster.getMaster();
  }

  /**
   * Cause a region server to exit doing basic clean up only on its way out.
   * @param serverNumber  Used as index into a list.
@@ -322,6 +322,130 @@ public class MiniHBaseCluster {
    return this.hbaseCluster.waitOnRegionServer(serverNumber);
  }

  /**
   * Starts a master thread running
   *
   * @throws IOException
   * @return New RegionServerThread
   */
  public JVMClusterUtil.MasterThread startMaster() throws IOException {
    JVMClusterUtil.MasterThread t = this.hbaseCluster.addMaster();
    t.start();
    t.waitForServerOnline();
    return t;
  }

  /**
   * @return Returns the rpc address actually used by the currently active
   * master server, because the supplied port is not necessarily the actual port
   * used.
   */
  public HServerAddress getHMasterAddress() {
    return this.hbaseCluster.getActiveMaster().getMasterAddress();
  }

  /**
   * Returns the current active master, if available.
   * @return the active HMaster, null if none is active.
   */
  public HMaster getMaster() {
    return this.hbaseCluster.getActiveMaster();
  }

  /**
   * Returns the master at the specified index, if available.
   * @return the active HMaster, null if none is active.
   */
  public HMaster getMaster(final int serverNumber) {
    return this.hbaseCluster.getMaster(serverNumber);
  }

  /**
   * Cause a master to exit without shutting down entire cluster.
   * @param serverNumber  Used as index into a list.
   */
  public String abortMaster(int serverNumber) {
    HMaster server = getMaster(serverNumber);
    LOG.info("Aborting " + server.toString());
    server.abort("Aborting for tests", new Exception("Trace info"));
    return server.toString();
  }

  /**
   * Shut down the specified master cleanly
   *
   * @param serverNumber  Used as index into a list.
   * @return the region server that was stopped
   */
  public JVMClusterUtil.MasterThread stopMaster(int serverNumber) {
    return stopMaster(serverNumber, true);
  }

  /**
   * Shut down the specified master cleanly
   *
   * @param serverNumber  Used as index into a list.
   * @param shutdownFS True is we are to shutdown the filesystem as part of this
   * master's shutdown.  Usually we do but you do not want to do this if
   * you are running multiple master in a test and you shut down one
   * before end of the test.
   * @return the master that was stopped
   */
  public JVMClusterUtil.MasterThread stopMaster(int serverNumber,
      final boolean shutdownFS) {
    JVMClusterUtil.MasterThread server =
      hbaseCluster.getMasters().get(serverNumber);
    LOG.info("Stopping " + server.toString());
    server.getMaster().stop("Stopping master " + serverNumber);
    return server;
  }

  /**
   * Wait for the specified master to stop. Removes this thread from list
   * of running threads.
   * @param serverNumber
   * @return Name of master that just went down.
   */
  public String waitOnMaster(final int serverNumber) {
    return this.hbaseCluster.waitOnMaster(serverNumber);
  }

  /**
   * Blocks until there is an active master and that master has completed
   * initialization.
   *
   * @return true if an active master becomes available.  false if there are no
   * masters left.
   * @throws InterruptedException
   */
  public boolean waitForActiveAndReadyMaster() throws InterruptedException {
    List<JVMClusterUtil.MasterThread> mts;
    while ((mts = getMasterThreads()).size() > 0) {
      for (JVMClusterUtil.MasterThread mt : mts) {
        if (mt.getMaster().isActiveMaster() && mt.getMaster().isInitialized()) {
          return true;
        }
      }
      Thread.sleep(200);
    }
    return false;
  }

  /**
   * @return List of master threads.
   */
  public List<JVMClusterUtil.MasterThread> getMasterThreads() {
    return this.hbaseCluster.getMasters();
  }

  /**
   * @return List of live master threads (skips the aborted and the killed)
   */
  public List<JVMClusterUtil.MasterThread> getLiveMasterThreads() {
    return this.hbaseCluster.getLiveMasters();
  }

  /**
   * Wait for Mini HBase Cluster to shut down.
   */

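Besides the clean stopMaster() path exercised by the new test below, the same surface supports crash-style failover. A hedged sketch combining abortMaster() with waitForActiveAndReadyMaster(), using only methods this patch adds (assumes a running multi-master cluster):

import org.apache.hadoop.hbase.MiniHBaseCluster;

public final class FailoverHelper {
  /**
   * Ungracefully kills the current active master, reaps its thread, and
   * blocks until a backup wins the election and finishes initialization.
   * Returns false if no master was left to take over.
   */
  public static boolean crashActiveMasterAndWait(MiniHBaseCluster cluster)
      throws InterruptedException {
    int n = cluster.getMasterThreads().size();
    for (int i = 0; i < n; i++) {
      if (cluster.getMaster(i).isActiveMaster()) {
        cluster.abortMaster(i);  // simulate a crash, not a clean stop
        cluster.waitOnMaster(i); // removes the dead thread from the list
        break;
      }
    }
    return cluster.waitForActiveAndReadyMaster();
  }
}
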
TestMasterFailover.java (new file)
@@ -0,0 +1,112 @@
/**
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.master;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.junit.Test;

public class TestMasterFailover {
  private static final Log LOG = LogFactory.getLog(TestMasterFailover.class);

  /**
   * Simple test of master failover.
   * <p>
   * Starts with three masters.  Kills a backup master.  Then kills the active
   * master.  Ensures the final master becomes active and we can still contact
   * the cluster.
   * @throws Exception
   */
  @Test (timeout=180000)
  public void testSimpleMasterFailover() throws Exception {

    final int NUM_MASTERS = 3;
    final int NUM_RS = 3;

    // Start the cluster
    HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
    MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();

    // get all the master threads
    List<MasterThread> masterThreads = cluster.getMasterThreads();

    // wait for each to come online
    for (MasterThread mt : masterThreads) {
      assertTrue(mt.isAlive());
    }

    // verify only one is the active master and we have right number
    int numActive = 0;
    int activeIndex = -1;
    String activeName = null;
    for (int i = 0; i < masterThreads.size(); i++) {
      if (masterThreads.get(i).getMaster().isActiveMaster()) {
        numActive++;
        activeIndex = i;
        activeName = masterThreads.get(i).getMaster().getServerName();
      }
    }
    assertEquals(1, numActive);
    assertEquals(NUM_MASTERS, masterThreads.size());

    // attempt to stop one of the inactive masters
    LOG.debug("\n\nStopping a backup master\n");
    int backupIndex = (activeIndex == 0 ? 1 : activeIndex - 1);
    cluster.stopMaster(backupIndex, false);
    cluster.waitOnMaster(backupIndex);

    // verify still one active master and it's the same
    for (int i = 0; i < masterThreads.size(); i++) {
      if (masterThreads.get(i).getMaster().isActiveMaster()) {
        assertTrue(activeName.equals(
            masterThreads.get(i).getMaster().getServerName()));
        activeIndex = i;
      }
    }
    assertEquals(1, numActive);
    assertEquals(2, masterThreads.size());

    // kill the active master
    LOG.debug("\n\nStopping the active master\n");
    cluster.stopMaster(activeIndex, false);
    cluster.waitOnMaster(activeIndex);

    // wait for an active master to show up and be ready
    assertTrue(cluster.waitForActiveAndReadyMaster());

    LOG.debug("\n\nVerifying backup master is now active\n");
    // should only have one master now
    assertEquals(1, masterThreads.size());
    // and he should be active
    assertTrue(masterThreads.get(0).getMaster().isActiveMaster());

    // Stop the cluster
    TEST_UTIL.shutdownMiniCluster();
  }
}