HBASE-2700 Test of: Handle master failover for regions in transition

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1023953 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Gray 2010-10-18 19:08:50 +00:00
parent 2a6edb9848
commit 5f7c8ca1b2
13 changed files with 1171 additions and 106 deletions

View File

@ -1008,6 +1008,7 @@ Release 0.21.0 - Unreleased
HBASE-3088 TestAvroServer and TestThriftServer broken because use same
table in all tests and tests enable/disable/delete
HBASE-3097 Merge in hbase-1200 doc on bloomfilters into hbase book
HBASE-2700 Test of: Handle master failover for regions in transition
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
@ -39,6 +40,7 @@ import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.ipc.RemoteException;
/**
* Reads region and assignment information from <code>.META.</code>.
@ -124,7 +126,8 @@ public class MetaReader {
* @return map of regions to their currently assigned server
* @throws IOException
*/
public static Map<HRegionInfo,HServerAddress> fullScan(CatalogTracker catalogTracker)
public static Map<HRegionInfo,HServerAddress> fullScan(
CatalogTracker catalogTracker)
throws IOException {
final Map<HRegionInfo,HServerAddress> regions =
new TreeMap<HRegionInfo,HServerAddress>();
@ -141,6 +144,34 @@ public class MetaReader {
return regions;
}
/**
 * Performs a full scan of <code>.META.</code>, returning the raw
 * {@link Result} for every non-empty row found.
 * <p>
 * Unlike {@link #fullScan(CatalogTracker)}, the rows are not parsed into
 * region/server pairs; callers receive the untouched META rows (including
 * the server and startcode columns) so they can extract richer assignment
 * information, e.g. via {@link #metaRowToRegionPairWithInfo(Result)}.
 *
 * @param catalogTracker tracker used to locate the META region
 * @return list of raw META {@link Result}s, one per region row
 * @throws IOException if the scan of META fails
 */
public static List<Result> fullScanOfResults(
CatalogTracker catalogTracker)
throws IOException {
final List<Result> regions = new ArrayList<Result>();
Visitor v = new Visitor() {
@Override
public boolean visit(Result r) throws IOException {
// Skip null/empty rows; returning true keeps the scan going.
if (r == null || r.isEmpty()) return true;
regions.add(r);
return true;
}
};
fullScan(catalogTracker, v);
return regions;
}
/**
* Performs a full scan of <code>.META.</code>.
* <p>
@ -214,6 +245,13 @@ public class MetaReader {
} else {
throw e;
}
} catch (RemoteException re) {
if (re.unwrapRemoteException() instanceof NotServingRegionException) {
// Treat this NSRE as unavailable table. Catch and fall through to
// return null below
} else {
throw re;
}
} catch (IOException e) {
if (e.getCause() != null && e.getCause() instanceof IOException &&
e.getCause().getMessage() != null &&
@ -272,6 +310,31 @@ public class MetaReader {
}
}
/**
 * Parses a .META. table row into its region info and, when an assignment is
 * present, the hosting server's info (address plus startcode).
 *
 * @param data a .META. table row
 * @return pair of the regioninfo and the server info from <code>data</code>
 *         (server info is null if no server address is set in .META.)
 * @throws IOException if the regioninfo cannot be deserialized
 */
public static Pair<HRegionInfo, HServerInfo> metaRowToRegionPairWithInfo(
Result data) throws IOException {
HRegionInfo regionInfo = Writables.getHRegionInfo(
data.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER));
byte [] serverBytes = data.getValue(HConstants.CATALOG_FAMILY,
HConstants.SERVER_QUALIFIER);
if (serverBytes == null || serverBytes.length == 0) {
// No server column set: region is currently unassigned in META.
return new Pair<HRegionInfo, HServerInfo>(regionInfo, null);
}
HServerAddress address = new HServerAddress(Bytes.toString(serverBytes));
long startcode = Bytes.toLong(data.getValue(HConstants.CATALOG_FAMILY,
HConstants.STARTCODE_QUALIFIER));
HServerInfo serverInfo = new HServerInfo(address, startcode, 0,
address.getHostname());
return new Pair<HRegionInfo, HServerInfo>(regionInfo, serverInfo);
}
/**
* Checks if the specified table exists. Looks at the META table hosted on
* the specified server.

View File

@ -43,7 +43,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
@ -53,11 +52,14 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
@ -69,6 +71,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -91,13 +94,13 @@ public class AssignmentManager extends ZooKeeperListener {
private TimeoutMonitor timeoutMonitor;
/** Regions currently in transition. */
private final Map<String, RegionState> regionsInTransition =
new TreeMap<String, RegionState>();
private final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
new ConcurrentSkipListMap<String, RegionState>();
/** Plans for region movement. Key is the encoded version of a region name*/
// TODO: When do plans get cleaned out? Ever? In server open and in server
// shutdown processing -- St.Ack
private final ConcurrentNavigableMap<String, RegionPlan> regionPlans =
protected final ConcurrentNavigableMap<String, RegionPlan> regionPlans =
new ConcurrentSkipListMap<String, RegionPlan>();
/** Set of tables that have been disabled. */
@ -184,9 +187,13 @@ public class AssignmentManager extends ZooKeeperListener {
// reassigning.
// Scan META to build list of existing regions, servers, and assignment
rebuildUserRegions();
// Pickup any disabled tables
// Returns servers who have not checked in (assumed dead) and their regions
Map<HServerInfo,List<Pair<HRegionInfo,Result>>> deadServers =
rebuildUserRegions();
// Pickup any disabled tables from ZK
rebuildDisabledTables();
// Process list of dead servers
processDeadServers(deadServers);
// Check existing regions in transition
List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.assignmentZNode);
@ -250,35 +257,44 @@ public class AssignmentManager extends ZooKeeperListener {
String encodedRegionName = regionInfo.getEncodedName();
LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
" in state " + data.getEventType());
switch (data.getEventType()) {
case RS_ZK_REGION_CLOSING:
// Just insert region into RIT.
// If this never updates the timeout will trigger new assignment
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.CLOSING, data.getStamp()));
break;
synchronized (regionsInTransition) {
switch (data.getEventType()) {
case RS_ZK_REGION_CLOSING:
// Just insert region into RIT.
// If this never updates the timeout will trigger new assignment
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.CLOSING, data.getStamp()));
break;
case RS_ZK_REGION_CLOSED:
// Region is closed, insert into RIT and handle it
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.CLOSED, data.getStamp()));
new ClosedRegionHandler(master, this, data, regionInfo).process();
break;
case RS_ZK_REGION_CLOSED:
// Region is closed, insert into RIT and handle it
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.CLOSED, data.getStamp()));
new ClosedRegionHandler(master, this, data, regionInfo).process();
break;
case RS_ZK_REGION_OPENING:
// Just insert region into RIT
// If this never updates the timeout will trigger new assignment
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING, data.getStamp()));
break;
case M_ZK_REGION_OFFLINE:
// Region is offline, insert into RIT and handle it like a closed
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OFFLINE, data.getStamp()));
new ClosedRegionHandler(master, this, data, regionInfo).process();
break;
case RS_ZK_REGION_OPENED:
// Region is opened, insert into RIT and handle it
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING, data.getStamp()));
new OpenedRegionHandler(master, this, data, regionInfo,
serverManager.getServerInfo(data.getServerName())).process();
break;
case RS_ZK_REGION_OPENING:
// Just insert region into RIT
// If this never updates the timeout will trigger new assignment
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING, data.getStamp()));
break;
case RS_ZK_REGION_OPENED:
// Region is opened, insert into RIT and handle it
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING, data.getStamp()));
new OpenedRegionHandler(master, this, data, regionInfo,
serverManager.getServerInfo(data.getServerName())).process();
break;
}
}
}
@ -569,6 +585,25 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
/**
 * Marks the region of a disabled table offline: deletes its unassigned
 * znode (CLOSED, falling back to OFFLINE) and removes it from in-memory
 * regions-in-transition so it is not reassigned.
 * <p>
 * NOTE(review): if the znode is already gone (NoNodeException) this returns
 * WITHOUT calling regionOffline, leaving the region in RIT — presumably
 * another actor already handled it; confirm against callers.
 *
 * @param regionInfo region of a table being disabled
 */
public void offlineDisabledRegion(HRegionInfo regionInfo) {
// Disabling so should not be reassigned, just delete the CLOSED node
LOG.debug("Table being disabled so deleting ZK node and removing from " +
"regions in transition, skipping assignment");
try {
// deleteClosedNode returns false when the node exists but is not in
// CLOSED state; in that case try the OFFLINE state instead.
if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
// Could also be in OFFLINE mode
ZKAssign.deleteOfflineNode(watcher, regionInfo.getEncodedName());
}
} catch (KeeperException.NoNodeException nne) {
LOG.warn("Tried to delete closed node for " + regionInfo + " but it " +
"does not exist");
return;
} catch (KeeperException e) {
// Unexpected ZK failure: abort the master rather than run with
// inconsistent assignment state.
this.master.abort("Error deleting CLOSED node in ZK", e);
}
regionOffline(regionInfo);
}
// Assignment methods
/**
@ -592,6 +627,7 @@ public class AssignmentManager extends ZooKeeperListener {
if (isTableDisabled(tableName)) {
LOG.info("Table " + tableName + " disabled; skipping assign of " +
region.getRegionNameAsString());
offlineDisabledRegion(region);
return;
}
if (this.serverManager.isClusterShutdown()) {
@ -788,6 +824,22 @@ public class AssignmentManager extends ZooKeeperListener {
* @param region region to be unassigned
*/
public void unassign(HRegionInfo region) {
// Non-forced unassign; delegates to the two-arg overload with force=false.
unassign(region, false);
}
/**
* Unassigns the specified region.
* <p>
* Updates the RegionState and sends the CLOSE RPC.
* <p>
* If a RegionPlan is already set, it will remain. If this is being used
* to disable a table, be sure to use {@link #disableTable(String)} to ensure
* regions are not onlined after being closed.
*
* @param region region to be unassigned
* @param force if region should be closed even if already closing
*/
public void unassign(HRegionInfo region, boolean force) {
LOG.debug("Starting unassignment of region " +
region.getRegionNameAsString() + " (offlining)");
// Check if this region is currently assigned
@ -805,6 +857,11 @@ public class AssignmentManager extends ZooKeeperListener {
if (state == null) {
state = new RegionState(region, RegionState.State.PENDING_CLOSE);
regionsInTransition.put(encodedName, state);
} else if (force && (state.isClosing() || state.isPendingClose())) {
LOG.debug("Attempting to unassign region " +
region.getRegionNameAsString() + " which is already closing but " +
"forcing an additional close");
state.update(RegionState.State.PENDING_CLOSE);
} else {
LOG.debug("Attempting to unassign region " +
region.getRegionNameAsString() + " but it is " +
@ -964,19 +1021,84 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
private void rebuildUserRegions() throws IOException {
Map<HRegionInfo,HServerAddress> allRegions =
MetaReader.fullScan(catalogTracker);
for (Map.Entry<HRegionInfo,HServerAddress> region: allRegions.entrySet()) {
HServerAddress regionLocation = region.getValue();
HRegionInfo regionInfo = region.getKey();
/**
* Rebuild the list of user regions and assignment information.
* <p>
* Returns a map of servers that are not found to be online and the regions
* they were hosting.
* @return map of servers not online to their assigned regions, as stored
* in META
* @throws IOException
*/
private Map<HServerInfo,List<Pair<HRegionInfo,Result>>> rebuildUserRegions()
throws IOException {
// Region assignment from META
List<Result> results = MetaReader.fullScanOfResults(catalogTracker);
// Map of offline servers and their regions to be returned
Map<HServerInfo,List<Pair<HRegionInfo,Result>>> offlineServers =
new TreeMap<HServerInfo,List<Pair<HRegionInfo,Result>>>();
// Iterate regions in META
for (Result result : results) {
Pair<HRegionInfo,HServerInfo> region =
MetaReader.metaRowToRegionPairWithInfo(result);
HServerInfo regionLocation = region.getSecond();
HRegionInfo regionInfo = region.getFirst();
if (regionLocation == null) {
// Region not being served, add to region map with no assignment
// If this needs to be assigned out, it will also be in ZK as RIT
this.regions.put(regionInfo, null);
continue;
} else if (!serverManager.isServerOnline(
regionLocation.getServerName())) {
// Region is located on a server that isn't online
List<Pair<HRegionInfo,Result>> offlineRegions =
offlineServers.get(regionLocation);
if (offlineRegions == null) {
offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
offlineServers.put(regionLocation, offlineRegions);
}
offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
} else {
// Region is being served and on an active server
regions.put(regionInfo, regionLocation);
addToServers(regionLocation, regionInfo);
}
}
return offlineServers;
}
/**
* Processes list of dead servers from result of META scan.
* <p>
* This is used as part of failover to handle RegionServers which failed
* while there was no active master.
* <p>
* Method stubs in-memory data to be as expected by the normal server shutdown
* handler.
*
* @param deadServers
* @throws IOException
* @throws KeeperException
*/
private void processDeadServers(
Map<HServerInfo, List<Pair<HRegionInfo, Result>>> deadServers)
throws IOException, KeeperException {
for (Map.Entry<HServerInfo, List<Pair<HRegionInfo,Result>>> deadServer :
deadServers.entrySet()) {
List<Pair<HRegionInfo,Result>> regions = deadServer.getValue();
for (Pair<HRegionInfo,Result> region : regions) {
HRegionInfo regionInfo = region.getFirst();
Result result = region.getSecond();
// If region was in transition (was in zk) force it offline for reassign
try {
ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
master.getServerName());
} catch (KeeperException.NoNodeException nne) {
// This is fine
}
// Process with existing RS shutdown code
ServerShutdownHandler.processDeadRegion(regionInfo, result, this,
this.catalogTracker);
}
HServerInfo serverInfo = serverManager.getHServerInfo(regionLocation);
regions.put(regionInfo, serverInfo);
addToServers(serverInfo, regionInfo);
}
}
@ -1139,7 +1261,7 @@ public class AssignmentManager extends ZooKeeperListener {
if(!disabledTables.isEmpty()) {
LOG.info("Rebuilt list of " + disabledTables.size() + " disabled " +
"tables from zookeeper");
disabledTables.addAll(disabledTables);
this.disabledTables.addAll(disabledTables);
}
}
}
@ -1200,6 +1322,7 @@ public class AssignmentManager extends ZooKeeperListener {
// If bulkAssign in progress, suspend checks
if (this.bulkAssign) return;
synchronized (regionsInTransition) {
LOG.debug("Checking for timed out RIT");
// Iterate all regions in transition checking for time outs
long now = System.currentTimeMillis();
for (RegionState regionState : regionsInTransition.values()) {
@ -1219,11 +1342,23 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("Region has been PENDING_OPEN or OPENING for too " +
"long, reassigning region=" +
regionInfo.getRegionNameAsString());
// TODO: Possible RACE in here if RS is right now sending us an
// OPENED to handle. Otherwise, after our call to assign, which
// forces zk state to OFFLINE, any actions by RS should cause
// it abort its open w/ accompanying LOG.warns coming out of the
// handleRegion method below.
// There could be two cases. No ZK node or ZK in CLOSING.
try {
if (ZKUtil.checkExists(watcher, watcher.assignmentZNode)
!= -1 &&
ZKAssign.transitionNode(watcher, regionInfo,
HMaster.MASTER, EventType.RS_ZK_REGION_OPENING,
EventType.M_ZK_REGION_OFFLINE, -1) == -1) {
LOG.info("Region transitioned out of OPENING so " +
"skipping timeout, region=" +
regionInfo.getRegionNameAsString());
break;
}
} catch (KeeperException ke) {
LOG.error("Unexpected ZK exception timing out CLOSING region",
ke);
break;
}
AssignmentManager.this.setOffline(regionState.getRegion());
regionState.update(RegionState.State.OFFLINE);
assign(regionState.getRegion());
@ -1235,9 +1370,20 @@ public class AssignmentManager extends ZooKeeperListener {
case PENDING_CLOSE:
case CLOSING:
LOG.info("Region has been PENDING_CLOSE or CLOSING for too " +
"long, running unassign again on region=" +
"long, running forced unassign again on region=" +
regionInfo.getRegionNameAsString());
unassign(regionInfo);
try {
if (ZKAssign.deleteClosingNode(watcher,
regionInfo.getEncodedName())) {
unassign(regionInfo, true);
}
} catch (NoNodeException e) {
LOG.debug("Node no longer existed so not forcing another " +
"unassignment");
} catch (KeeperException e) {
LOG.warn("Unexpected ZK exception timing out a region " +
"close", e);
}
break;
}
}
@ -1465,4 +1611,8 @@ public class AssignmentManager extends ZooKeeperListener {
out.writeLong(stamp);
}
}
/**
 * Stops this AssignmentManager by interrupting its timeout-monitor chore
 * thread. Does not wait for the thread to exit.
 */
public void stop() {
this.timeoutMonitor.interrupt();
}
}

View File

@ -149,9 +149,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
// Set on abort -- usually failure of our zk session.
private volatile boolean abort = false;
// flag set after we become the active master (used for testing)
protected volatile boolean isActiveMaster = false;
private volatile boolean isActiveMaster = false;
// flag set after we complete initialization once active (used for testing)
protected volatile boolean isInitialized = false;
private volatile boolean isInitialized = false;
// Instance of the hbase executor service.
ExecutorService executorService;
@ -267,13 +267,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
loop();
// Once we break out of here, we are being shutdown
// Stop balancer and meta catalog janitor
if (this.balancerChore != null) {
this.balancerChore.interrupt();
}
if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.interrupt();
}
// Stop chores
stopChores();
// Wait for all the remaining region servers to report in IFF we were
// running a cluster shutdown AND we were NOT aborting.
@ -288,6 +283,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
// Stop services started for both backup and active masters
this.activeMasterManager.stop();
this.catalogTracker.stop();
this.serverManager.stop();
this.assignmentManager.stop();
HConnectionManager.deleteConnection(this.conf, true);
this.zooKeeper.close();
LOG.info("HMaster main thread exiting");
@ -399,6 +396,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.catalogJanitorChore =
Threads.setDaemonThreadRunning(new CatalogJanitor(this, this));
LOG.info("Master has completed initialization");
isInitialized = true;
}
@ -569,6 +567,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return Threads.setDaemonThreadRunning(chore);
}
/**
 * Interrupts the balancer and catalog-janitor chore threads, if they were
 * ever started. Does not wait for them to exit.
 */
private void stopChores() {
if (this.balancerChore != null) this.balancerChore.interrupt();
if (this.catalogJanitorChore != null) this.catalogJanitorChore.interrupt();
}
public MapWritable regionServerStartup(final HServerInfo serverInfo)
throws IOException {
// Set the ip into the passed in serverInfo. Its ip is more than likely

View File

@ -634,4 +634,14 @@ public class ServerManager {
public boolean isClusterShutdown() {
return this.clusterShutdown;
}
/**
 * Stops the ServerManager.
 * <p>
 * Currently just interrupts the ServerMonitor and LogCleaner chore threads;
 * it does not wait for them to exit.
 */
public void stop() {
serverMonitorThread.interrupt();
logCleaner.interrupt();
}
}

View File

@ -91,20 +91,7 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
LOG.debug("Handling CLOSED event");
// Check if this table is being disabled or not
if (assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
// Disabling so should not be reassigned, just delete the CLOSED node
LOG.debug("Table being disabled so deleting ZK node and removing from " +
"regions in transition, skipping assignment");
try {
ZKAssign.deleteClosedNode(server.getZooKeeper(),
regionInfo.getEncodedName());
} catch (KeeperException.NoNodeException nne) {
LOG.warn("Tried to delete closed node for " + data + " but it does " +
"not exist");
return;
} catch (KeeperException e) {
server.abort("Error deleting CLOSED node in ZK", e);
}
assignmentManager.regionOffline(regionInfo);
assignmentManager.offlineDisabledRegion(regionInfo);
return;
}
// ZK Node is in CLOSED state, assign it.

View File

@ -86,15 +86,22 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
public void process() {
LOG.debug("Handling OPENED event for " + this.regionInfo.getEncodedName() +
"; deleting unassigned node");
// TODO: should we check if this table was disabled and get it closed?
// Remove region from in-memory transition and unassigned node from ZK
try {
ZKAssign.deleteOpenedNode(server.getZooKeeper(),
regionInfo.getEncodedName());
} catch (KeeperException e) {
server.abort("Error deleting OPENED node in ZK", e);
server.abort("Error deleting OPENED node in ZK for transition ZK node (" +
regionInfo.getEncodedName() + ")", e);
}
this.assignmentManager.regionOnline(regionInfo, serverInfo);
LOG.debug("Opened region " + regionInfo.getRegionNameAsString());
if (assignmentManager.isTableDisabled(
regionInfo.getTableDesc().getNameAsString())) {
LOG.debug("Opened region " + regionInfo.getRegionNameAsString() + " but "
+ "this table is disabled, triggering close of region");
assignmentManager.unassign(regionInfo);
} else {
LOG.debug("Opened region " + regionInfo.getRegionNameAsString());
}
}
}

View File

@ -30,10 +30,12 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.DeadServer;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.ServerManager;
@ -118,52 +120,62 @@ public class ServerShutdownHandler extends EventHandler {
// We should encounter -ROOT- and .META. first in the Set given how its
// a sorted set.
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
// If table is not disabled but the region is offlined,
HRegionInfo hri = e.getKey();
boolean disabled = this.services.getAssignmentManager().
isTableDisabled(hri.getTableDesc().getNameAsString());
if (disabled) continue;
if (hri.isOffline() && hri.isSplit()) {
fixupDaughters(hris, e.getValue());
continue;
}
this.services.getAssignmentManager().assign(hri);
processDeadRegion(e.getKey(), e.getValue(),
this.services.getAssignmentManager(),
this.server.getCatalogTracker());
this.services.getAssignmentManager().assign(e.getKey());
}
this.deadServers.remove(serverName);
LOG.info("Finished processing of shutdown of " + serverName);
}
/**
 * Processes a single region found on a dead server during master failover.
 * <p>
 * Regions of disabled tables are skipped entirely. An offline split parent
 * instead has its daughter regions checked (and fixed up in META if
 * missing); it is not itself reassigned here.
 *
 * @param hri region that was hosted on the dead server
 * @param result the region's row from META
 * @param assignmentManager manager consulted for disabled-table state
 * @param catalogTracker tracker used to reach META for daughter fixup
 * @throws IOException if the daughter fixup fails
 */
public static void processDeadRegion(HRegionInfo hri, Result result,
AssignmentManager assignmentManager, CatalogTracker catalogTracker)
throws IOException {
// Skip regions whose table is disabled; they should stay unassigned.
if (assignmentManager.isTableDisabled(
hri.getTableDesc().getNameAsString())) {
return;
}
// Offline split parents only need their daughters verified in META.
if (hri.isOffline() && hri.isSplit()) {
fixupDaughters(result, assignmentManager, catalogTracker);
}
}
/**
* Check that daughter regions are up in .META. and if not, add them.
* @param hris All regions for this server in meta.
* @param result The contents of the parent row in .META.
* @throws IOException
*/
void fixupDaughters(final NavigableMap<HRegionInfo, Result> hris,
final Result result) throws IOException {
fixupDaughter(hris, result, HConstants.SPLITA_QUALIFIER);
fixupDaughter(hris, result, HConstants.SPLITB_QUALIFIER);
static void fixupDaughters(final Result result,
final AssignmentManager assignmentManager,
final CatalogTracker catalogTracker) throws IOException {
fixupDaughter(result, HConstants.SPLITA_QUALIFIER, assignmentManager,
catalogTracker);
fixupDaughter(result, HConstants.SPLITB_QUALIFIER, assignmentManager,
catalogTracker);
}
/**
* Check individual daughter is up in .META.; fixup if its not.
* @param hris All regions for this server in meta.
* @param result The contents of the parent row in .META.
* @param qualifier Which daughter to check for.
* @throws IOException
*/
void fixupDaughter(final NavigableMap<HRegionInfo, Result> hris,
final Result result, final byte [] qualifier)
static void fixupDaughter(final Result result, final byte [] qualifier,
final AssignmentManager assignmentManager,
final CatalogTracker catalogTracker)
throws IOException {
byte [] bytes = result.getValue(HConstants.CATALOG_FAMILY, qualifier);
if (bytes == null || bytes.length <= 0) return;
HRegionInfo hri = Writables.getHRegionInfo(bytes);
Pair<HRegionInfo, HServerAddress> pair =
MetaReader.getRegion(this.server.getCatalogTracker(), hri.getRegionName());
MetaReader.getRegion(catalogTracker, hri.getRegionName());
if (pair == null || pair.getFirst() == null) {
LOG.info("Fixup; missing daughter " + hri.getEncodedName());
MetaEditor.addDaughter(this.server.getCatalogTracker(), hri, null);
this.services.getAssignmentManager().assign(hri);
MetaEditor.addDaughter(catalogTracker, hri, null);
assignmentManager.assign(hri);
}
}
}

View File

@ -124,7 +124,7 @@ public class JVMClusterUtil {
public void waitForServerOnline() {
// The server is marked online after init begins but before race to become
// the active master.
while (!this.master.isAlive() && !this.master.isStopped()) {
while (!this.master.isMasterRunning() && !this.master.isStopped()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {

View File

@ -19,6 +19,8 @@
*/
package org.apache.hadoop.hbase.zookeeper;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
@ -243,6 +245,31 @@ public class ZKAssign {
return deleteNode(zkw, regionName, EventType.RS_ZK_REGION_OPENED);
}
/**
 * Deletes an existing unassigned node that is in the OFFLINE state for the
 * specified region.
 *
 * <p>If a node does not already exist for this region, a
 * {@link NoNodeException} will be thrown.
 *
 * <p>No watcher is set whether this succeeds or not.
 *
 * <p>Returns false if the node existed but was not in the OFFLINE state.
 *
 * <p>This method is used during master failover when the regions on an RS
 * that has died are all set to OFFLINE before being processed.
 *
 * @param zkw zk reference
 * @param regionName encoded name of the region whose OFFLINE node to delete
 * @return true if the node was deleted, false if it was in another state
 * @throws KeeperException if unexpected zookeeper exception
 * @throws KeeperException.NoNodeException if node does not exist
 */
public static boolean deleteOfflineNode(ZooKeeperWatcher zkw,
String regionName)
throws KeeperException, KeeperException.NoNodeException {
return deleteNode(zkw, regionName, EventType.M_ZK_REGION_OFFLINE);
}
/**
* Deletes an existing unassigned node that is in the CLOSED state for the
* specified region.
@ -569,12 +596,13 @@ public class ZKAssign {
* @param zkw zk reference
* @param region region to be transitioned to opened
* @param serverName server event originates from
* @param beginState state the node must currently be in to do transition
* @param endState state to transition node to if all checks pass
* @param beginState state the node must currently be in to do transition
* @param expectedVersion expected version of data before modification, or -1
* @return version of node after transition, -1 if unsuccessful transition
* @throws KeeperException if unexpected zookeeper exception
*/
private static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
public static int transitionNode(ZooKeeperWatcher zkw, HRegionInfo region,
String serverName, EventType beginState, EventType endState,
int expectedVersion)
throws KeeperException {
@ -665,4 +693,39 @@ public class ZKAssign {
}
return RegionTransitionData.fromBytes(data);
}
/**
 * Deletes the region's assignment znode regardless of its current state.
 * <p>
 * Fails silently even if the node does not exist at all.
 *
 * @param watcher zk reference
 * @param regionInfo region whose unassigned node should be removed
 * @throws KeeperException if an unexpected zookeeper error occurs
 */
public static void deleteNodeFailSilent(ZooKeeperWatcher watcher,
HRegionInfo regionInfo)
throws KeeperException {
ZKUtil.deleteNodeFailSilent(watcher,
getNodeName(watcher, regionInfo.getEncodedName()));
}
/**
 * Blocks until there are no nodes under the assignment (regions in
 * transition) znode, polling every 200ms.
 * <p>
 * Each poll also re-registers a watch on the assignment znode's children
 * and logs the znodes still in transition at DEBUG level.
 *
 * @param zkw zk reference
 * @throws KeeperException if an unexpected zookeeper error occurs
 * @throws InterruptedException if interrupted while sleeping between polls
 */
public static void blockUntilNoRIT(ZooKeeperWatcher zkw)
throws KeeperException, InterruptedException {
while (ZKUtil.nodeHasChildren(zkw, zkw.assignmentZNode)) {
// List (and watch) the children so we can log what is still pending.
List<String> znodes =
ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.assignmentZNode);
if (znodes != null && !znodes.isEmpty()) {
for (String znode : znodes) {
LOG.debug("ZK RIT -> " + znode);
}
}
// Poll interval between checks.
Thread.sleep(200);
}
}
}

View File

@ -763,6 +763,38 @@ public class HBaseTestingUtility {
return count;
}
/**
 * Creates rows in <code>.META.</code> for regions of the specified table
 * with the specified start keys. Only the regioninfo column is written; no
 * server assignment is made.
 * <p>
 * The first startKey should be a 0 length byte array if you want to form a
 * proper range of regions: keys are sorted and each region's end key is the
 * next start key, with the last region wrapping around to the first (empty)
 * key as its end key.
 *
 * @param conf configuration used to connect to META
 * @param htd descriptor of the table the regions belong to
 * @param startKeys region start keys; sorted in place
 * @return list of region info for regions added to META
 * @throws IOException if writing to META fails
 */
public List<HRegionInfo> createMultiRegionsInMeta(final Configuration conf,
final HTableDescriptor htd, byte [][] startKeys)
throws IOException {
HTable meta = new HTable(conf, HConstants.META_TABLE_NAME);
Arrays.sort(startKeys, Bytes.BYTES_COMPARATOR);
List<HRegionInfo> newRegions = new ArrayList<HRegionInfo>(startKeys.length);
for (int i = 0; i < startKeys.length; i++) {
// End key of region i is start key of region i+1; the last region
// wraps to startKeys[0], which should be empty for an unbounded end.
int j = (i + 1) % startKeys.length;
HRegionInfo hri = new HRegionInfo(htd, startKeys[i], startKeys[j]);
Put put = new Put(hri.getRegionName());
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
Writables.getBytes(hri));
meta.put(put);
LOG.info("createMultiRegionsInMeta: inserted " + hri.toString());
newRegions.add(hri);
}
return newRegions;
}
/**
* Returns all rows from the .META. table.
*

View File

@ -613,4 +613,18 @@ public class MiniHBaseCluster {
throws IOException {
((MiniHBaseClusterMaster)getMaster()).addMessage(hrs.getHServerInfo(), msg);
}
/**
 * Counts the total number of regions being served by the currently online
 * region servers by asking each how many regions they have.  Does not look
 * at META at all.  Count includes catalog tables.
 * @return number of regions being served by all region servers
 */
public long countServedRegions() {
  long total = 0;
  for (final JVMClusterUtil.RegionServerThread thread
      : getLiveRegionServerThreads()) {
    total += thread.getRegionServer().getNumberOfOnlineRegions();
  }
  return total;
}
}

View File

@ -20,15 +20,35 @@
package org.apache.hadoop.hbase.master;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
public class TestMasterFailover {
@ -109,4 +129,703 @@ public class TestMasterFailover {
// Stop the cluster
TEST_UTIL.shutdownMiniCluster();
}
/**
 * Complex test of master failover that tests as many permutations of the
 * different possible states that regions in transition could be in within ZK.
 * <p>
 * This tests the proper handling of these states by the failed-over master
 * and includes a thorough testing of the timeout code as well.
 * <p>
 * Starts with a single master and three regionservers.
 * <p>
 * Creates two tables, enabledTable and disabledTable, each containing 11
 * regions (one per split key below).  The disabledTable is then disabled.
 * <p>
 * After reaching steady-state, the master is killed.  We then mock several
 * states in ZK.
 * <p>
 * After mocking them, we will startup a new master which should become the
 * active master and also detect that it is a failover.  The primary test
 * passing condition will be that all regions of the enabled table are
 * assigned and all the regions of the disabled table are not assigned.
 * <p>
 * The different scenarios to be tested are below:
 * <p>
 * <b>ZK State: OFFLINE</b>
 * <p>A node can get into OFFLINE state if</p>
 * <ul>
 * <li>An RS fails to open a region, so it reverts the state back to OFFLINE
 * <li>The Master is assigning the region to a RS before it sends RPC
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Master has assigned an enabled region but RS failed so a region is
 *     not assigned anywhere and is sitting in ZK as OFFLINE</li>
 * <li>This seems to cover both cases?</li>
 * </ul>
 * <p>
 * <b>ZK State: CLOSING</b>
 * <p>A node can get into CLOSING state if</p>
 * <ul>
 * <li>An RS has begun to close a region
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region of enabled table was being closed but did not complete
 * <li>Region of disabled table was being closed but did not complete
 * </ul>
 * <p>
 * <b>ZK State: CLOSED</b>
 * <p>A node can get into CLOSED state if</p>
 * <ul>
 * <li>An RS has completed closing a region but not acknowledged by master yet
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region of a table that should be enabled was closed on an RS
 * <li>Region of a table that should be disabled was closed on an RS
 * </ul>
 * <p>
 * <b>ZK State: OPENING</b>
 * <p>A node can get into OPENING state if</p>
 * <ul>
 * <li>An RS has begun to open a region
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>RS was opening a region of enabled table but never finishes
 * </ul>
 * <p>
 * <b>ZK State: OPENED</b>
 * <p>A node can get into OPENED state if</p>
 * <ul>
 * <li>An RS has finished opening a region but not acknowledged by master yet
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region of a table that should be enabled was opened on an RS
 * <li>Region of a table that should be disabled was opened on an RS
 * </ul>
 * @throws Exception
 */
@Test (timeout=180000)
public void testMasterFailoverWithMockedRIT() throws Exception {

  final int NUM_MASTERS = 1;
  final int NUM_RS = 3;

  // Create config to use for this cluster
  Configuration conf = HBaseConfiguration.create();
  // Need to drop the timeout much lower so the new master's timeout monitor
  // retries the mocked-up regions within the test's lifetime
  conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
  conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 4000);

  // Start the cluster
  HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
  TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
  MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
  log("Cluster started");

  // Create a ZKW to use in the test; any fatal ZK error fails the test
  ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
      "unittest", new Abortable() {
        @Override
        public void abort(String why, Throwable e) {
          LOG.error("Fatal ZK Error: " + why, e);
          org.junit.Assert.assertFalse("Fatal ZK error", true);
        }
  });

  // get all the master threads
  List<MasterThread> masterThreads = cluster.getMasterThreads();
  assertEquals(1, masterThreads.size());

  // only one master thread, let's wait for it to be initialized
  assertTrue(cluster.waitForActiveAndReadyMaster());
  HMaster master = masterThreads.get(0).getMaster();
  assertTrue(master.isActiveMaster());
  assertTrue(master.isInitialized());

  // disable load balancing on this master so the mocked assignments stay put
  master.balanceSwitch(false);

  // create two tables in META, each with 11 regions (one per split key)
  byte [] FAMILY = Bytes.toBytes("family");
  byte [][] SPLIT_KEYS = new byte [][] {
      new byte[0], Bytes.toBytes("aaa"), Bytes.toBytes("bbb"),
      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
      Bytes.toBytes("iii"), Bytes.toBytes("jjj")
  };

  byte [] enabledTable = Bytes.toBytes("enabledTable");
  HTableDescriptor htdEnabled = new HTableDescriptor(enabledTable);
  htdEnabled.addFamily(new HColumnDescriptor(FAMILY));
  List<HRegionInfo> enabledRegions = TEST_UTIL.createMultiRegionsInMeta(
      TEST_UTIL.getConfiguration(), htdEnabled, SPLIT_KEYS);

  byte [] disabledTable = Bytes.toBytes("disabledTable");
  HTableDescriptor htdDisabled = new HTableDescriptor(disabledTable);
  htdDisabled.addFamily(new HColumnDescriptor(FAMILY));
  List<HRegionInfo> disabledRegions = TEST_UTIL.createMultiRegionsInMeta(
      TEST_UTIL.getConfiguration(), htdDisabled, SPLIT_KEYS);

  log("Regions in META have been created");

  // at this point we only expect 2 regions to be assigned out (catalogs)
  assertEquals(2, cluster.countServedRegions());

  // Let's just assign everything to first RS
  HRegionServer hrs = cluster.getRegionServer(0);
  String serverName = hrs.getServerName();
  HServerInfo hsiAlive = hrs.getServerInfo();

  // we'll need some regions to already be assigned out properly on live RS
  List<HRegionInfo> enabledAndAssignedRegions = new ArrayList<HRegionInfo>();
  enabledAndAssignedRegions.add(enabledRegions.remove(0));
  enabledAndAssignedRegions.add(enabledRegions.remove(0));
  List<HRegionInfo> disabledAndAssignedRegions = new ArrayList<HRegionInfo>();
  disabledAndAssignedRegions.add(disabledRegions.remove(0));
  disabledAndAssignedRegions.add(disabledRegions.remove(0));

  // now actually assign them: seed a region plan pointing at the live RS so
  // assignRegion sends each region there instead of balancing
  for (HRegionInfo hri : enabledAndAssignedRegions) {
    master.assignmentManager.regionPlans.put(hri.getEncodedName(),
        new RegionPlan(hri, null, hsiAlive));
    master.assignRegion(hri);
  }
  for (HRegionInfo hri : disabledAndAssignedRegions) {
    master.assignmentManager.regionPlans.put(hri.getEncodedName(),
        new RegionPlan(hri, null, hsiAlive));
    master.assignRegion(hri);
  }

  // wait for no more RIT
  log("Waiting for assignment to finish");
  ZKAssign.blockUntilNoRIT(zkw);
  log("Assignment completed");

  // Stop the master
  log("Aborting master");
  cluster.abortMaster(0);
  cluster.waitOnMaster(0);
  log("Master has aborted");

  /*
   * Now, let's start mocking up some weird states as described in the method
   * javadoc.
   */

  List<HRegionInfo> regionsThatShouldBeOnline = new ArrayList<HRegionInfo>();
  List<HRegionInfo> regionsThatShouldBeOffline = new ArrayList<HRegionInfo>();

  log("Beginning to mock scenarios");

  // Disable the disabledTable in ZK
  ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable));

  /*
   * ZK = OFFLINE
   */

  // Region that should be assigned but is not and is in ZK as OFFLINE
  HRegionInfo region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeOffline(zkw, region, serverName);

  /*
   * ZK = CLOSING
   */

  // Region of enabled table being closed but not complete
  // Region is already assigned, don't say anything to RS but set ZK closing
  region = enabledAndAssignedRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeClosing(zkw, region, serverName);

  // Region of disabled table being closed but not complete
  // Region is already assigned, don't say anything to RS but set ZK closing
  region = disabledAndAssignedRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  ZKAssign.createNodeClosing(zkw, region, serverName);

  /*
   * ZK = CLOSED
   */

  // Region of enabled table closed but not ack'd by the master
  region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  int version = ZKAssign.createNodeClosing(zkw, region, serverName);
  ZKAssign.transitionNodeClosed(zkw, region, serverName, version);

  // Region of disabled table closed but not ack'd by the master
  region = disabledRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  version = ZKAssign.createNodeClosing(zkw, region, serverName);
  ZKAssign.transitionNodeClosed(zkw, region, serverName, version);

  /*
   * ZK = OPENING
   */

  // RS was opening a region of enabled table but never finishes
  region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeOffline(zkw, region, serverName);
  ZKAssign.transitionNodeOpening(zkw, region, serverName);

  /*
   * ZK = OPENED
   */

  // Region of enabled table was opened on RS
  region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeOffline(zkw, region, serverName);
  hrs.openRegion(region);
  // spin until the RS transitions the znode to OPENED
  while (true) {
    RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
    if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
      break;
    }
    Thread.sleep(100);
  }

  // Region of disabled table was opened on RS
  region = disabledRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  ZKAssign.createNodeOffline(zkw, region, serverName);
  hrs.openRegion(region);
  // spin until the RS transitions the znode to OPENED
  while (true) {
    RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
    if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
      break;
    }
    Thread.sleep(100);
  }

  /*
   * ZK = NONE
   */

  /*
   * DONE MOCKING
   */

  log("Done mocking data up in ZK");

  // Start up a new master
  log("Starting up a new master");
  master = cluster.startMaster().getMaster();
  log("Waiting for master to be ready");
  cluster.waitForActiveAndReadyMaster();
  log("Master is ready");

  // Failover should be completed, now wait for no RIT
  log("Waiting for no more RIT");
  ZKAssign.blockUntilNoRIT(zkw);
  log("No more RIT in ZK, now doing final test verification");

  // Grab all the regions that are online across RSs
  Set<HRegionInfo> onlineRegions = new TreeSet<HRegionInfo>();
  for (JVMClusterUtil.RegionServerThread rst :
      cluster.getRegionServerThreads()) {
    onlineRegions.addAll(rst.getRegionServer().getOnlineRegions());
  }

  // Now, everything that should be online should be online
  for (HRegionInfo hri : regionsThatShouldBeOnline) {
    assertTrue(onlineRegions.contains(hri));
  }

  // Everything that should be offline should not be online
  for (HRegionInfo hri : regionsThatShouldBeOffline) {
    assertFalse(onlineRegions.contains(hri));
  }

  log("Done with verification, all passed, shutting down cluster");

  // Done, shutdown the cluster
  TEST_UTIL.shutdownMiniCluster();
}
/**
 * Complex test of master failover that tests as many permutations of the
 * different possible states that regions in transition could be in within ZK
 * pointing to an RS that has died while no master is around to process it.
 * <p>
 * This tests the proper handling of these states by the failed-over master
 * and includes a thorough testing of the timeout code as well.
 * <p>
 * Starts with a single master and two regionservers.
 * <p>
 * Creates two tables, enabledTable and disabledTable, each containing 11
 * regions (one per split key below).  The disabledTable is then disabled.
 * <p>
 * After reaching steady-state, the master is killed.  We then mock several
 * states in ZK.  And one of the RS will be killed.
 * <p>
 * After mocking them and killing an RS, we will startup a new master which
 * should become the active master and also detect that it is a failover.  The
 * primary test passing condition will be that all regions of the enabled
 * table are assigned and all the regions of the disabled table are not
 * assigned.
 * <p>
 * The different scenarios to be tested are below:
 * <p>
 * <b>ZK State: CLOSING</b>
 * <p>A node can get into CLOSING state if</p>
 * <ul>
 * <li>An RS has begun to close a region
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region was being closed but the RS died before finishing the close
 * </ul>
 * <b>ZK State: OPENING</b>
 * <p>A node can get into OPENING state if</p>
 * <ul>
 * <li>An RS has begun to open a region
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region of enabled table was being opened by a now-dead RS
 * <li>Region of disabled table was being opened by a now-dead RS
 * </ul>
 * <b>ZK State: OPENED</b>
 * <p>A node can get into OPENED state if</p>
 * <ul>
 * <li>An RS has finished opening a region but not acknowledged by master yet
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region of a table that should be enabled was opened by a now-dead RS
 * <li>Region of a table that should be disabled was opened by a now-dead RS
 * </ul>
 * <p>
 * <b>ZK State: NONE</b>
 * <p>A region could not have a transition node if</p>
 * <ul>
 * <li>The server hosting the region died and no master processed it
 * </ul>
 * <p>We will mock the scenarios</p>
 * <ul>
 * <li>Region of enabled table was on a dead RS that was not yet processed
 * <li>Region of disabled table was on a dead RS that was not yet processed
 * </ul>
 * @throws Exception
 */
@Test (timeout=180000)
public void testMasterFailoverWithMockedRITOnDeadRS() throws Exception {

  final int NUM_MASTERS = 1;
  final int NUM_RS = 2;

  // Create config to use for this cluster
  Configuration conf = HBaseConfiguration.create();
  // Need to drop the timeout much lower so the new master's timeout monitor
  // retries the mocked-up regions within the test's lifetime
  conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
  conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 4000);

  // Create and start the cluster
  HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
  TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
  MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
  log("Cluster started");

  // Create a ZKW to use in the test; any fatal ZK error fails the test
  ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
      "unittest", new Abortable() {
        @Override
        public void abort(String why, Throwable e) {
          LOG.error("Fatal ZK Error: " + why, e);
          org.junit.Assert.assertFalse("Fatal ZK error", true);
        }
  });

  // get all the master threads
  List<MasterThread> masterThreads = cluster.getMasterThreads();
  assertEquals(1, masterThreads.size());

  // only one master thread, let's wait for it to be initialized
  assertTrue(cluster.waitForActiveAndReadyMaster());
  HMaster master = masterThreads.get(0).getMaster();
  assertTrue(master.isActiveMaster());
  assertTrue(master.isInitialized());

  // disable load balancing on this master so the mocked assignments stay put
  master.balanceSwitch(false);

  // create two tables in META, each with 11 regions (one per split key)
  byte [] FAMILY = Bytes.toBytes("family");
  byte [][] SPLIT_KEYS = new byte [][] {
      new byte[0], Bytes.toBytes("aaa"), Bytes.toBytes("bbb"),
      Bytes.toBytes("ccc"), Bytes.toBytes("ddd"), Bytes.toBytes("eee"),
      Bytes.toBytes("fff"), Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
      Bytes.toBytes("iii"), Bytes.toBytes("jjj")
  };

  byte [] enabledTable = Bytes.toBytes("enabledTable");
  HTableDescriptor htdEnabled = new HTableDescriptor(enabledTable);
  htdEnabled.addFamily(new HColumnDescriptor(FAMILY));
  List<HRegionInfo> enabledRegions = TEST_UTIL.createMultiRegionsInMeta(
      TEST_UTIL.getConfiguration(), htdEnabled, SPLIT_KEYS);

  byte [] disabledTable = Bytes.toBytes("disabledTable");
  HTableDescriptor htdDisabled = new HTableDescriptor(disabledTable);
  htdDisabled.addFamily(new HColumnDescriptor(FAMILY));
  List<HRegionInfo> disabledRegions = TEST_UTIL.createMultiRegionsInMeta(
      TEST_UTIL.getConfiguration(), htdDisabled, SPLIT_KEYS);

  log("Regions in META have been created");

  // at this point we only expect 2 regions to be assigned out (catalogs)
  assertEquals(2, cluster.countServedRegions());

  // The first RS will stay online
  HRegionServer hrs = cluster.getRegionServer(0);
  HServerInfo hsiAlive = hrs.getServerInfo();

  // The second RS is going to be hard-killed
  HRegionServer hrsDead = cluster.getRegionServer(1);
  String deadServerName = hrsDead.getServerName();
  HServerInfo hsiDead = hrsDead.getServerInfo();

  // we'll need some regions to already be assigned out properly on live RS
  List<HRegionInfo> enabledAndAssignedRegions = new ArrayList<HRegionInfo>();
  enabledAndAssignedRegions.add(enabledRegions.remove(0));
  enabledAndAssignedRegions.add(enabledRegions.remove(0));
  List<HRegionInfo> disabledAndAssignedRegions = new ArrayList<HRegionInfo>();
  disabledAndAssignedRegions.add(disabledRegions.remove(0));
  disabledAndAssignedRegions.add(disabledRegions.remove(0));

  // now actually assign them: seed a region plan pointing at the live RS so
  // assignRegion sends each region there instead of balancing
  for (HRegionInfo hri : enabledAndAssignedRegions) {
    master.assignmentManager.regionPlans.put(hri.getEncodedName(),
        new RegionPlan(hri, null, hsiAlive));
    master.assignRegion(hri);
  }
  for (HRegionInfo hri : disabledAndAssignedRegions) {
    master.assignmentManager.regionPlans.put(hri.getEncodedName(),
        new RegionPlan(hri, null, hsiAlive));
    master.assignRegion(hri);
  }

  // we also need regions assigned out on the dead server
  List<HRegionInfo> enabledAndOnDeadRegions = new ArrayList<HRegionInfo>();
  enabledAndOnDeadRegions.add(enabledRegions.remove(0));
  enabledAndOnDeadRegions.add(enabledRegions.remove(0));
  List<HRegionInfo> disabledAndOnDeadRegions = new ArrayList<HRegionInfo>();
  disabledAndOnDeadRegions.add(disabledRegions.remove(0));
  disabledAndOnDeadRegions.add(disabledRegions.remove(0));

  // set region plan to server to be killed and trigger assign
  for (HRegionInfo hri : enabledAndOnDeadRegions) {
    master.assignmentManager.regionPlans.put(hri.getEncodedName(),
        new RegionPlan(hri, null, hsiDead));
    master.assignRegion(hri);
  }
  for (HRegionInfo hri : disabledAndOnDeadRegions) {
    master.assignmentManager.regionPlans.put(hri.getEncodedName(),
        new RegionPlan(hri, null, hsiDead));
    master.assignRegion(hri);
  }

  // wait for no more RIT
  log("Waiting for assignment to finish");
  ZKAssign.blockUntilNoRIT(zkw);
  log("Assignment completed");

  // Stop the master
  log("Aborting master");
  cluster.abortMaster(0);
  cluster.waitOnMaster(0);
  log("Master has aborted");

  /*
   * Now, let's start mocking up some weird states as described in the method
   * javadoc.
   */

  List<HRegionInfo> regionsThatShouldBeOnline = new ArrayList<HRegionInfo>();
  List<HRegionInfo> regionsThatShouldBeOffline = new ArrayList<HRegionInfo>();

  log("Beginning to mock scenarios");

  // Disable the disabledTable in ZK
  ZKTableDisable.disableTable(zkw, Bytes.toString(disabledTable));

  /*
   * ZK = CLOSING
   */

  // Region of enabled table being closed on dead RS but not finished
  HRegionInfo region = enabledAndOnDeadRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeClosing(zkw, region, deadServerName);
  LOG.debug("\n\nRegion of enabled table was CLOSING on dead RS\n" +
      region + "\n\n");

  // Region of disabled table being closed on dead RS but not finished
  region = disabledAndOnDeadRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  ZKAssign.createNodeClosing(zkw, region, deadServerName);
  LOG.debug("\n\nRegion of disabled table was CLOSING on dead RS\n" +
      region + "\n\n");

  /*
   * ZK = CLOSED
   */

  // Region of enabled on dead server gets closed but not ack'd by master
  region = enabledAndOnDeadRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  int version = ZKAssign.createNodeClosing(zkw, region, deadServerName);
  ZKAssign.transitionNodeClosed(zkw, region, deadServerName, version);
  LOG.debug("\n\nRegion of enabled table was CLOSED on dead RS\n" +
      region + "\n\n");

  // Region of disabled on dead server gets closed but not ack'd by master
  region = disabledAndOnDeadRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  version = ZKAssign.createNodeClosing(zkw, region, deadServerName);
  ZKAssign.transitionNodeClosed(zkw, region, deadServerName, version);
  LOG.debug("\n\nRegion of disabled table was CLOSED on dead RS\n" +
      region + "\n\n");

  /*
   * ZK = OPENING
   */

  // RS was opening a region of enabled table then died
  region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeOffline(zkw, region, deadServerName);
  ZKAssign.transitionNodeOpening(zkw, region, deadServerName);
  LOG.debug("\n\nRegion of enabled table was OPENING on dead RS\n" +
      region + "\n\n");

  // RS was opening a region of disabled table then died
  region = disabledRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  ZKAssign.createNodeOffline(zkw, region, deadServerName);
  ZKAssign.transitionNodeOpening(zkw, region, deadServerName);
  LOG.debug("\n\nRegion of disabled table was OPENING on dead RS\n" +
      region + "\n\n");

  /*
   * ZK = OPENED
   */

  // Region of enabled table was opened on dead RS
  region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeOffline(zkw, region, deadServerName);
  hrsDead.openRegion(region);
  // spin until the RS transitions the znode to OPENED
  while (true) {
    RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
    if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
      break;
    }
    Thread.sleep(100);
  }
  LOG.debug("\n\nRegion of enabled table was OPENED on dead RS\n" +
      region + "\n\n");

  // Region of disabled table was opened on dead RS
  region = disabledRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  ZKAssign.createNodeOffline(zkw, region, deadServerName);
  hrsDead.openRegion(region);
  // spin until the RS transitions the znode to OPENED
  while (true) {
    RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
    if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
      break;
    }
    Thread.sleep(100);
  }
  LOG.debug("\n\nRegion of disabled table was OPENED on dead RS\n" +
      region + "\n\n");

  /*
   * ZK = NONE
   */

  // Region of enabled table was open at steady-state on dead RS; wait for the
  // OPENED znode then delete it so no transition node remains
  region = enabledRegions.remove(0);
  regionsThatShouldBeOnline.add(region);
  ZKAssign.createNodeOffline(zkw, region, deadServerName);
  hrsDead.openRegion(region);
  while (true) {
    RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
    if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
      ZKAssign.deleteOpenedNode(zkw, region.getEncodedName());
      break;
    }
    Thread.sleep(100);
  }
  LOG.debug("\n\nRegion of enabled table was open at steady-state on dead RS"
      + "\n" + region + "\n\n");

  // Region of disabled table was open at steady-state on dead RS; wait for
  // the OPENED znode then delete it so no transition node remains
  region = disabledRegions.remove(0);
  regionsThatShouldBeOffline.add(region);
  ZKAssign.createNodeOffline(zkw, region, deadServerName);
  hrsDead.openRegion(region);
  while (true) {
    RegionTransitionData rtd = ZKAssign.getData(zkw, region.getEncodedName());
    if (rtd != null && rtd.getEventType() == EventType.RS_ZK_REGION_OPENED) {
      ZKAssign.deleteOpenedNode(zkw, region.getEncodedName());
      break;
    }
    Thread.sleep(100);
  }
  LOG.debug("\n\nRegion of disabled table was open at steady-state on dead RS"
      + "\n" + region + "\n\n");

  /*
   * DONE MOCKING
   */

  log("Done mocking data up in ZK");

  // Kill the RS that had a hard death
  log("Killing RS " + deadServerName);
  hrsDead.abort("Killing for unit test");
  log("RS " + deadServerName + " killed");

  // Start up a new master
  log("Starting up a new master");
  master = cluster.startMaster().getMaster();
  log("Waiting for master to be ready");
  cluster.waitForActiveAndReadyMaster();
  log("Master is ready");

  // Failover should be completed, now wait for no RIT
  log("Waiting for no more RIT");
  ZKAssign.blockUntilNoRIT(zkw);
  log("No more RIT in ZK, now doing final test verification");

  // Grab all the regions that are online across RSs
  Set<HRegionInfo> onlineRegions = new TreeSet<HRegionInfo>();
  for (JVMClusterUtil.RegionServerThread rst :
      cluster.getRegionServerThreads()) {
    onlineRegions.addAll(rst.getRegionServer().getOnlineRegions());
  }

  // Now, everything that should be online should be online
  for (HRegionInfo hri : regionsThatShouldBeOnline) {
    assertTrue(onlineRegions.contains(hri));
  }

  // Everything that should be offline should not be online
  for (HRegionInfo hri : regionsThatShouldBeOffline) {
    assertFalse(onlineRegions.contains(hri));
  }

  log("Done with verification, all passed, shutting down cluster");

  // Done, shutdown the cluster
  TEST_UTIL.shutdownMiniCluster();
}
// TODO: Next test to add is with testing permutations of the RIT or the RS
// killed are hosting ROOT and META regions.
/**
 * Logs the given message at INFO level surrounded by blank lines so it
 * stands out in the noisy mini-cluster output.
 */
private void log(String string) {
  final String banner = "\n\n" + string + " \n\n";
  LOG.info(banner);
}
}