From 58c4f2df98f806edc48d92d8892d1b682676c579 Mon Sep 17 00:00:00 2001 From: Michael Stack Date: Wed, 29 Sep 2010 22:37:50 +0000 Subject: [PATCH] HBASE-3047 If new master crashes, restart is messy git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1002880 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../hadoop/hbase/RemoteExceptionHandler.java | 2 + .../hadoop/hbase/catalog/CatalogTracker.java | 75 ++++++++-- .../hadoop/hbase/ipc/HRegionInterface.java | 8 +- .../hadoop/hbase/mapreduce/package-info.java | 2 +- .../hbase/master/AssignmentManager.java | 130 +++++++++++----- .../apache/hadoop/hbase/master/HMaster.java | 139 ++++++++++-------- .../hadoop/hbase/master/MasterFileSystem.java | 4 +- .../hadoop/hbase/master/ServerManager.java | 68 +++++---- .../master/handler/ServerShutdownHandler.java | 12 +- .../hbase/regionserver/HRegionServer.java | 33 +++-- .../hadoop/hbase/regionserver/Leases.java | 1 + .../hbase/zookeeper/ZooKeeperNodeTracker.java | 21 ++- src/main/resources/hbase-default.xml | 2 +- .../hbase/catalog/TestCatalogTracker.java | 59 +++++++- 15 files changed, 378 insertions(+), 179 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index cbb4869bf67..abac52e6ebc 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -552,6 +552,7 @@ Release 0.21.0 - Unreleased HBASE-2995 Incorrect dependency on Log class from Jetty HBASE-3038 WALReaderFSDataInputStream.getPos() fails if Filesize > MAX_INT (Nicolas Spiegelberg via Stack) + HBASE-3047 If new master crashes, restart is messy IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java b/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java index c73ff539302..485c254cd40 100644 --- a/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java @@ -79,6 +79,8 @@ public class RemoteExceptionHandler { * @throws IOException indicating a server error 
ocurred if the decoded * exception is not an IOException. The decoded exception is set as * the cause. + * @deprecated Use {@link RemoteException#unwrapRemoteException()} instead. + * In fact we should look into deprecating this whole class - St.Ack 20100929 */ public static IOException decodeRemoteException(final RemoteException re) throws IOException { diff --git a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java index 3ffce467966..e13e9b839a8 100644 --- a/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.catalog; import java.io.IOException; -import java.lang.reflect.UndeclaredThrowableException; import java.net.ConnectException; import java.util.concurrent.atomic.AtomicBoolean; @@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.hadoop.ipc.RemoteException; import org.apache.zookeeper.KeeperException; /** @@ -271,7 +271,9 @@ public class CatalogTracker { /** * Gets the current location for .META. if available and waits * for up to the specified timeout if not immediately available. Throws an - * exception if timed out waiting. + * exception if timed out waiting. This method differs from {@link #waitForMeta()} + * in that it will go ahead and verify the location gotten from ZooKeeper by + * trying to use the returned connection. 
* @param timeout maximum time to wait for meta availability, in milliseconds * @return location of meta * @throws InterruptedException if interrupted while waiting @@ -282,15 +284,15 @@ public class CatalogTracker { public HServerAddress waitForMeta(long timeout) throws InterruptedException, IOException, NotAllMetaRegionsOnlineException { long stop = System.currentTimeMillis() + timeout; - synchronized(metaAvailable) { - if(getMetaServerConnection(true) != null) { + synchronized (metaAvailable) { + if (getMetaServerConnection(true) != null) { return metaLocation; } while(!metaAvailable.get() && (timeout == 0 || System.currentTimeMillis() < stop)) { metaAvailable.wait(timeout); } - if(getMetaServerConnection(true) == null) { + if (getMetaServerConnection(true) == null) { throw new NotAllMetaRegionsOnlineException( "Timed out (" + timeout + "ms)"); } @@ -336,7 +338,6 @@ public class CatalogTracker { } private void setMetaLocation(HServerAddress metaLocation) { - LOG.info("Found new META location, " + metaLocation); metaAvailable.set(true); this.metaLocation = metaLocation; // no synchronization because these are private and already under lock @@ -359,23 +360,69 @@ public class CatalogTracker { } private boolean verifyRegionLocation(HRegionInterface metaServer, - byte [] regionName) { + byte [] regionName) + throws IOException { + if (metaServer == null) { + LOG.info("Passed metaserver is null"); + return false; + } Throwable t = null; try { + // Am expecting only two possible exceptions here; unable + // to connect to the regionserver or NotServingRegionException wrapped + // in the hadoop rpc RemoteException. return metaServer.getRegionInfo(regionName) != null; - } catch (NotServingRegionException e) { + } catch (ConnectException e) { t = e; - } catch (UndeclaredThrowableException e) { - // We can get a ConnectException wrapped by a UTE if client fails connect - // If not a ConnectException, rethrow. 
- if (!(e.getCause() instanceof ConnectException)) throw e; - t = e.getCause(); + } catch (RemoteException e) { + IOException ioe = e.unwrapRemoteException(); + if (ioe instanceof NotServingRegionException) { + t = ioe; + } else { + throw e; + } } LOG.info("Failed verification of " + Bytes.toString(regionName) + - ": " + t.getMessage()); + ", assigning anew: " + t); return false; } + /** + * Verify -ROOT- is deployed and accessible. + * @param timeout How long to wait on zk for root address (passed through to + * the internal call to {@link #waitForRootServerConnection(long)}. + * @return True if the -ROOT- location is healthy. + * @throws IOException + * @throws InterruptedException + */ + public boolean verifyRootRegionLocation(final long timeout) + throws InterruptedException, IOException { + HRegionInterface connection = null; + try { + connection = waitForRootServerConnection(timeout); + } catch (NotAllMetaRegionsOnlineException e) { + // Pass + } catch (IOException e) { + // Unexpected exception + throw e; + } + return (connection == null)? false: + verifyRegionLocation(connection, HRegionInfo.ROOT_REGIONINFO.getRegionName()); + } + + /** + * Verify .META. is deployed and accessible. + * @param timeout How long to wait on zk for .META. address + * (passed through to the internal call to {@link #waitForMetaServerConnection(long)}. + * @return True if the .META. location is healthy. + * @throws IOException Some unexpected IOE. + * @throws InterruptedException + */ + public boolean verifyMetaRegionLocation(final long timeout) + throws InterruptedException, IOException { + return getMetaServerConnection(true) != null; + } + /** * Check if hsi was carrying -ROOT- or * .META. and if so, clear out old locations. 
diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java index a4810a6e878..27f9cc0ea19 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.net.ConnectException; import java.util.List; import java.util.NavigableSet; @@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.wal.HLog; +import org.apache.hadoop.ipc.RemoteException; /** * Clients interact with HRegionServers using a handle to the HRegionInterface. @@ -51,10 +53,12 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab * * @param regionName name of the region * @return HRegionInfo object for region - * @throws NotServingRegionException e + * @throws NotServingRegionException + * @throws ConnectException + * @throws IOException This can manifest as an Hadoop ipc {@link RemoteException} */ public HRegionInfo getRegionInfo(final byte [] regionName) - throws NotServingRegionException; + throws NotServingRegionException, ConnectException, IOException; /** * Return all the data for the row that matches row exactly, diff --git a/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java b/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java index fec0dacfba9..8fb14ebe7ca 100644 --- a/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java +++ b/src/main/java/org/apache/hadoop/hbase/mapreduce/package-info.java @@ -1,5 +1,5 @@ /* - * Copyright 20010 The Apache Software Foundation + * Copyright 2010 The Apache Software Foundation * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index efdd339faac..8f44ad37dc7 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -160,7 +160,7 @@ public class AssignmentManager extends ZooKeeperListener { /** * Handle failover. Restore state from META and ZK. Handle any regions in - * transition. + * transition. Presumes .META. and -ROOT- deployed. * @throws KeeperException * @throws IOException */ @@ -170,6 +170,12 @@ public class AssignmentManager extends ZooKeeperListener { // synchronized. The presumption is that in this case it is safe since this // method is being played by a single thread on startup. + // TODO: Check list of user regions and their assignments against regionservers. + // TODO: Regions that have a null location and are not in regionsInTransitions + // need to be handled. + // TODO: Regions that are on servers that are not in our online list need + // reassigning. + // Scan META to build list of existing regions, servers, and assignment rebuildUserRegions(); // Pickup any disabled tables @@ -183,46 +189,90 @@ public class AssignmentManager extends ZooKeeperListener { } LOG.info("Failed-over master needs to process " + nodes.size() + " regions in transition"); - for (String regionName: nodes) { - RegionTransitionData data = ZKAssign.getData(watcher, regionName); - HRegionInfo regionInfo = - MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst(); - String encodedName = regionInfo.getEncodedName(); - switch(data.getEventType()) { - case RS_ZK_REGION_CLOSING: - // Just insert region into RIT. 
- // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedName, - new RegionState(regionInfo, RegionState.State.CLOSING, - data.getStamp())); - break; + for (String encodedRegionName: nodes) { + processRegionInTransition(encodedRegionName, null); + } + } - case RS_ZK_REGION_CLOSED: - // Region is closed, insert into RIT and handle it - regionsInTransition.put(encodedName, - new RegionState(regionInfo, RegionState.State.CLOSED, - data.getStamp())); - new ClosedRegionHandler(master, this, data, regionInfo).process(); - break; - - case RS_ZK_REGION_OPENING: - // Just insert region into RIT - // If this never updates the timeout will trigger new assignment - regionsInTransition.put(encodedName, - new RegionState(regionInfo, RegionState.State.OPENING, - data.getStamp())); - break; - - case RS_ZK_REGION_OPENED: - // Region is opened, insert into RIT and handle it - regionsInTransition.put(encodedName, - new RegionState(regionInfo, RegionState.State.OPENING, - data.getStamp())); - new OpenedRegionHandler(master, this, data, regionInfo, - serverManager.getServerInfo(data.getServerName())).process(); - break; + /** + * If region is up in zk in transition, then do fixup and block and wait until + * the region is assigned and out of transition. Used on startup for + * catalog regions. + * @param hri Region to look for. + * @return True if we processed a region in transition else false if region + * was not up in zk in transition. 
+ * @throws InterruptedException + * @throws KeeperException + * @throws IOException + */ + boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri) + throws InterruptedException, KeeperException, IOException { + boolean intransistion = processRegionInTransition(hri.getEncodedName(), hri); + if (!intransistion) return intransistion; + synchronized(this.regionsInTransition) { + while (!this.master.isStopped() && + this.regionsInTransition.containsKey(hri.getEncodedName())) { + this.regionsInTransition.wait(); } } + return intransistion; + } + + /** + * Process failover of the passed region; looks in zk for its transition data. + * @param encodedRegionName Region to process failover for. + * @param regionInfo RegionInfo. If null we'll go get it from meta table. + * @return True if transition data was found in zk and processed, else false. + * @throws KeeperException + * @throws IOException + */ + boolean processRegionInTransition(final String encodedRegionName, + final HRegionInfo regionInfo) + throws KeeperException, IOException { + RegionTransitionData data = ZKAssign.getData(watcher, encodedRegionName); + if (data == null) return false; + HRegionInfo hri = (regionInfo != null)? regionInfo: + MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst(); + processRegionsInTransition(data, hri); + return true; + } + + void processRegionsInTransition(final RegionTransitionData data, + final HRegionInfo regionInfo) + throws KeeperException { + String encodedRegionName = regionInfo.getEncodedName(); + LOG.info("Processing region " + regionInfo.getRegionNameAsString() + + " in state " + data.getEventType()); + switch (data.getEventType()) { + case RS_ZK_REGION_CLOSING: + // Just insert region into RIT. 
+ // If this never updates the timeout will trigger new assignment + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.CLOSING, data.getStamp())); + break; + + case RS_ZK_REGION_CLOSED: + // Region is closed, insert into RIT and handle it + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.CLOSED, data.getStamp())); + new ClosedRegionHandler(master, this, data, regionInfo).process(); + break; + + case RS_ZK_REGION_OPENING: + // Just insert region into RIT + // If this never updates the timeout will trigger new assignment + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.OPENING, data.getStamp())); + break; + + case RS_ZK_REGION_OPENED: + // Region is opened, insert into RIT and handle it + regionsInTransition.put(encodedRegionName, new RegionState( + regionInfo, RegionState.State.OPENING, data.getStamp())); + new OpenedRegionHandler(master, this, data, regionInfo, + serverManager.getServerInfo(data.getServerName())).process(); + break; + } } /** @@ -752,11 +802,11 @@ public class AssignmentManager extends ZooKeeperListener { private void rebuildUserRegions() throws IOException { Map allRegions = MetaReader.fullScan(catalogTracker); - for (Map.Entry region : allRegions.entrySet()) { + for (Map.Entry region: allRegions.entrySet()) { HServerAddress regionLocation = region.getValue(); HRegionInfo regionInfo = region.getKey(); if (regionLocation == null) { - regions.put(regionInfo, null); + this.regions.put(regionInfo, null); continue; } HServerInfo serverInfo = serverManager.getHServerInfo(regionLocation); diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 6374b5014e9..fb1e83417dc 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -145,10 +145,6 @@ implements 
HMasterInterface, HMasterRegionInterface, MasterServices, Server { // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; - // True if this a cluster startup where there are no already running servers - // as opposed to a master joining an already running cluster - boolean freshClusterStartup; - // This flag is for stopping this Master instance. Its set when we are // stopping or aborting private volatile boolean stopped = false; @@ -170,8 +166,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { * *
    *
  1. Initialize HMaster RPC and address - *
  2. Connect to ZooKeeper and figure out if this is a fresh cluster start or - * a failed over master + *
  3. Connect to ZooKeeper. Get count of regionservers still up. *
  4. Block until becoming active master *
  5. Initialize master components - server manager, region manager, * region server queue, file system manager, etc @@ -209,7 +204,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.zooKeeper = new ZooKeeperWatcher(conf, MASTER, this); /* - * 2. Block on becoming the active master. + * 2. Count of regionservers that are up. + */ + int count = ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode); + + /* + * 3. Block on becoming the active master. * We race with other masters to write our address into ZooKeeper. If we * succeed, we are the primary/active master and finish initialization. * @@ -222,16 +222,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { stallIfBackupMaster(this.conf, this.activeMasterManager); activeMasterManager.blockUntilBecomingActiveMaster(); - /* - * 3. Determine if this is a fresh cluster startup or failed over master. - * This is done by checking for the existence of any ephemeral - * RegionServer nodes in ZooKeeper. These nodes are created by RSs on - * their initialization but initialization will not happen unless clusterup - * flag is set -- see ClusterStatusTracker below. - */ - this.freshClusterStartup = - 0 == ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode); - /* * 4. We are active master now... go initialize components we need to run. 
* Note, there may be dross in zk from previous runs; it'll get addressed @@ -242,7 +232,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { this.connection = HConnectionManager.getConnection(conf); this.executorService = new ExecutorService(getServerName()); - this.serverManager = new ServerManager(this, this, this.freshClusterStartup); + this.serverManager = new ServerManager(this, this); this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE)); @@ -259,16 +249,20 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { // Set the cluster as up. If new RSs, they'll be waiting on this before // going ahead with their startup. this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this); - this.clusterStatusTracker.setClusterUp(); + boolean wasUp = this.clusterStatusTracker.isClusterUp(); + if (!wasUp) this.clusterStatusTracker.setClusterUp(); this.clusterStatusTracker.start(); LOG.info("Server active/primary master; " + this.address + - "; freshClusterStart=" + this.freshClusterStartup + ", sessionid=0x" + - Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId())); + ", sessionid=0x" + + Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) + + ", ephemeral nodes still up in zk=" + count + + ", cluster-up flag was=" + wasUp); } /* - * Stall startup if we are designated a backup master. + * Stall startup if we are designated a backup master; i.e. we want someone + * else to become the master before proceeding. * @param c * @param amm * @throws InterruptedException @@ -290,27 +284,42 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { /** * Main processing loop for the HMaster. - * 1. Handle both fresh cluster start as well as failed over initialization of - * the HMaster. - * 2. Start the necessary services - * 3. Reassign the root region - * 4. 
The master is no longer closed - set "closed" to false + *
      + *
    1. Handle both fresh cluster start as well as failed over initialization of + * the HMaster
    2. + *
    3. Start the necessary services
    4. + *
    5. Reassign the root region
    6. + *
    7. The master is no longer closed - set "closed" to false
    8. + *
    */ @Override public void run() { try { // start up all service threads. startServiceThreads(); - // Wait for minimum number of region servers to report in - this.serverManager.waitForRegionServers(); - // Start assignment of user regions, startup or failure - if (this.freshClusterStartup) { - clusterStarterInitializations(this.fileSystemManager, - this.serverManager, this.catalogTracker, this.assignmentManager); + // Wait for region servers to report in. Returns count of regions. + int regionCount = this.serverManager.waitForRegionServers(); + + // TODO: Should do this in background rather than block master startup + // TODO: Do we want to do this before/while/after RSs check in? + // It seems that this method looks at active RSs but happens concurrently + // with when we expect them to be checking in + this.fileSystemManager. + splitLogAfterStartup(this.serverManager.getOnlineServers()); + + // Make sure root and meta assigned before proceeding. + assignRootAndMeta(); + + // Is this fresh start with no regions assigned or are we a master joining + // an already-running cluster? If regionCount == 0, then for sure a + // fresh start. TODO: Be fancier. If regionCount == 2, perhaps the + // 2 are .META. and -ROOT- and we should fall into the fresh startup + // branch below. For now, do processFailover. + if (regionCount == 0) { + this.assignmentManager.cleanoutUnassigned(); + this.assignmentManager.assignAllUserRegions(); } else { - // Process existing unassigned nodes in ZK, read all regions from META, - // rebuild in-memory state. this.assignmentManager.processFailover(); } @@ -343,36 +352,42 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server { LOG.info("HMaster main thread exiting"); } - /* - * Initializations we need to do if we are cluster starter. - * @param mfs - * @param sm - * @param ct - * @param am + /** + * Check -ROOT- and .META. are assigned. If not, + * assign them. 
+ * @throws InterruptedException * @throws IOException + * @throws KeeperException + * @return Count of regions we assigned. */ - private static void clusterStarterInitializations(final MasterFileSystem mfs, - final ServerManager sm, final CatalogTracker ct, final AssignmentManager am) - throws IOException, InterruptedException, KeeperException { - // Check filesystem has required basics - mfs.initialize(); - // TODO: Should do this in background rather than block master startup - // TODO: Do we want to do this before/while/after RSs check in? - // It seems that this method looks at active RSs but happens - // concurrently with when we expect them to be checking in - mfs.splitLogAfterStartup(sm.getOnlineServers()); - // Clean out current state of unassigned - am.cleanoutUnassigned(); - // assign the root region - am.assignRoot(); - ct.waitForRoot(); - // assign the meta region - am.assignMeta(); - ct.waitForMeta(); - // above check waits for general meta availability but this does not + int assignRootAndMeta() + throws InterruptedException, IOException, KeeperException { + int assigned = 0; + long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000); + + // Work on ROOT region. Is it in zk in transition? + boolean rit = this.assignmentManager. + processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO); + if (!catalogTracker.verifyRootRegionLocation(timeout)) { + this.assignmentManager.assignRoot(); + this.catalogTracker.waitForRoot(); + assigned++; + } + LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit); + + // Work on meta region + rit = this.assignmentManager. 
+ processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO); + if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) { + this.assignmentManager.assignMeta(); + this.catalogTracker.waitForMeta(); + // Above check waits for general meta availability but this does not // guarantee that the transition has completed - am.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO); - am.assignAllUserRegions(); + this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO); + assigned++; + } + LOG.info(".META. assigned=" + assigned + ", rit=" + rit); + return assigned; } /* diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 0ad871434ae..df7412c7c70 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -81,6 +81,7 @@ public class MasterFileSystem { this.fs = FileSystem.get(conf); // set up the archived logs path this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME); + createInitialFileSystemLayout(); } /** @@ -91,8 +92,9 @@ public class MasterFileSystem { *
  6. *
  7. Create a log archive directory for RS to put archived logs
  8. *
+ * Idempotent. */ - public void initialize() throws IOException { + private void createInitialFileSystemLayout() throws IOException { // check if the root directory exists checkRootDir(this.rootdir, conf, this.fs); diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 7445ebd2bbc..77f371c265c 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -84,7 +84,6 @@ public class ServerManager { private final Server master; private final MasterServices services; - private final boolean freshClusterStartup; private final ServerMonitor serverMonitorThread; @@ -123,11 +122,9 @@ public class ServerManager { * @param freshClusterStartup True if we are original master on a fresh * cluster startup else if false, we are joining an already running cluster. */ - public ServerManager(final Server master, final MasterServices services, - final boolean freshClusterStartup) { + public ServerManager(final Server master, final MasterServices services) { this.master = master; this.services = services; - this.freshClusterStartup = freshClusterStartup; Configuration c = master.getConfiguration(); int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000); this.metrics = new MasterMetrics(master.getServerName()); @@ -170,8 +167,7 @@ public class ServerManager { throw new PleaseHoldException(message); } checkIsDead(info.getServerName(), "STARTUP"); - LOG.info("Received start message from: " + info.getServerName()); - recordNewServer(info); + recordNewServer(info, false, null); } private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) { @@ -200,27 +196,25 @@ public class ServerManager { throw new YouAreDeadException(message); } - /** - * Adds the HSI to the RS list and creates an empty load - * @param info The region server informations - */ - public void 
recordNewServer(HServerInfo info) { - recordNewServer(info, false, null); - } - /** * Adds the HSI to the RS list * @param info The region server informations - * @param useInfoLoad True if the load from the info should be used - * like under a master failover + * @param useInfoLoad True if the load from the info should be used; e.g. + * under a master failover + * @param hri Region interface. Can be null. */ void recordNewServer(HServerInfo info, boolean useInfoLoad, HRegionInterface hri) { - HServerLoad load = useInfoLoad ? info.getLoad() : new HServerLoad(); + HServerLoad load = useInfoLoad? info.getLoad(): new HServerLoad(); String serverName = info.getServerName(); + LOG.info("Registering server=" + serverName + ", regionCount=" + + load.getLoad() + ", userLoad=" + useInfoLoad); info.setLoad(load); // TODO: Why did we update the RS location ourself? Shouldn't RS do this? // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher); + // -- If I understand the question, the RS does not update the location + // because could be disagreement over locations because of DNS issues; only + // master does DNS now -- St.Ack 20100929. this.onlineServers.put(serverName, info); if (hri == null) { serverConnections.remove(serverName); @@ -254,15 +248,16 @@ public class ServerManager { // If we don't know this server, tell it shutdown. HServerInfo storedInfo = this.onlineServers.get(info.getServerName()); if (storedInfo == null) { - if (!this.freshClusterStartup) { - // If we are joining an existing cluster, then soon as we come up we'll - // be getting reports from already running regionservers. - LOG.info("Registering new server: " + info.getServerName()); + if (this.deadservers.contains(storedInfo)) { + LOG.warn("Report from deadserver " + storedInfo); + return HMsg.STOP_REGIONSERVER_ARRAY; + } else { + // Just let the server in. Presume master joining a running cluster. // recordNewServer is what happens at the end of reportServerStartup. 
// The only thing we are skipping is passing back to the regionserver // the HServerInfo to use. Here we presume a master has already done // that so we'll press on with whatever it gave us for HSI. - recordNewServer(info); + recordNewServer(info, true, null); // If msgs, put off their processing but this is not enough because // its possible that the next time the server reports in, we'll still // not be up and serving. For example, if a split, we'll need the @@ -271,11 +266,6 @@ public class ServerManager { if (msgs.length > 0) throw new PleaseHoldException("FIX! Putting off " + "message processing because not yet rwady but possible we won't be " + "ready next on next report"); - } else { - LOG.warn("Received report from unknown server, a server calling " + - " regionServerReport w/o having first called regionServerStartup; " + - "telling it " + HMsg.Type.STOP_REGIONSERVER + ": " + info.getServerName()); - return HMsg.STOP_REGIONSERVER_ARRAY; } } @@ -474,7 +464,10 @@ public class ServerManager { " but server shutdown is already in progress"); return; } - // Remove the server from the known servers lists and update load info + // Remove the server from the known servers lists and update load info BUT + // add to deadservers first; do this so it'll show in dead servers list if + // not in online servers list. + this.deadservers.add(serverName); this.onlineServers.remove(serverName); this.serverConnections.remove(serverName); // If cluster is going down, yes, servers are going to be expiring; don't @@ -488,7 +481,7 @@ public class ServerManager { return; } this.services.getExecutorService().submit(new ServerShutdownHandler(this.master, - this.services, deadservers, info)); + this.services, this.deadservers, info)); LOG.debug("Added=" + serverName + " to dead servers, submitted shutdown handler to be executed"); } @@ -554,9 +547,10 @@ public class ServerManager { /** * Waits for the regionservers to report in. 
+ * @return Count of regions out on cluster * @throws InterruptedException */ - public void waitForRegionServers() + public int waitForRegionServers() throws InterruptedException { long interval = this.master.getConfiguration(). getLong("hbase.master.wait.on.regionservers.interval", 3000); @@ -574,8 +568,20 @@ public class ServerManager { } oldcount = count; } + // Count how many regions deployed out on cluster. If fresh start, it'll + // be none but if not a fresh start, we'll have registered servers when + // they came in on the {@link #regionServerReport(HServerInfo)} as opposed to + // {@link #regionServerStartup(HServerInfo)} and it'll be carrying an + // actual server load. + int regionCount = 0; + for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) { + HServerLoad load = e.getValue().getLoad(); + if (load != null) regionCount += load.getLoad(); + } LOG.info("Exiting wait on regionserver(s) to checkin; count=" + count + - ", stopped=" + this.master.isStopped()); + ", stopped=" + this.master.isStopped() + + ", count of regions out on cluster=" + regionCount); + return regionCount; } public List getOnlineServersList() { diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java index 4747c25ee57..090308aa8c9 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java @@ -36,11 +36,16 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.master.DeadServer; import org.apache.hadoop.hbase.master.MasterServices; +import org.apache.hadoop.hbase.master.ServerManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Writables; import org.apache.zookeeper.KeeperException; - +/** + * Process server shutdown.
+ * Server-to-handle must be already in the deadservers lists. See + * {@link ServerManager#expireServer(HServerInfo)}. + */ public class ServerShutdownHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class); private final HServerInfo hsi; @@ -55,8 +60,9 @@ public class ServerShutdownHandler extends EventHandler { this.server = server; this.services = services; this.deadServers = deadServers; - // Add to dead servers. - this.deadServers.add(hsi.getServerName()); + if (!this.deadServers.contains(hsi.getServerName())) { + LOG.warn(hsi.getServerName() + " is NOT in deadservers; it should be!"); + } } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index e498ed1f467..595cf2eab38 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantReadWriteLock; -import com.google.common.base.Function; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -57,7 +56,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Chore; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HMsg; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HServerAddress; @@ -72,6 +70,7 @@ import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.UnknownRowLockException; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.YouAreDeadException; +import
org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.catalog.CatalogTracker; import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.RootLocationEditor; @@ -120,9 +119,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.net.DNS; -import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException; +import com.google.common.base.Function; + /** * HRegionServer makes a set of HRegions available to clients. It checks in with * the HMaster. There are many HRegionServers in a single HBase deployment. @@ -396,10 +396,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.abortRequested = false; this.stopped = false; - - //HRegionInterface, - //HBaseRPCErrorHandler, Runnable, Watcher, Stoppable, OnlineRegions - // Server to handle client requests this.server = HBaseRPC.getServer(this, new Class[]{HRegionInterface.class, HBaseRPCErrorHandler.class, @@ -429,19 +425,32 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } } + /** + * Bring up connection to zk ensemble and then wait until a master for this + * cluster and then after that, wait until cluster 'up' flag has been set. + * This is the order in which master does things. + * Finally put up a catalog tracker. + * @throws IOException + * @throws InterruptedException + */ private void initializeZooKeeper() throws IOException, InterruptedException { - // open connection to zookeeper and set primary watcher + // Open connection to zookeeper and set primary watcher zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + serverInfo.getServerAddress().getPort(), this); + // Create the master address manager, register with zk, and start it. Then + // block until a master is available. No point in starting up if no master + // running. 
+ this.masterAddressManager = new MasterAddressTracker(zooKeeper, this); + this.masterAddressManager.start(); + this.masterAddressManager.blockUntilAvailable(); + + // Wait on cluster being up. Master will set this flag up in zookeeper + // when ready. this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this); this.clusterStatusTracker.start(); this.clusterStatusTracker.blockUntilAvailable(); - // create the master address manager, register with zk, and start it - masterAddressManager = new MasterAddressTracker(zooKeeper, this); - masterAddressManager.start(); - // Create the catalog tracker and start it; this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java index 15f7453d6b0..5a4b275f40d 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Leases.java @@ -68,6 +68,7 @@ public class Leases extends Thread { public Leases(final int leasePeriod, final int leaseCheckFrequency) { this.leasePeriod = leasePeriod; this.leaseCheckFrequency = leaseCheckFrequency; + setDaemon(true); } /** diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java index 6e74241cf03..d40e2f7306f 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperNodeTracker.java @@ -32,11 +32,6 @@ import org.apache.zookeeper.KeeperException; * RegionServers. */ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { - /** - * Pass this if you do not want a timeout. 
- */ - public final static long NO_TIMEOUT = -1; - /** Path of node being tracked */ protected final String node; @@ -94,7 +89,7 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { */ public synchronized byte [] blockUntilAvailable() throws InterruptedException { - return blockUntilAvailable(NO_TIMEOUT); + return blockUntilAvailable(0); } /** @@ -102,18 +97,22 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener { * specified timeout has elapsed. * * @param timeout maximum time to wait for the node data to be available, - * in milliseconds. Pass {@link #NO_TIMEOUT} for no timeout. + * in milliseconds. Pass 0 for no timeout. * @return data of the node * @throws InterruptedException if the waiting thread is interrupted */ public synchronized byte [] blockUntilAvailable(long timeout) throws InterruptedException { - if (timeout != NO_TIMEOUT && timeout < 0) throw new IllegalArgumentException(); + if (timeout < 0) throw new IllegalArgumentException(); + boolean notimeout = timeout == 0; long startTime = System.currentTimeMillis(); long remaining = timeout; - while ((remaining == NO_TIMEOUT || remaining > 0) && this.data == null) { - if (remaining == NO_TIMEOUT) wait(); - else wait(remaining); + while ((notimeout || remaining > 0) && this.data == null) { + if (notimeout) { + wait(); + continue; + } + wait(remaining); remaining = timeout - (System.currentTimeMillis() - startTime); } return data; diff --git a/src/main/resources/hbase-default.xml b/src/main/resources/hbase-default.xml index 1ed59eb4375..5fafe657a28 100644 --- a/src/main/resources/hbase-default.xml +++ b/src/main/resources/hbase-default.xml @@ -157,7 +157,7 @@ hbase.regionserver.msginterval - 5000 + 3000 Interval between messages from the RegionServer to HMaster in milliseconds.
diff --git a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java index ce3c50b50d3..4091cc766ef 100644 --- a/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java +++ b/src/test/java/org/apache/hadoop/hbase/catalog/TestCatalogTracker.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.catalog; import java.io.IOException; +import java.net.ConnectException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -36,13 +37,13 @@ import org.apache.hadoop.hbase.HServerAddress; import org.apache.hadoop.hbase.HServerInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.util.Progressable; @@ -52,6 +53,7 @@ import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Matchers; import org.mockito.Mockito; /** @@ -100,6 +102,61 @@ public class TestCatalogTracker { return ct; } + @Test public void testGetMetaServerConnectionFails() + throws IOException, InterruptedException, KeeperException { + HConnection connection = Mockito.mock(HConnection.class); + ConnectException connectException = + new ConnectException("Connection refused"); + final HRegionInterface implementation = + Mockito.mock(HRegionInterface.class); + Mockito.when(implementation.get((byte [])Mockito.any(), 
(Get)Mockito.any())). + thenThrow(connectException); + Mockito.when(connection.getHRegionConnection((HServerAddress)Matchers.anyObject(), Matchers.anyBoolean())). + thenReturn(implementation); + Assert.assertNotNull(connection.getHRegionConnection(new HServerAddress(), false)); + final CatalogTracker ct = constructAndStartCatalogTracker(connection); + try { + RootLocationEditor.setRootLocation(this.watcher, + new HServerAddress("example.com:1234")); + Assert.assertFalse(ct.verifyMetaRegionLocation(100)); + } finally { + // Clean out root location or later tests will be confused... they presume + // start fresh in zk. + RootLocationEditor.deleteRootLocation(this.watcher); + } + } + + /** + * Test get of root region fails properly if nothing to connect to. + * @throws IOException + * @throws InterruptedException + * @throws KeeperException + */ + @Test + public void testVerifyRootRegionLocationFails() + throws IOException, InterruptedException, KeeperException { + HConnection connection = Mockito.mock(HConnection.class); + ConnectException connectException = + new ConnectException("Connection refused"); + final HRegionInterface implementation = + Mockito.mock(HRegionInterface.class); + Mockito.when(implementation.getRegionInfo((byte [])Mockito.any())). + thenThrow(connectException); + Mockito.when(connection.getHRegionConnection((HServerAddress)Matchers.anyObject(), Matchers.anyBoolean())). + thenReturn(implementation); + Assert.assertNotNull(connection.getHRegionConnection(new HServerAddress(), false)); + final CatalogTracker ct = constructAndStartCatalogTracker(connection); + try { + RootLocationEditor.setRootLocation(this.watcher, + new HServerAddress("example.com:1234")); + Assert.assertFalse(ct.verifyRootRegionLocation(100)); + } finally { + // Clean out root location or later tests will be confused... they presume + // start fresh in zk. 
+ RootLocationEditor.deleteRootLocation(this.watcher); + } + } + @Test (expected = NotAllMetaRegionsOnlineException.class) public void testTimeoutWaitForRoot() throws IOException, InterruptedException {