HBASE-3047 If new master crashes, restart is messy
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1002880 13f79535-47bb-0310-9956-ffa450edef68
parent 4d25275040
commit 58c4f2df98
@@ -552,6 +552,7 @@ Release 0.21.0 - Unreleased
 HBASE-2995 Incorrect dependency on Log class from Jetty
 HBASE-3038 WALReaderFSDataInputStream.getPos() fails if Filesize > MAX_INT
 (Nicolas Spiegelberg via Stack)
+ HBASE-3047 If new master crashes, restart is messy

 IMPROVEMENTS
 HBASE-1760 Cleanup TODOs in HTable
@@ -79,6 +79,8 @@ public class RemoteExceptionHandler {
 * @throws IOException indicating a server error ocurred if the decoded
 * exception is not an IOException. The decoded exception is set as
 * the cause.
+ * @deprecated Use {@link RemoteException#unwrapRemoteException()} instead.
+ * In fact we should look into deprecating this whole class - St.Ack 2010929
 */
 public static IOException decodeRemoteException(final RemoteException re)
 throws IOException {
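A minimal usage sketch of the unwrap call that the new @deprecated tag points at. The helper class below is illustrative and not part of this commit; org.apache.hadoop.ipc.RemoteException#unwrapRemoteException() is the call the updated CatalogTracker code in the later hunks relies on.

    import java.io.IOException;
    import org.apache.hadoop.ipc.RemoteException;

    // Hypothetical helper, for illustration only.
    public class UnwrapSketch {
      // Instead of RemoteExceptionHandler.decodeRemoteException(re), let the
      // RemoteException reconstruct the server-side IOException itself and
      // inspect the result (e.g. instanceof NotServingRegionException).
      static IOException toLocalException(final RemoteException re) {
        return re.unwrapRemoteException();
      }
    }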
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hbase.catalog;

 import java.io.IOException;
- import java.lang.reflect.UndeclaredThrowableException;
 import java.net.ConnectException;
 import java.util.concurrent.atomic.AtomicBoolean;

@@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.zookeeper.KeeperException;

 /**
@@ -271,7 +271,9 @@ public class CatalogTracker {
 /**
 * Gets the current location for <code>.META.</code> if available and waits
 * for up to the specified timeout if not immediately available. Throws an
- * exception if timed out waiting.
+ * exception if timed out waiting. This method differs from {@link #waitForMeta()}
+ * in that it will go ahead and verify the location gotten from ZooKeeper by
+ * trying trying to use returned connection.
 * @param timeout maximum time to wait for meta availability, in milliseconds
 * @return location of meta
 * @throws InterruptedException if interrupted while waiting
@@ -282,15 +284,15 @@ public class CatalogTracker {
 public HServerAddress waitForMeta(long timeout)
 throws InterruptedException, IOException, NotAllMetaRegionsOnlineException {
 long stop = System.currentTimeMillis() + timeout;
- synchronized(metaAvailable) {
- if(getMetaServerConnection(true) != null) {
+ synchronized (metaAvailable) {
+ if (getMetaServerConnection(true) != null) {
 return metaLocation;
 }
 while(!metaAvailable.get() &&
 (timeout == 0 || System.currentTimeMillis() < stop)) {
 metaAvailable.wait(timeout);
 }
- if(getMetaServerConnection(true) == null) {
+ if (getMetaServerConnection(true) == null) {
 throw new NotAllMetaRegionsOnlineException(
 "Timed out (" + timeout + "ms)");
 }
@@ -336,7 +338,6 @@ public class CatalogTracker {
 }

 private void setMetaLocation(HServerAddress metaLocation) {
- LOG.info("Found new META location, " + metaLocation);
 metaAvailable.set(true);
 this.metaLocation = metaLocation;
 // no synchronization because these are private and already under lock
@@ -359,23 +360,69 @@ public class CatalogTracker {
 }

 private boolean verifyRegionLocation(HRegionInterface metaServer,
- byte [] regionName) {
+ byte [] regionName)
+ throws IOException {
+ if (metaServer == null) {
+ LOG.info("Passed metaserver is null");
+ return false;
+ }
 Throwable t = null;
 try {
+ // Am expecting only two possible exceptions here; unable
+ // to connect to the regionserver or NotServingRegionException wrapped
+ // in the hadoop rpc RemoteException.
 return metaServer.getRegionInfo(regionName) != null;
- } catch (NotServingRegionException e) {
+ } catch (ConnectException e) {
 t = e;
- } catch (UndeclaredThrowableException e) {
- // We can get a ConnectException wrapped by a UTE if client fails connect
- // If not a ConnectException, rethrow.
- if (!(e.getCause() instanceof ConnectException)) throw e;
- t = e.getCause();
+ } catch (RemoteException e) {
+ IOException ioe = e.unwrapRemoteException();
+ if (ioe instanceof NotServingRegionException) {
+ t = ioe;
+ } else {
+ throw e;
+ }
 }
 LOG.info("Failed verification of " + Bytes.toString(regionName) +
- ": " + t.getMessage());
+ ", assigning anew: " + t);
 return false;
 }

+ /**
+ * Verify <code>-ROOT-</code> is deployed and accessible.
+ * @param timeout How long to wait on zk for root address (passed through to
+ * the internal call to {@link #waitForRootServerConnection(long)}.
+ * @return True if the <code>-ROOT-</code> location is healthy.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public boolean verifyRootRegionLocation(final long timeout)
+ throws InterruptedException, IOException {
+ HRegionInterface connection = null;
+ try {
+ connection = waitForRootServerConnection(timeout);
+ } catch (NotAllMetaRegionsOnlineException e) {
+ // Pass
+ } catch (IOException e) {
+ // Unexpected exception
+ throw e;
+ }
+ return (connection == null)? false:
+ verifyRegionLocation(connection, HRegionInfo.ROOT_REGIONINFO.getRegionName());
+ }
+
+ /**
+ * Verify <code>.META.</code> is deployed and accessible.
+ * @param timeout How long to wait on zk for <code>.META.</code> address
+ * (passed through to the internal call to {@link #waitForMetaServerConnection(long)}.
+ * @return True if the <code>.META.</code> location is healthy.
+ * @throws IOException Some unexpected IOE.
+ * @throws InterruptedException
+ */
+ public boolean verifyMetaRegionLocation(final long timeout)
+ throws InterruptedException, IOException {
+ return getMetaServerConnection(true) != null;
+ }
+
 /**
 * Check if <code>hsi</code> was carrying <code>-ROOT-</code> or
 * <code>.META.</code> and if so, clear out old locations.
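A hedged sketch of how the two new verify methods are intended to be used together; it mirrors the assignRootAndMeta() flow added to HMaster further down. The class and method names here are illustrative, the AssignmentManager calls are elided, and the timeout is just an example value.

    import java.io.IOException;
    import org.apache.hadoop.hbase.catalog.CatalogTracker;

    public class CatalogVerifySketch {
      // Returns true if both catalog regions were already healthy; false means
      // the caller had to (re)assign at least one of them.
      static boolean catalogRegionsHealthy(final CatalogTracker ct, final long timeoutMs)
      throws IOException, InterruptedException {
        boolean healthy = true;
        if (!ct.verifyRootRegionLocation(timeoutMs)) {
          healthy = false;  // stale or missing -ROOT- location: assign it, then ct.waitForRoot()
        }
        if (!ct.verifyMetaRegionLocation(timeoutMs)) {
          healthy = false;  // stale or missing .META. location: assign it, then ct.waitForMeta()
        }
        return healthy;
      }
    }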
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.ipc;

 import java.io.IOException;
+ import java.net.ConnectException;
 import java.util.List;
 import java.util.NavigableSet;

@@ -38,6 +39,7 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+ import org.apache.hadoop.ipc.RemoteException;

 /**
 * Clients interact with HRegionServers using a handle to the HRegionInterface.
@@ -51,10 +53,12 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
 *
 * @param regionName name of the region
 * @return HRegionInfo object for region
- * @throws NotServingRegionException e
+ * @throws NotServingRegionException
+ * @throws ConnectException
+ * @throws IOException This can manifest as an Hadoop ipc {@link RemoteException}
 */
 public HRegionInfo getRegionInfo(final byte [] regionName)
- throws NotServingRegionException;
+ throws NotServingRegionException, ConnectException, IOException;

 /**
 * Return all the data for the row that matches <i>row</i> exactly,
@@ -1,5 +1,5 @@
 /*
- * Copyright 20010 The Apache Software Foundation
+ * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
@@ -160,7 +160,7 @@ public class AssignmentManager extends ZooKeeperListener {

 /**
 * Handle failover. Restore state from META and ZK. Handle any regions in
- * transition.
+ * transition. Presumes <code>.META.</code> and <code>-ROOT-</code> deployed.
 * @throws KeeperException
 * @throws IOException
 */
@@ -170,6 +170,12 @@ public class AssignmentManager extends ZooKeeperListener {
 // synchronized. The presumption is that in this case it is safe since this
 // method is being played by a single thread on startup.

+ // TODO: Check list of user regions and their assignments against regionservers.
+ // TODO: Regions that have a null location and are not in regionsInTransitions
+ // need to be handled.
+ // TODO: Regions that are on servers that are not in our online list need
+ // reassigning.
+
 // Scan META to build list of existing regions, servers, and assignment
 rebuildUserRegions();
 // Pickup any disabled tables
@@ -183,46 +189,90 @@ public class AssignmentManager extends ZooKeeperListener {
 }
 LOG.info("Failed-over master needs to process " + nodes.size() +
 " regions in transition");
- for (String regionName: nodes) {
- RegionTransitionData data = ZKAssign.getData(watcher, regionName);
- HRegionInfo regionInfo =
- MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst();
- String encodedName = regionInfo.getEncodedName();
- switch(data.getEventType()) {
- case RS_ZK_REGION_CLOSING:
- // Just insert region into RIT.
- // If this never updates the timeout will trigger new assignment
- regionsInTransition.put(encodedName,
- new RegionState(regionInfo, RegionState.State.CLOSING,
- data.getStamp()));
- break;
-
- case RS_ZK_REGION_CLOSED:
- // Region is closed, insert into RIT and handle it
- regionsInTransition.put(encodedName,
- new RegionState(regionInfo, RegionState.State.CLOSED,
- data.getStamp()));
- new ClosedRegionHandler(master, this, data, regionInfo).process();
- break;
-
- case RS_ZK_REGION_OPENING:
- // Just insert region into RIT
- // If this never updates the timeout will trigger new assignment
- regionsInTransition.put(encodedName,
- new RegionState(regionInfo, RegionState.State.OPENING,
- data.getStamp()));
- break;
-
- case RS_ZK_REGION_OPENED:
- // Region is opened, insert into RIT and handle it
- regionsInTransition.put(encodedName,
- new RegionState(regionInfo, RegionState.State.OPENING,
- data.getStamp()));
- new OpenedRegionHandler(master, this, data, regionInfo,
- serverManager.getServerInfo(data.getServerName())).process();
- break;
+ for (String encodedRegionName: nodes) {
+ processRegionInTransition(encodedRegionName, null);
+ }
+ }
+
+ /**
+ * If region is up in zk in transition, then do fixup and block and wait until
+ * the region is assigned and out of transition. Used on startup for
+ * catalog regions.
+ * @param hri Region to look for.
+ * @return True if we processed a region in transition else false if region
+ * was not up in zk in transition.
+ * @throws InterruptedException
+ * @throws KeeperException
+ * @throws IOException
+ */
+ boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
+ throws InterruptedException, KeeperException, IOException {
+ boolean intransistion = processRegionInTransition(hri.getEncodedName(), hri);
+ if (!intransistion) return intransistion;
+ synchronized(this.regionsInTransition) {
+ while (!this.master.isStopped() &&
+ this.regionsInTransition.containsKey(hri.getEncodedName())) {
+ this.regionsInTransition.wait();
 }
 }
+ return intransistion;
+ }
+
+ /**
+ * Process failover of <code>encodedName</code>. Look in
+ * @param encodedRegionName Region to process failover for.
+ * @param encodedRegionName RegionInfo. If null we'll go get it from meta table.
+ * @return
+ * @throws KeeperException
+ * @throws IOException
+ */
+ boolean processRegionInTransition(final String encodedRegionName,
+ final HRegionInfo regionInfo)
+ throws KeeperException, IOException {
+ RegionTransitionData data = ZKAssign.getData(watcher, encodedRegionName);
+ if (data == null) return false;
+ HRegionInfo hri = (regionInfo != null)? regionInfo:
+ MetaReader.getRegion(catalogTracker, data.getRegionName()).getFirst();
+ processRegionsInTransition(data, hri);
+ return true;
+ }
+
+ void processRegionsInTransition(final RegionTransitionData data,
+ final HRegionInfo regionInfo)
+ throws KeeperException {
+ String encodedRegionName = regionInfo.getEncodedName();
+ LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
+ " in state " + data.getEventType());
+ switch (data.getEventType()) {
+ case RS_ZK_REGION_CLOSING:
+ // Just insert region into RIT.
+ // If this never updates the timeout will trigger new assignment
+ regionsInTransition.put(encodedRegionName, new RegionState(
+ regionInfo, RegionState.State.CLOSING, data.getStamp()));
+ break;
+
+ case RS_ZK_REGION_CLOSED:
+ // Region is closed, insert into RIT and handle it
+ regionsInTransition.put(encodedRegionName, new RegionState(
+ regionInfo, RegionState.State.CLOSED, data.getStamp()));
+ new ClosedRegionHandler(master, this, data, regionInfo).process();
+ break;
+
+ case RS_ZK_REGION_OPENING:
+ // Just insert region into RIT
+ // If this never updates the timeout will trigger new assignment
+ regionsInTransition.put(encodedRegionName, new RegionState(
+ regionInfo, RegionState.State.OPENING, data.getStamp()));
+ break;
+
+ case RS_ZK_REGION_OPENED:
+ // Region is opened, insert into RIT and handle it
+ regionsInTransition.put(encodedRegionName, new RegionState(
+ regionInfo, RegionState.State.OPENING, data.getStamp()));
+ new OpenedRegionHandler(master, this, data, regionInfo,
+ serverManager.getServerInfo(data.getServerName())).process();
+ break;
+ }
 }

 /**
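A hedged usage sketch of the new processRegionInTransitionAndBlockUntilAssigned() entry point; the only call sites in this commit are the assignRootAndMeta() additions to HMaster shown later, and the variable names below are illustrative.

    // Illustrative fragment, not part of the commit.
    boolean rit = assignmentManager
        .processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
    // true  -> -ROOT- had an unassigned znode; it was replayed and the call
    //          blocked until the region left regionsInTransition.
    // false -> nothing was in transition for -ROOT-; the caller still verifies
    //          the location recorded in ZooKeeper before trusting it.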
@@ -752,11 +802,11 @@ public class AssignmentManager extends ZooKeeperListener {
 private void rebuildUserRegions() throws IOException {
 Map<HRegionInfo,HServerAddress> allRegions =
 MetaReader.fullScan(catalogTracker);
- for (Map.Entry<HRegionInfo,HServerAddress> region : allRegions.entrySet()) {
+ for (Map.Entry<HRegionInfo,HServerAddress> region: allRegions.entrySet()) {
 HServerAddress regionLocation = region.getValue();
 HRegionInfo regionInfo = region.getKey();
 if (regionLocation == null) {
- regions.put(regionInfo, null);
+ this.regions.put(regionInfo, null);
 continue;
 }
 HServerInfo serverInfo = serverManager.getHServerInfo(regionLocation);
@@ -145,10 +145,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
 // Cluster status zk tracker and local setter
 private ClusterStatusTracker clusterStatusTracker;

- // True if this a cluster startup where there are no already running servers
- // as opposed to a master joining an already running cluster
- boolean freshClusterStartup;
-
 // This flag is for stopping this Master instance. Its set when we are
 // stopping or aborting
 private volatile boolean stopped = false;
@@ -170,8 +166,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
 *
 * <ol>
 * <li>Initialize HMaster RPC and address
- * <li>Connect to ZooKeeper and figure out if this is a fresh cluster start or
- * a failed over master
+ * <li>Connect to ZooKeeper. Get count of regionservers still up.
 * <li>Block until becoming active master
 * <li>Initialize master components - server manager, region manager,
 * region server queue, file system manager, etc
@@ -209,7 +204,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
 this.zooKeeper = new ZooKeeperWatcher(conf, MASTER, this);

 /*
- * 2. Block on becoming the active master.
+ * 2. Count of regoinservers that are up.
+ */
+ int count = ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
+
+ /*
+ * 3. Block on becoming the active master.
 * We race with other masters to write our address into ZooKeeper. If we
 * succeed, we are the primary/active master and finish initialization.
 *
@@ -222,16 +222,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
 stallIfBackupMaster(this.conf, this.activeMasterManager);
 activeMasterManager.blockUntilBecomingActiveMaster();

- /*
- * 3. Determine if this is a fresh cluster startup or failed over master.
- * This is done by checking for the existence of any ephemeral
- * RegionServer nodes in ZooKeeper. These nodes are created by RSs on
- * their initialization but initialization will not happen unless clusterup
- * flag is set -- see ClusterStatusTracker below.
- */
- this.freshClusterStartup =
- 0 == ZKUtil.getNumberOfChildren(zooKeeper, zooKeeper.rsZNode);
-
 /*
 * 4. We are active master now... go initialize components we need to run.
 * Note, there may be dross in zk from previous runs; it'll get addressed
@@ -242,7 +232,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
 this.connection = HConnectionManager.getConnection(conf);
 this.executorService = new ExecutorService(getServerName());

- this.serverManager = new ServerManager(this, this, this.freshClusterStartup);
+ this.serverManager = new ServerManager(this, this);

 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
 this, conf.getInt("hbase.master.catalog.timeout", Integer.MAX_VALUE));
@@ -259,16 +249,20 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
 // Set the cluster as up. If new RSs, they'll be waiting on this before
 // going ahead with their startup.
 this.clusterStatusTracker = new ClusterStatusTracker(getZooKeeper(), this);
- this.clusterStatusTracker.setClusterUp();
+ boolean wasUp = this.clusterStatusTracker.isClusterUp();
+ if (!wasUp) this.clusterStatusTracker.setClusterUp();
 this.clusterStatusTracker.start();

 LOG.info("Server active/primary master; " + this.address +
- "; freshClusterStart=" + this.freshClusterStartup + ", sessionid=0x" +
- Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()));
+ ", sessionid=0x" +
+ Long.toHexString(this.zooKeeper.getZooKeeper().getSessionId()) +
+ ", ephemeral nodes still up in zk=" + count +
+ ", cluster-up flag was=" + wasUp);
 }

 /*
- * Stall startup if we are designated a backup master.
+ * Stall startup if we are designated a backup master; i.e. we want someone
+ * else to become the master before proceeding.
 * @param c
 * @param amm
 * @throws InterruptedException
@@ -290,27 +284,42 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {

 /**
 * Main processing loop for the HMaster.
- * 1. Handle both fresh cluster start as well as failed over initialization of
- * the HMaster.
- * 2. Start the necessary services
- * 3. Reassign the root region
- * 4. The master is no longer closed - set "closed" to false
+ * <ol>
+ * <li>Handle both fresh cluster start as well as failed over initialization of
+ * the HMaster</li>
+ * <li>Start the necessary services</li>
+ * <li>Reassign the root region</li>
+ * <li>The master is no longer closed - set "closed" to false</li>
+ * </ol>
 */
 @Override
 public void run() {
 try {
 // start up all service threads.
 startServiceThreads();
- // Wait for minimum number of region servers to report in
- this.serverManager.waitForRegionServers();
-
- // Start assignment of user regions, startup or failure
- if (this.freshClusterStartup) {
- clusterStarterInitializations(this.fileSystemManager,
- this.serverManager, this.catalogTracker, this.assignmentManager);
+
+ // Wait for region servers to report in. Returns count of regions.
+ int regionCount = this.serverManager.waitForRegionServers();
+
+ // TODO: Should do this in background rather than block master startup
+ // TODO: Do we want to do this before/while/after RSs check in?
+ // It seems that this method looks at active RSs but happens concurrently
+ // with when we expect them to be checking in
+ this.fileSystemManager.
+ splitLogAfterStartup(this.serverManager.getOnlineServers());
+
+ // Make sure root and meta assigned before proceeding.
+ assignRootAndMeta();
+
+ // Is this fresh start with no regions assigned or are we a master joining
+ // an already-running cluster? If regionsCount == 0, then for sure a
+ // fresh start. TOOD: Be fancier. If regionsCount == 2, perhaps the
+ // 2 are .META. and -ROOT- and we should fall into the fresh startup
+ // branch below. For now, do processFailover.
+ if (regionCount == 0) {
+ this.assignmentManager.cleanoutUnassigned();
+ this.assignmentManager.assignAllUserRegions();
 } else {
- // Process existing unassigned nodes in ZK, read all regions from META,
- // rebuild in-memory state.
 this.assignmentManager.processFailover();
 }

|
||||||
LOG.info("HMaster main thread exiting");
|
LOG.info("HMaster main thread exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Initializations we need to do if we are cluster starter.
|
* Check <code>-ROOT-</code> and <code>.META.</code> are assigned. If not,
|
||||||
* @param mfs
|
* assign them.
|
||||||
* @param sm
|
* @throws InterruptedException
|
||||||
* @param ct
|
|
||||||
* @param am
|
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
* @throws KeeperException
|
||||||
|
* @return Count of regions we assigned.
|
||||||
*/
|
*/
|
||||||
private static void clusterStarterInitializations(final MasterFileSystem mfs,
|
int assignRootAndMeta()
|
||||||
final ServerManager sm, final CatalogTracker ct, final AssignmentManager am)
|
throws InterruptedException, IOException, KeeperException {
|
||||||
throws IOException, InterruptedException, KeeperException {
|
int assigned = 0;
|
||||||
// Check filesystem has required basics
|
long timeout = this.conf.getLong("hbase.catalog.verification.timeout", 1000);
|
||||||
mfs.initialize();
|
|
||||||
// TODO: Should do this in background rather than block master startup
|
// Work on ROOT region. Is it in zk in transition?
|
||||||
// TODO: Do we want to do this before/while/after RSs check in?
|
boolean rit = this.assignmentManager.
|
||||||
// It seems that this method looks at active RSs but happens
|
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.ROOT_REGIONINFO);
|
||||||
// concurrently with when we expect them to be checking in
|
if (!catalogTracker.verifyRootRegionLocation(timeout)) {
|
||||||
mfs.splitLogAfterStartup(sm.getOnlineServers());
|
this.assignmentManager.assignRoot();
|
||||||
// Clean out current state of unassigned
|
this.catalogTracker.waitForRoot();
|
||||||
am.cleanoutUnassigned();
|
assigned++;
|
||||||
// assign the root region
|
}
|
||||||
am.assignRoot();
|
LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit);
|
||||||
ct.waitForRoot();
|
|
||||||
// assign the meta region
|
// Work on meta region
|
||||||
am.assignMeta();
|
rit = this.assignmentManager.
|
||||||
ct.waitForMeta();
|
processRegionInTransitionAndBlockUntilAssigned(HRegionInfo.FIRST_META_REGIONINFO);
|
||||||
// above check waits for general meta availability but this does not
|
if (!this.catalogTracker.verifyMetaRegionLocation(timeout)) {
|
||||||
|
this.assignmentManager.assignMeta();
|
||||||
|
this.catalogTracker.waitForMeta();
|
||||||
|
// Above check waits for general meta availability but this does not
|
||||||
// guarantee that the transition has completed
|
// guarantee that the transition has completed
|
||||||
am.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
|
this.assignmentManager.waitForAssignment(HRegionInfo.FIRST_META_REGIONINFO);
|
||||||
am.assignAllUserRegions();
|
assigned++;
|
||||||
|
}
|
||||||
|
LOG.info(".META. assigned=" + assigned + ", rit=" + rit);
|
||||||
|
return assigned;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
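Taken together with the run() hunk above, the new master startup path can be summarized by the sketch below. It is only a condensed restatement of what this diff adds; the wrapper method is hypothetical, and the "hbase.catalog.verification.timeout" key read inside assignRootAndMeta() defaults to 1000 ms as shown in the hunk.

    // Condensed sketch of the failover-aware startup order (hypothetical wrapper).
    void startupAfterBecomingActiveMaster() throws Exception {
      int regionCount = serverManager.waitForRegionServers(); // regions already out on RSs
      fileSystemManager.splitLogAfterStartup(serverManager.getOnlineServers());
      assignRootAndMeta();                 // verify -ROOT-/.META.; reassign only if needed
      if (regionCount == 0) {
        // Nothing deployed anywhere: fresh cluster start.
        assignmentManager.cleanoutUnassigned();
        assignmentManager.assignAllUserRegions();
      } else {
        // Regions already out on the cluster: this master is failing over.
        assignmentManager.processFailover();
      }
    }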
@@ -81,6 +81,7 @@ public class MasterFileSystem {
 this.fs = FileSystem.get(conf);
 // set up the archived logs path
 this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+ createInitialFileSystemLayout();
 }

 /**
@@ -91,8 +92,9 @@ public class MasterFileSystem {
 * </li>
 * <li>Create a log archive directory for RS to put archived logs</li>
 * </ol>
+ * Idempotent.
 */
- public void initialize() throws IOException {
+ private void createInitialFileSystemLayout() throws IOException {
 // check if the root directory exists
 checkRootDir(this.rootdir, conf, this.fs);

@@ -84,7 +84,6 @@ public class ServerManager {

 private final Server master;
 private final MasterServices services;
- private final boolean freshClusterStartup;

 private final ServerMonitor serverMonitorThread;

@@ -123,11 +122,9 @@ public class ServerManager {
 * @param freshClusterStartup True if we are original master on a fresh
 * cluster startup else if false, we are joining an already running cluster.
 */
- public ServerManager(final Server master, final MasterServices services,
- final boolean freshClusterStartup) {
+ public ServerManager(final Server master, final MasterServices services) {
 this.master = master;
 this.services = services;
- this.freshClusterStartup = freshClusterStartup;
 Configuration c = master.getConfiguration();
 int monitorInterval = c.getInt("hbase.master.monitor.interval", 60 * 1000);
 this.metrics = new MasterMetrics(master.getServerName());
@@ -170,8 +167,7 @@ public class ServerManager {
 throw new PleaseHoldException(message);
 }
 checkIsDead(info.getServerName(), "STARTUP");
- LOG.info("Received start message from: " + info.getServerName());
- recordNewServer(info);
+ recordNewServer(info, false, null);
 }

 private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
@@ -200,27 +196,25 @@ public class ServerManager {
 throw new YouAreDeadException(message);
 }

- /**
- * Adds the HSI to the RS list and creates an empty load
- * @param info The region server informations
- */
- public void recordNewServer(HServerInfo info) {
- recordNewServer(info, false, null);
- }
-
 /**
 * Adds the HSI to the RS list
 * @param info The region server informations
- * @param useInfoLoad True if the load from the info should be used
- * like under a master failover
+ * @param useInfoLoad True if the load from the info should be used; e.g.
+ * under a master failover
+ * @param hri Region interface. Can be null.
 */
 void recordNewServer(HServerInfo info, boolean useInfoLoad,
 HRegionInterface hri) {
- HServerLoad load = useInfoLoad ? info.getLoad() : new HServerLoad();
+ HServerLoad load = useInfoLoad? info.getLoad(): new HServerLoad();
 String serverName = info.getServerName();
+ LOG.info("Registering server=" + serverName + ", regionCount=" +
+ load.getLoad() + ", userLoad=" + useInfoLoad);
 info.setLoad(load);
 // TODO: Why did we update the RS location ourself? Shouldn't RS do this?
 // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
+ // -- If I understand the question, the RS does not update the location
+ // because could be disagreement over locations because of DNS issues; only
+ // master does DNS now -- St.Ack 20100929.
 this.onlineServers.put(serverName, info);
 if (hri == null) {
 serverConnections.remove(serverName);
@@ -254,15 +248,16 @@ public class ServerManager {
 // If we don't know this server, tell it shutdown.
 HServerInfo storedInfo = this.onlineServers.get(info.getServerName());
 if (storedInfo == null) {
- if (!this.freshClusterStartup) {
- // If we are joining an existing cluster, then soon as we come up we'll
- // be getting reports from already running regionservers.
- LOG.info("Registering new server: " + info.getServerName());
+ if (this.deadservers.contains(storedInfo)) {
+ LOG.warn("Report from deadserver " + storedInfo);
+ return HMsg.STOP_REGIONSERVER_ARRAY;
+ } else {
+ // Just let the server in. Presume master joining a running cluster.
 // recordNewServer is what happens at the end of reportServerStartup.
 // The only thing we are skipping is passing back to the regionserver
 // the HServerInfo to use. Here we presume a master has already done
 // that so we'll press on with whatever it gave us for HSI.
- recordNewServer(info);
+ recordNewServer(info, true, null);
 // If msgs, put off their processing but this is not enough because
 // its possible that the next time the server reports in, we'll still
 // not be up and serving. For example, if a split, we'll need the
@@ -271,11 +266,6 @@ public class ServerManager {
 if (msgs.length > 0) throw new PleaseHoldException("FIX! Putting off " +
 "message processing because not yet rwady but possible we won't be " +
 "ready next on next report");
- } else {
- LOG.warn("Received report from unknown server, a server calling " +
- " regionServerReport w/o having first called regionServerStartup; " +
- "telling it " + HMsg.Type.STOP_REGIONSERVER + ": " + info.getServerName());
- return HMsg.STOP_REGIONSERVER_ARRAY;
 }
 }

@@ -474,7 +464,10 @@ public class ServerManager {
 " but server shutdown is already in progress");
 return;
 }
- // Remove the server from the known servers lists and update load info
+ // Remove the server from the known servers lists and update load info BUT
+ // add to deadservers first; do this so it'll show in dead servers list if
+ // not in online servers list.
+ this.deadservers.add(serverName);
 this.onlineServers.remove(serverName);
 this.serverConnections.remove(serverName);
 // If cluster is going down, yes, servers are going to be expiring; don't
@@ -488,7 +481,7 @@ public class ServerManager {
 return;
 }
 this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
- this.services, deadservers, info));
+ this.services, this.deadservers, info));
 LOG.debug("Added=" + serverName +
 " to dead servers, submitted shutdown handler to be executed");
 }
@@ -554,9 +547,10 @@ public class ServerManager {

 /**
 * Waits for the regionservers to report in.
+ * @return Count of regions out on cluster
 * @throws InterruptedException
 */
- public void waitForRegionServers()
+ public int waitForRegionServers()
 throws InterruptedException {
 long interval = this.master.getConfiguration().
 getLong("hbase.master.wait.on.regionservers.interval", 3000);
@@ -574,8 +568,20 @@ public class ServerManager {
 }
 oldcount = count;
 }
+ // Count how many regions deployed out on cluster. If fresh start, it'll
+ // be none but if not a fresh start, we'll have registered servers when
+ // they came in on the {@link #regionServerReport(HServerInfo)} as opposed to
+ // {@link #regionServerStartup(HServerInfo)} and it'll be carrying an
+ // actual server load.
+ int regionCount = 0;
+ for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
+ HServerLoad load = e.getValue().getLoad();
+ if (load != null) regionCount += load.getLoad();
+ }
 LOG.info("Exiting wait on regionserver(s) to checkin; count=" + count +
- ", stopped=" + this.master.isStopped());
+ ", stopped=" + this.master.isStopped() +
+ ", count of regions out on cluster=" + regionCount);
+ return regionCount;
 }

 public List<HServerInfo> getOnlineServersList() {
@@ -36,11 +36,16 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterServices;
+ import org.apache.hadoop.hbase.master.ServerManager;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.zookeeper.KeeperException;

+ /**
+ * Process server shutdown.
+ * Server-to-handle must be already in the deadservers lists. See
+ * {@link ServerManager#expireServer(HServerInfo)}.
+ */
 public class ServerShutdownHandler extends EventHandler {
 private static final Log LOG = LogFactory.getLog(ServerShutdownHandler.class);
 private final HServerInfo hsi;
@@ -55,8 +60,9 @@ public class ServerShutdownHandler extends EventHandler {
 this.server = server;
 this.services = services;
 this.deadServers = deadServers;
- // Add to dead servers.
- this.deadServers.add(hsi.getServerName());
+ if (this.deadServers.contains(hsi.getServerName())) {
+ LOG.warn(hsi.getServerName() + " is NOT in deadservers; it should be!");
+ }
 }

 @Override
@@ -48,7 +48,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;

- import com.google.common.base.Function;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -57,7 +56,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
@@ -72,6 +70,7 @@ import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
+ import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
 import org.apache.hadoop.hbase.catalog.RootLocationEditor;
@@ -120,9 +119,10 @@ import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.net.DNS;
- import org.apache.hadoop.util.StringUtils;
 import org.apache.zookeeper.KeeperException;

+ import com.google.common.base.Function;
+
 /**
 * HRegionServer makes a set of HRegions available to clients. It checks in with
 * the HMaster. There are many HRegionServers in a single HBase deployment.
@@ -396,10 +396,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
 this.abortRequested = false;
 this.stopped = false;

-
- //HRegionInterface,
- //HBaseRPCErrorHandler, Runnable, Watcher, Stoppable, OnlineRegions
-
 // Server to handle client requests
 this.server = HBaseRPC.getServer(this,
 new Class<?>[]{HRegionInterface.class, HBaseRPCErrorHandler.class,
@@ -429,19 +425,32 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
 }
 }

+ /**
+ * Bring up connection to zk ensemble and then wait until a master for this
+ * cluster and then after that, wait until cluster 'up' flag has been set.
+ * This is the order in which master does things.
+ * Finally put up a catalog tracker.
+ * @throws IOException
+ * @throws InterruptedException
+ */
 private void initializeZooKeeper() throws IOException, InterruptedException {
- // open connection to zookeeper and set primary watcher
+ // Open connection to zookeeper and set primary watcher
 zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER +
 serverInfo.getServerAddress().getPort(), this);

+ // Create the master address manager, register with zk, and start it. Then
+ // block until a master is available. No point in starting up if no master
+ // running.
+ this.masterAddressManager = new MasterAddressTracker(zooKeeper, this);
+ this.masterAddressManager.start();
+ this.masterAddressManager.blockUntilAvailable();
+
+ // Wait on cluster being up. Master will set this flag up in zookeeper
+ // when ready.
 this.clusterStatusTracker = new ClusterStatusTracker(this.zooKeeper, this);
 this.clusterStatusTracker.start();
 this.clusterStatusTracker.blockUntilAvailable();

- // create the master address manager, register with zk, and start it
- masterAddressManager = new MasterAddressTracker(zooKeeper, this);
- masterAddressManager.start();
-
 // Create the catalog tracker and start it;
 this.catalogTracker = new CatalogTracker(this.zooKeeper, this.connection,
 this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
@@ -68,6 +68,7 @@ public class Leases extends Thread {
 public Leases(final int leasePeriod, final int leaseCheckFrequency) {
 this.leasePeriod = leasePeriod;
 this.leaseCheckFrequency = leaseCheckFrequency;
+ setDaemon(true);
 }

 /**
@@ -32,11 +32,6 @@ import org.apache.zookeeper.KeeperException;
 * RegionServers.
 */
 public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
- /**
- * Pass this if you do not want a timeout.
- */
- public final static long NO_TIMEOUT = -1;
-
 /** Path of node being tracked */
 protected final String node;

@@ -94,7 +89,7 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
 */
 public synchronized byte [] blockUntilAvailable()
 throws InterruptedException {
- return blockUntilAvailable(NO_TIMEOUT);
+ return blockUntilAvailable(0);
 }

 /**
@@ -102,18 +97,22 @@ public abstract class ZooKeeperNodeTracker extends ZooKeeperListener {
 * specified timeout has elapsed.
 *
 * @param timeout maximum time to wait for the node data to be available,
- * in milliseconds. Pass {@link #NO_TIMEOUT} for no timeout.
+ * n milliseconds. Pass 0 for no timeout.
 * @return data of the node
 * @throws InterruptedException if the waiting thread is interrupted
 */
 public synchronized byte [] blockUntilAvailable(long timeout)
 throws InterruptedException {
- if (timeout != NO_TIMEOUT && timeout < 0) throw new IllegalArgumentException();
+ if (timeout < 0) throw new IllegalArgumentException();
+ boolean notimeout = timeout == 0;
 long startTime = System.currentTimeMillis();
 long remaining = timeout;
- while ((remaining == NO_TIMEOUT || remaining > 0) && this.data == null) {
- if (remaining == NO_TIMEOUT) wait();
- else wait(remaining);
+ while ((notimeout || remaining > 0) && this.data == null) {
+ if (notimeout) {
+ wait();
+ continue;
+ }
+ wait(remaining);
 remaining = timeout - (System.currentTimeMillis() - startTime);
 }
 return data;
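A hedged usage sketch of the revised blockUntilAvailable() contract after the NO_TIMEOUT constant is removed: 0 now means wait indefinitely, a positive value is a bound in milliseconds (the call may return null data if the node never appears within it), and negative values are rejected. The tracker variable below is illustrative.

    // Illustrative fragment, not part of the commit.
    byte [] data = tracker.blockUntilAvailable();        // same as blockUntilAvailable(0): no timeout
    byte [] maybe = tracker.blockUntilAvailable(5000);   // wait up to ~5s; may come back null
    // tracker.blockUntilAvailable(-1);                  // now throws IllegalArgumentException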
@@ -157,7 +157,7 @@
 </property>
 <property>
 <name>hbase.regionserver.msginterval</name>
- <value>5000</value>
+ <value>3000</value>
 <description>Interval between messages from the RegionServer to HMaster
 in milliseconds.
 </description>
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.catalog;

 import java.io.IOException;
+ import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -36,13 +37,13 @@ import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
+ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
- import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.util.Progressable;
@@ -52,6 +53,7 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+ import org.mockito.Matchers;
 import org.mockito.Mockito;

 /**
@@ -100,6 +102,61 @@ public class TestCatalogTracker {
 return ct;
 }

+ @Test public void testGetMetaServerConnectionFails()
+ throws IOException, InterruptedException, KeeperException {
+ HConnection connection = Mockito.mock(HConnection.class);
+ ConnectException connectException =
+ new ConnectException("Connection refused");
+ final HRegionInterface implementation =
+ Mockito.mock(HRegionInterface.class);
+ Mockito.when(implementation.get((byte [])Mockito.any(), (Get)Mockito.any())).
+ thenThrow(connectException);
+ Mockito.when(connection.getHRegionConnection((HServerAddress)Matchers.anyObject(), Matchers.anyBoolean())).
+ thenReturn(implementation);
+ Assert.assertNotNull(connection.getHRegionConnection(new HServerAddress(), false));
+ final CatalogTracker ct = constructAndStartCatalogTracker(connection);
+ try {
+ RootLocationEditor.setRootLocation(this.watcher,
+ new HServerAddress("example.com:1234"));
+ Assert.assertFalse(ct.verifyMetaRegionLocation(100));
+ } finally {
+ // Clean out root location or later tests will be confused... they presume
+ // start fresh in zk.
+ RootLocationEditor.deleteRootLocation(this.watcher);
+ }
+ }
+
+ /**
+ * Test get of root region fails properly if nothing to connect to.
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws KeeperException
+ */
+ @Test
+ public void testVerifyRootRegionLocationFails()
+ throws IOException, InterruptedException, KeeperException {
+ HConnection connection = Mockito.mock(HConnection.class);
+ ConnectException connectException =
+ new ConnectException("Connection refused");
+ final HRegionInterface implementation =
+ Mockito.mock(HRegionInterface.class);
+ Mockito.when(implementation.getRegionInfo((byte [])Mockito.any())).
+ thenThrow(connectException);
+ Mockito.when(connection.getHRegionConnection((HServerAddress)Matchers.anyObject(), Matchers.anyBoolean())).
+ thenReturn(implementation);
+ Assert.assertNotNull(connection.getHRegionConnection(new HServerAddress(), false));
+ final CatalogTracker ct = constructAndStartCatalogTracker(connection);
+ try {
+ RootLocationEditor.setRootLocation(this.watcher,
+ new HServerAddress("example.com:1234"));
+ Assert.assertFalse(ct.verifyRootRegionLocation(100));
+ } finally {
+ // Clean out root location or later tests will be confused... they presume
+ // start fresh in zk.
+ RootLocationEditor.deleteRootLocation(this.watcher);
+ }
+ }
+
 @Test (expected = NotAllMetaRegionsOnlineException.class)
 public void testTimeoutWaitForRoot()
 throws IOException, InterruptedException {