HBASE-3181 Review, document, and fix up Regions-in-Transition timeout logic

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1029938 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jonathan Gray 2010-11-02 02:04:09 +00:00
parent 18a78fef4d
commit 8387a5b18f
12 changed files with 350 additions and 229 deletions

View File

@ -1085,6 +1085,9 @@ Release 0.21.0 - Unreleased
HBASE-3184 Xmx setting in pom to use for tests/surefire does not appear
to work
HBASE-3120 [rest] Content transcoding
HBASE-3181 Review, document, and fix up Regions-in-Transition timeout
logic
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@ -498,9 +498,9 @@ public class MetaReader {
Result result;
while((result = metaServer.next(scannerid)) != null) {
if (result != null && result.size() > 0) {
Pair<HRegionInfo, HServerAddress> pair = metaRowToRegionPair(result);
if (pair.getSecond() == null ||
!pair.getSecond().equals(hsi.getServerAddress())) {
Pair<HRegionInfo, HServerInfo> pair =
metaRowToRegionPairWithInfo(result);
if (pair.getSecond() == null || !pair.getSecond().equals(hsi)) {
continue;
}
hris.put(pair.getFirst(), result);

View File

@ -34,6 +34,7 @@ import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
@ -97,13 +98,19 @@ public class AssignmentManager extends ZooKeeperListener {
private TimeoutMonitor timeoutMonitor;
/** Regions currently in transition. */
/**
* Regions currently in transition. Map of encoded region names to the master
* in-memory state for that region.
*/
final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
new ConcurrentSkipListMap<String, RegionState>();
/** Plans for region movement. Key is the encoded version of a region name*/
// TODO: When do plans get cleaned out? Ever? In server open and in server
// shutdown processing -- St.Ack
// TODO: Better to just synchronize access around regionPlans? I think that
// would be better than a concurrent structure since we do more than
// one operation at a time -- jgray
final ConcurrentNavigableMap<String, RegionPlan> regionPlans =
new ConcurrentSkipListMap<String, RegionPlan>();
@ -152,9 +159,9 @@ public class AssignmentManager extends ZooKeeperListener {
this.executorService = service;
Configuration conf = master.getConfiguration();
this.timeoutMonitor = new TimeoutMonitor(
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 30000),
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
master,
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 15000));
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 30000));
Threads.setDaemonThreadRunning(timeoutMonitor,
master.getServerName() + ".timeoutMonitor");
}
@ -272,14 +279,14 @@ public class AssignmentManager extends ZooKeeperListener {
// Region is closed, insert into RIT and handle it
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.CLOSED, data.getStamp()));
new ClosedRegionHandler(master, this, data, regionInfo).process();
new ClosedRegionHandler(master, this, regionInfo).process();
break;
case M_ZK_REGION_OFFLINE:
// Region is offline, insert into RIT and handle it like a closed
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OFFLINE, data.getStamp()));
new ClosedRegionHandler(master, this, data, regionInfo).process();
new ClosedRegionHandler(master, this, regionInfo).process();
break;
case RS_ZK_REGION_OPENING:
@ -303,7 +310,7 @@ public class AssignmentManager extends ZooKeeperListener {
"; letting RIT timeout so will be assigned elsewhere");
break;
}
new OpenedRegionHandler(master, this, data, regionInfo, hsi).process();
new OpenedRegionHandler(master, this, regionInfo, hsi).process();
break;
}
}
@ -367,7 +374,7 @@ public class AssignmentManager extends ZooKeeperListener {
// what follows will fail because not in expected state.
regionState.update(RegionState.State.CLOSED, data.getStamp());
this.executorService.submit(new ClosedRegionHandler(master,
this, data, regionState.getRegion()));
this, regionState.getRegion()));
break;
case RS_ZK_REGION_OPENING:
@ -400,7 +407,7 @@ public class AssignmentManager extends ZooKeeperListener {
// Handle OPENED by removing from transition and deleted zk node
regionState.update(RegionState.State.OPEN, data.getStamp());
this.executorService.submit(
new OpenedRegionHandler(master, this, data, regionState.getRegion(),
new OpenedRegionHandler(master, this, regionState.getRegion(),
this.serverManager.getServerInfo(data.getServerName())));
break;
}
@ -600,7 +607,8 @@ public class AssignmentManager extends ZooKeeperListener {
public void offlineDisabledRegion(HRegionInfo regionInfo) {
// Disabling so should not be reassigned, just delete the CLOSED node
LOG.debug("Table being disabled so deleting ZK node and removing from " +
"regions in transition, skipping assignment");
"regions in transition, skipping assignment of region " +
regionInfo.getRegionNameAsString());
try {
if (!ZKAssign.deleteClosedNode(watcher, regionInfo.getEncodedName())) {
// Could also be in OFFLINE mode
@ -632,8 +640,15 @@ public class AssignmentManager extends ZooKeeperListener {
* in-memory checks pass, the zk node is forced to OFFLINE before assigning.
*
* @param regionName server to be assigned
* @param setOfflineInZK whether ZK node should be created/transitioned to an
* OFFLINE state before assigning the region
*/
public void assign(HRegionInfo region) {
public void assign(HRegionInfo region, boolean setOfflineInZK) {
assign(region, setOfflineInZK, false);
}
public void assign(HRegionInfo region, boolean setOfflineInZK,
boolean forceNewPlan) {
String tableName = region.getTableDesc().getNameAsString();
if (isTableDisabled(tableName)) {
LOG.info("Table " + tableName + " disabled; skipping assign of " +
@ -648,7 +663,7 @@ public class AssignmentManager extends ZooKeeperListener {
}
RegionState state = addToRegionsInTransition(region);
synchronized (state) {
assign(state);
assign(state, setOfflineInZK, forceNewPlan);
}
}
@ -800,13 +815,14 @@ public class AssignmentManager extends ZooKeeperListener {
* Caller must hold lock on the passed <code>state</code> object.
* @param state
*/
private void assign(final RegionState state) {
if (!setOfflineInZooKeeper(state)) return;
private void assign(final RegionState state, final boolean setOfflineInZK,
final boolean forceNewPlan) {
if (setOfflineInZK && !setOfflineInZooKeeper(state)) return;
if (this.master.isStopped()) {
LOG.debug("Server stopped; skipping assign of " + state);
return;
}
RegionPlan plan = getRegionPlan(state);
RegionPlan plan = getRegionPlan(state, forceNewPlan);
if (plan == null) return; // Should get reassigned later when RIT times out.
try {
LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
@ -823,12 +839,13 @@ public class AssignmentManager extends ZooKeeperListener {
// succeed anyways; we need a new plan!
// Transition back to OFFLINE
state.update(RegionState.State.OFFLINE);
// Remove the plan
this.regionPlans.remove(state.getRegion().getEncodedName());
// Put in place a new plan and reassign. Calling getRegionPlan will add
// a plan if none exists (We removed it in line above).
if (getRegionPlan(state, plan.getDestination()) == null) return;
assign(state);
// Force a new plan and reassign.
if (getRegionPlan(state, plan.getDestination(), true) == null) {
LOG.warn("Unable to find a viable location to assign region " +
state.getRegion().getRegionNameAsString());
return;
}
assign(state, false, false);
}
}
@ -890,43 +907,48 @@ public class AssignmentManager extends ZooKeeperListener {
* @return Plan for passed <code>state</code> (If none currently, it creates one or
* if no servers to assign, it returns null).
*/
RegionPlan getRegionPlan(final RegionState state) {
return getRegionPlan(state, null);
RegionPlan getRegionPlan(final RegionState state,
final boolean forceNewPlan) {
return getRegionPlan(state, null, forceNewPlan);
}
/**
* @param state
* @param serverToExclude Server to exclude (we know its bad). Pass null if
* all servers are thought to be assignable.
* @param forceNewPlan If true, then if an existing plan exists, a new plan
* will be generated.
* @return Plan for passed <code>state</code> (If none currently, it creates one or
* if no servers to assign, it returns null).
*/
RegionPlan getRegionPlan(final RegionState state,
final HServerInfo serverToExclude) {
final HServerInfo serverToExclude, final boolean forceNewPlan) {
// Pickup existing plan or make a new one
String encodedName = state.getRegion().getEncodedName();
List<HServerInfo> servers = this.serverManager.getOnlineServersList();
// The remove below hinges on the fact that the call to
// serverManager.getOnlineServersList() returns a copy
if (serverToExclude != null) servers.remove(serverToExclude);
if (servers.size() < 0) return null;
RegionPlan newPlan = new RegionPlan(state.getRegion(), null,
if (servers.size() <= 0) return null;
RegionPlan randomPlan = new RegionPlan(state.getRegion(), null,
LoadBalancer.randomAssignment(servers));
RegionPlan existingPlan = this.regionPlans.putIfAbsent(encodedName, newPlan);
RegionPlan plan = null;
if (existingPlan == null) {
LOG.debug("No previous transition plan for " +
state.getRegion().getRegionNameAsString() +
" so generated a random one; " + newPlan + "; " +
serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() +
", exclude=" + serverToExclude + ") available servers");
plan = newPlan;
} else {
LOG.debug("Using preexisting plan=" + existingPlan);
plan = existingPlan;
synchronized (this.regionPlans) {
RegionPlan existingPlan = this.regionPlans.get(encodedName);
if (existingPlan == null || forceNewPlan ||
existingPlan.getDestination().equals(serverToExclude)) {
LOG.debug("No previous transition plan was found (or we are ignoring " +
"an existing plan) for " + state.getRegion().getRegionNameAsString()
+ " so generated a random one; " + randomPlan + "; " +
serverManager.countOfRegionServers() +
" (online=" + serverManager.getOnlineServers().size() +
", exclude=" + serverToExclude + ") available servers");
this.regionPlans.put(encodedName, randomPlan);
return randomPlan;
}
LOG.debug("Using pre-exisitng plan for region " +
state.getRegion().getRegionNameAsString() + "; plan=" + existingPlan);
return existingPlan;
}
return plan;
}
/**
@ -974,10 +996,10 @@ public class AssignmentManager extends ZooKeeperListener {
if (state == null) {
state = new RegionState(region, RegionState.State.PENDING_CLOSE);
regionsInTransition.put(encodedName, state);
} else if (force && (state.isClosing() || state.isPendingClose())) {
} else if (force && state.isPendingClose()) {
LOG.debug("Attempting to unassign region " +
region.getRegionNameAsString() + " which is already closing but " +
"forcing an additional close");
region.getRegionNameAsString() + " which is already pending close "
+ "but forcing an additional close");
state.update(RegionState.State.PENDING_CLOSE);
} else {
LOG.debug("Attempting to unassign region " +
@ -987,20 +1009,26 @@ public class AssignmentManager extends ZooKeeperListener {
}
}
// Send CLOSE RPC
HServerInfo server = null;
synchronized (this.regions) {
server = regions.get(region);
}
try {
// TODO: We should consider making this look more like it does for the
// region open where we catch all throwables and never abort
if(serverManager.sendRegionClose(regions.get(region),
state.getRegion())) {
LOG.debug("Sent CLOSE to " + regions.get(region) + " for region " +
if(serverManager.sendRegionClose(server, state.getRegion())) {
LOG.debug("Sent CLOSE to " + server + " for region " +
region.getRegionNameAsString());
return;
}
LOG.debug("Server " + server + " region CLOSE RPC returned false");
} catch (NotServingRegionException nsre) {
// Failed to close, so pass through and reassign
LOG.debug("Server " + server + " returned NotServingRegionException");
} catch (RemoteException re) {
if (re.unwrapRemoteException() instanceof NotServingRegionException) {
// Failed to close, so pass through and reassign
LOG.debug("Server " + server + " returned NotServingRegionException");
} else {
this.master.abort("Remote unexpected exception",
re.unwrapRemoteException());
@ -1011,13 +1039,13 @@ public class AssignmentManager extends ZooKeeperListener {
this.master.abort("Remote unexpected exception", t);
}
// Did not CLOSE, so set region offline and assign it
LOG.debug("Attempted to send CLOSE to " + regions.get(region) +
LOG.debug("Attempted to send CLOSE to " + server +
" for region " + region.getRegionNameAsString() + " but failed, " +
"setting region as OFFLINE and reassigning");
synchronized (regionsInTransition) {
forceRegionStateToOffline(region);
assign(region);
}
assign(region, true);
}
/**
@ -1049,7 +1077,7 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public void assignRoot() throws KeeperException {
RootLocationEditor.deleteRootLocation(this.master.getZooKeeper());
assign(HRegionInfo.ROOT_REGIONINFO);
assign(HRegionInfo.ROOT_REGIONINFO, true);
}
/**
@ -1062,7 +1090,7 @@ public class AssignmentManager extends ZooKeeperListener {
*/
public void assignMeta() {
// Force assignment to a random server
assign(HRegionInfo.FIRST_META_REGIONINFO);
assign(HRegionInfo.FIRST_META_REGIONINFO, true);
}
/**
@ -1460,67 +1488,73 @@ public class AssignmentManager extends ZooKeeperListener {
LOG.info("Regions in transition timed out: " + regionState);
// Expired! Do a retry.
switch (regionState.getState()) {
case OFFLINE:
case CLOSED:
LOG.info("Region has been OFFLINE or CLOSED for too long, " +
"reassigning " + regionInfo.getRegionNameAsString());
assign(regionState.getRegion());
LOG.info("Region has been CLOSED for too long, " +
"retriggering ClosedRegionHandler");
AssignmentManager.this.executorService.submit(
new ClosedRegionHandler(master, AssignmentManager.this,
regionState.getRegion()));
break;
case OFFLINE:
LOG.info("Region has been OFFLINE for too long, " +
"reassigning " + regionInfo.getRegionNameAsString() +
" to a random server");
assign(regionState.getRegion(), false);
break;
case PENDING_OPEN:
LOG.info("Region has been PENDING_OPEN for too " +
"long, reassigning region=" +
regionInfo.getRegionNameAsString());
// Should have a ZK node in OFFLINE state or no node at all
try {
if (ZKUtil.watchAndCheckExists(watcher,
ZKAssign.getNodeName(watcher,
regionInfo.getEncodedName())) &&
!ZKAssign.verifyRegionState(watcher, regionInfo,
EventType.M_ZK_REGION_OFFLINE)) {
LOG.info("Region exists and not in expected OFFLINE " +
"state so skipping timeout, region=" +
regionInfo.getRegionNameAsString());
break;
}
} catch (KeeperException ke) {
LOG.error("Unexpected ZK exception timing out " +
"PENDING_CLOSE region",
ke);
break;
}
AssignmentManager.this.setOffline(regionState.getRegion());
regionState.update(RegionState.State.OFFLINE);
assign(regionState.getRegion());
break;
assign(regionState.getRegion(), false, true);
break;
case OPENING:
LOG.info("Region has been OPENING for too " +
"long, reassigning region=" +
regionInfo.getRegionNameAsString());
// Should have a ZK node in OPENING state
try {
if (ZKUtil.watchAndCheckExists(watcher,
ZKAssign.getNodeName(watcher,
regionInfo.getEncodedName())) &&
ZKAssign.transitionNode(watcher, regionInfo,
HMaster.MASTER, EventType.RS_ZK_REGION_OPENING,
EventType.M_ZK_REGION_OFFLINE, -1) == -1) {
LOG.info("Region transitioned out of OPENING so " +
"skipping timeout, region=" +
regionInfo.getRegionNameAsString());
String node = ZKAssign.getNodeName(watcher,
regionInfo.getEncodedName());
Stat stat = new Stat();
RegionTransitionData data = ZKAssign.getDataNoWatch(watcher,
node, stat);
if (data.getEventType() == EventType.RS_ZK_REGION_OPENED) {
LOG.debug("Region has transitioned to OPENED, allowing " +
"watched event handlers to process");
break;
} else if (data.getEventType() !=
EventType.RS_ZK_REGION_OPENING) {
LOG.warn("While timing out a region in state OPENING, " +
"found ZK node in unexpected state: " +
data.getEventType());
break;
}
// Attempt to transition node into OFFLINE
try {
data = new RegionTransitionData(
EventType.M_ZK_REGION_OFFLINE,
regionInfo.getRegionName());
if (ZKUtil.setData(watcher, node, data.getBytes(),
stat.getVersion())) {
// Node is now OFFLINE, let's trigger another assignment
ZKUtil.getDataAndWatch(watcher, node); // re-set the watch
LOG.info("Successfully transitioned region=" +
regionInfo.getRegionNameAsString() + " into OFFLINE" +
" and forcing a new assignment");
assign(regionState, false, true);
}
} catch (KeeperException.NoNodeException nne) {
// Node did not exist, can't time this out
}
} catch (KeeperException ke) {
LOG.error("Unexpected ZK exception timing out CLOSING region",
ke);
break;
}
AssignmentManager.this.setOffline(regionState.getRegion());
regionState.update(RegionState.State.OFFLINE);
assign(regionState.getRegion());
break;
case OPEN:
LOG.warn("Long-running region in OPEN state? This should " +
"not happen; region=" + regionInfo.getRegionNameAsString());
LOG.error("Region has been OPEN for too long, " +
"we don't know where region was opened so can't do anything");
break;
case PENDING_CLOSE:
LOG.info("Region has been PENDING_CLOSE for too " +
@ -1544,20 +1578,8 @@ public class AssignmentManager extends ZooKeeperListener {
break;
case CLOSING:
LOG.info("Region has been CLOSING for too " +
"long, running forced unassign again on region=" +
regionInfo.getRegionNameAsString());
try {
if (ZKAssign.deleteClosingNode(watcher,
regionInfo.getEncodedName())) {
unassign(regionInfo, true);
}
} catch (NoNodeException e) {
LOG.debug("Node no longer existed so not forcing another " +
"unassignment");
} catch (KeeperException e) {
LOG.warn("Unexpected ZK exception timing out a region " +
"close", e);
}
"long, this should eventually complete or the server will " +
"expire, doing nothing");
break;
}
}
@ -1569,54 +1591,42 @@ public class AssignmentManager extends ZooKeeperListener {
/**
* Process shutdown server removing any assignments.
* @param hsi Server that went down.
* @return set of regions on this server that are not in transition
*/
public void processServerShutdown(final HServerInfo hsi) {
// Clean out any exisiting assignment plans for this server
for (Iterator <Map.Entry<String, RegionPlan>> i =
this.regionPlans.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, RegionPlan> e = i.next();
if (e.getValue().getDestination().equals(hsi)) {
// Use iterator's remove else we'll get CME
i.remove();
}
}
// Remove assignment info related to the downed server. Remove the downed
// server from list of servers else it looks like a server w/ no load.
synchronized (this.regions) {
Set<HRegionInfo> hris = new HashSet<HRegionInfo>();
for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
// Add to a Set -- don't call setOffline in here else we get a CME.
if (e.getValue().equals(hsi)) hris.add(e.getKey());
}
for (HRegionInfo hri: hris) setOffline(hri);
this.servers.remove(hsi);
}
// If anything in transition related to the server, clean it up.
synchronized (regionsInTransition) {
// Iterate all regions in transition checking if were on this server
final String serverName = hsi.getServerName();
for (Map.Entry<String, RegionState> e: this.regionsInTransition.entrySet()) {
if (!e.getKey().equals(serverName)) continue;
RegionState regionState = e.getValue();
switch(regionState.getState()) {
case PENDING_OPEN:
case OPENING:
case OFFLINE:
case CLOSED:
case PENDING_CLOSE:
case CLOSING:
LOG.info("Region " + regionState.getRegion().getRegionNameAsString() +
" was in state=" + regionState.getState() + " on shutdown server=" +
serverName + ", reassigning");
assign(regionState.getRegion());
break;
case OPEN:
LOG.warn("Long-running region in OPEN state? Should not happen");
break;
public List<HRegionInfo> processServerShutdown(final HServerInfo hsi) {
// Clean out any existing assignment plans for this server
synchronized (this.regionPlans) {
for (Iterator <Map.Entry<String, RegionPlan>> i =
this.regionPlans.entrySet().iterator(); i.hasNext();) {
Map.Entry<String, RegionPlan> e = i.next();
if (e.getValue().getDestination().equals(hsi)) {
// Use iterator's remove else we'll get CME
i.remove();
}
}
}
// TODO: Do we want to sync on RIT here?
// Remove this server from map of servers to regions, and remove all regions
// of this server from online map of regions.
Set<HRegionInfo> deadRegions = null;
synchronized (this.regions) {
deadRegions = new TreeSet<HRegionInfo>(this.servers.remove(hsi));
for (HRegionInfo region : deadRegions) {
this.regions.remove(region);
}
}
// See if any of the regions that were online on this server were in RIT
// If they are, normal timeouts will deal with them appropriately so
// let's skip a manual re-assignment.
List<HRegionInfo> rits = new ArrayList<HRegionInfo>();
synchronized (regionsInTransition) {
for (RegionState region : this.regionsInTransition.values()) {
if (deadRegions.remove(region.getRegion())) {
rits.add(region.getRegion());
}
}
}
return rits;
}
/**

View File

@ -751,7 +751,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
region.getLog().closeAndDelete();
// 4. Trigger immediate assignment of this region
assignmentManager.assign(region.getRegionInfo());
assignmentManager.assign(region.getRegionInfo(), true);
}
// 5. If sync, wait for assignment of regions
@ -958,7 +958,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
}
public void assignRegion(HRegionInfo hri) {
assignmentManager.assign(hri);
assignmentManager.assign(hri, true);
}
/**

View File

@ -580,12 +580,16 @@ public class ServerManager {
*/
public boolean sendRegionClose(HServerInfo server, HRegionInfo region)
throws IOException {
if (server == null) return false;
if (server == null) {
LOG.debug("Unable to send region close because server is null; region=" +
region.getRegionNameAsString());
return false;
}
HRegionInterface hri = getServerConnection(server);
if(hri == null) {
LOG.warn("Attempting to send CLOSE RPC to server " +
server.getServerName() + " failed because no RPC connection found " +
"to this server");
server.getServerName() + " for region " + region.getRegionNameAsString()
+ " failed because no RPC connection found to this server");
return false;
}
return hri.closeRegion(region);

View File

@ -24,7 +24,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.master.AssignmentManager;
/**
@ -39,7 +38,6 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
private static final Log LOG = LogFactory.getLog(ClosedRegionHandler.class);
private final AssignmentManager assignmentManager;
private final RegionTransitionData data;
private final HRegionInfo regionInfo;
private final ClosedPriority priority;
@ -58,12 +56,10 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
}
};
public ClosedRegionHandler(Server server,
AssignmentManager assignmentManager, RegionTransitionData data,
public ClosedRegionHandler(Server server, AssignmentManager assignmentManager,
HRegionInfo regionInfo) {
super(server, EventType.RS_ZK_REGION_CLOSED);
this.assignmentManager = assignmentManager;
this.data = data;
this.regionInfo = regionInfo;
if(regionInfo.isRootRegion()) {
priority = ClosedPriority.ROOT;
@ -94,6 +90,6 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
}
// ZK Node is in CLOSED state, assign it.
assignmentManager.setOffline(regionInfo);
assignmentManager.assign(regionInfo);
assignmentManager.assign(regionInfo, true);
}
}

View File

@ -77,7 +77,7 @@ public class EnableTableHandler extends EventHandler {
assignmentManager.undisableTable(this.tableNameStr);
// Verify all regions of table are disabled
for (HRegionInfo region : regions) {
assignmentManager.assign(region);
assignmentManager.assign(region, true);
}
// Wait on table's regions to clear region in transition.
for (HRegionInfo region: regions) {

View File

@ -25,7 +25,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.zookeeper.KeeperException;
@ -55,8 +54,8 @@ public class OpenedRegionHandler extends EventHandler implements TotesHRegionInf
};
public OpenedRegionHandler(Server server,
AssignmentManager assignmentManager, RegionTransitionData data,
HRegionInfo regionInfo, HServerInfo serverInfo) {
AssignmentManager assignmentManager, HRegionInfo regionInfo,
HServerInfo serverInfo) {
super(server, EventType.RS_ZK_REGION_OPENED);
this.assignmentManager = assignmentManager;
this.regionInfo = regionInfo;

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.master.handler;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -97,7 +98,8 @@ public class ServerShutdownHandler extends EventHandler {
// doing after log splitting. Could do some states before -- OPENING?
// OFFLINE? -- and then others after like CLOSING that depend on log
// splitting.
this.services.getAssignmentManager().processServerShutdown(this.hsi);
List<HRegionInfo> regionsInTransition =
this.services.getAssignmentManager().processServerShutdown(this.hsi);
// Assign root and meta if we were carrying them.
if (isCarryingRoot()) { // -ROOT-
@ -113,41 +115,66 @@ public class ServerShutdownHandler extends EventHandler {
if (isCarryingMeta()) this.services.getAssignmentManager().assignMeta();
// Wait on meta to come online; we need it to progress.
try {
this.server.getCatalogTracker().waitForMeta();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e);
// TODO: Best way to hold strictly here? We should build this retry logic
// into the MetaReader operations themselves.
NavigableMap<HRegionInfo, Result> hris = null;
while (!this.server.isStopped()) {
try {
this.server.getCatalogTracker().waitForMeta();
hris = MetaReader.getServerUserRegions(this.server.getCatalogTracker(),
this.hsi);
break;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted", e);
} catch (IOException ioe) {
LOG.info("Received exception accessing META during server shutdown of " +
serverName + ", retrying META read");
}
}
NavigableMap<HRegionInfo, Result> hris =
MetaReader.getServerUserRegions(this.server.getCatalogTracker(), this.hsi);
LOG.info("Reassigning the " + hris.size() + " region(s) that " + serverName +
" was carrying");
// Remove regions that were in transition
for (HRegionInfo rit : regionsInTransition) hris.remove(rit);
LOG.info("Reassigning the " + hris.size() + " region(s) that " + serverName
+ " was carrying (skipping " + regionsInTransition.size() +
" regions(s) that are in transition)");
// We should encounter -ROOT- and .META. first in the Set given how its
// a sorted set.
// Iterate regions that were on this server and assign them
for (Map.Entry<HRegionInfo, Result> e: hris.entrySet()) {
processDeadRegion(e.getKey(), e.getValue(),
if (processDeadRegion(e.getKey(), e.getValue(),
this.services.getAssignmentManager(),
this.server.getCatalogTracker());
this.services.getAssignmentManager().assign(e.getKey());
this.server.getCatalogTracker())) {
this.services.getAssignmentManager().assign(e.getKey(), true);
}
}
this.deadServers.remove(serverName);
LOG.info("Finished processing of shutdown of " + serverName);
}
public static void processDeadRegion(HRegionInfo hri, Result result,
/**
* Process a dead region from a dead RS. Checks if the region is disabled
* or if the region has a partially completed split.
* <p>
* Returns true if specified region should be assigned, false if not.
* @param hri
* @param result
* @param assignmentManager
* @param catalogTracker
* @return
* @throws IOException
*/
public static boolean processDeadRegion(HRegionInfo hri, Result result,
AssignmentManager assignmentManager, CatalogTracker catalogTracker)
throws IOException {
// If table is not disabled but the region is offlined,
boolean disabled = assignmentManager.isTableDisabled(
hri.getTableDesc().getNameAsString());
if (disabled) return;
if (disabled) return false;
if (hri.isOffline() && hri.isSplit()) {
fixupDaughters(result, assignmentManager, catalogTracker);
return;
return false;
}
return true;
}
/**
@ -183,7 +210,7 @@ public class ServerShutdownHandler extends EventHandler {
if (pair == null || pair.getFirst() == null) {
LOG.info("Fixup; missing daughter " + hri.getEncodedName());
MetaEditor.addDaughter(catalogTracker, hri, null);
assignmentManager.assign(hri);
assignmentManager.assign(hri, true);
}
}
}

View File

@ -741,6 +741,33 @@ public class ZKAssign {
return RegionTransitionData.fromBytes(data);
}
/**
* Gets the current data in the unassigned node for the specified region name
* or fully-qualified path.
*
* <p>Returns null if the region does not currently have a node.
*
* <p>Does not set a watch.
*
* @param watcher zk reference
* @param pathOrRegionName fully-specified path or region name
* @param stat object to store node info into on getData call
* @return data for the unassigned node
* @throws KeeperException
* @throws KeeperException if unexpected zookeeper exception
*/
public static RegionTransitionData getDataNoWatch(ZooKeeperWatcher zkw,
String pathOrRegionName, Stat stat)
throws KeeperException {
String node = pathOrRegionName.startsWith("/") ?
pathOrRegionName : getNodeName(zkw, pathOrRegionName);
byte [] data = ZKUtil.getDataNoWatch(zkw, node, stat);
if(data == null) {
return null;
}
return RegionTransitionData.fromBytes(data);
}
/**
* Delete the assignment node regardless of its current state.
* <p>

View File

@ -339,17 +339,22 @@ public class TestMasterFailover {
* ZK = CLOSING
*/
// Region of enabled table being closed but not complete
// Region is already assigned, don't say anything to RS but set ZK closing
region = enabledAndAssignedRegions.remove(0);
regionsThatShouldBeOnline.add(region);
ZKAssign.createNodeClosing(zkw, region, serverName);
// Region of disabled table being closed but not complete
// Region is already assigned, don't say anything to RS but set ZK closing
region = disabledAndAssignedRegions.remove(0);
regionsThatShouldBeOffline.add(region);
ZKAssign.createNodeClosing(zkw, region, serverName);
// Disabled test of CLOSING. This case is invalid after HBASE-3181.
// How can an RS stop a CLOSING w/o deleting the node? If it did ever fail
// and left the node in CLOSING, the RS would have aborted and we'd process
// these regions in server shutdown
//
// // Region of enabled table being closed but not complete
// // Region is already assigned, don't say anything to RS but set ZK closing
// region = enabledAndAssignedRegions.remove(0);
// regionsThatShouldBeOnline.add(region);
// ZKAssign.createNodeClosing(zkw, region, serverName);
//
// // Region of disabled table being closed but not complete
// // Region is already assigned, don't say anything to RS but set ZK closing
// region = disabledAndAssignedRegions.remove(0);
// regionsThatShouldBeOffline.add(region);
// ZKAssign.createNodeClosing(zkw, region, serverName);
/*
* ZK = CLOSED
@ -797,26 +802,32 @@ public class TestMasterFailover {
// Let's add some weird states to master in-memory state
// After HBASE-3181, we need to have some ZK state if we're PENDING_OPEN
// b/c it is impossible for us to get into this state w/o a zk node
// this is not true of PENDING_CLOSE
// PENDING_OPEN and enabled
region = enabledRegions.remove(0);
regionsThatShouldBeOnline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_OPEN));
new RegionState(region, RegionState.State.PENDING_OPEN, 0));
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// PENDING_OPEN and disabled
region = disabledRegions.remove(0);
regionsThatShouldBeOffline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_OPEN));
new RegionState(region, RegionState.State.PENDING_OPEN, 0));
ZKAssign.createNodeOffline(zkw, region, master.getServerName());
// PENDING_CLOSE and enabled
region = enabledRegions.remove(0);
regionsThatShouldBeOnline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_CLOSE));
new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
// PENDING_CLOSE and disabled
region = disabledRegions.remove(0);
regionsThatShouldBeOffline.add(region);
master.assignmentManager.regionsInTransition.put(region.getEncodedName(),
new RegionState(region, RegionState.State.PENDING_CLOSE));
new RegionState(region, RegionState.State.PENDING_CLOSE, 0));
// Failover should be completed, now wait for no RIT
log("Waiting for no more RIT");

View File

@ -28,6 +28,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MiniHBaseCluster;
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
/**
@ -51,38 +53,50 @@ public class TestRollingRestart {
// Start a cluster with 2 masters and 4 regionservers
final int NUM_MASTERS = 2;
final int NUM_RS = 3;
final int NUM_REGIONS_TO_CREATE = 27;
final int NUM_REGIONS_TO_CREATE = 20;
int expectedNumRS = 3;
// Start the cluster
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
log("Starting cluster");
Configuration conf = HBaseConfiguration.create();
conf.setInt("hbase.master.assignment.timeoutmonitor.period", 2000);
conf.setInt("hbase.master.assignment.timeoutmonitor.timeout", 5000);
HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(conf);
TEST_UTIL.startMiniCluster(NUM_MASTERS, NUM_RS);
MiniHBaseCluster cluster = TEST_UTIL.getHBaseCluster();
log("Waiting for active/ready master");
cluster.waitForActiveAndReadyMaster();
Configuration conf = TEST_UTIL.getConfiguration();
ZooKeeperWatcher zkw = new ZooKeeperWatcher(conf, "testRollingRestart",
null);
HMaster master = cluster.getMaster();
// Create a table with regions
byte [] table = Bytes.toBytes("tableRestart");
byte [] family = Bytes.toBytes("family");
log("Creating table with " + NUM_REGIONS_TO_CREATE + " regions");
HTable ht = TEST_UTIL.createTable(table, family);
int numRegions = TEST_UTIL.createMultiRegions(conf, ht, family,
NUM_REGIONS_TO_CREATE);
numRegions += 2; // catalogs
LOG.debug("\n\nWaiting for no more RIT\n");
ZKAssign.blockUntilNoRIT(zkw);
LOG.debug("\n\nDisabling table\n");
log("Waiting for no more RIT\n");
blockUntilNoRIT(zkw, master);
log("Disabling table\n");
TEST_UTIL.getHBaseAdmin().disableTable(table);
LOG.debug("\n\nWaiting for no more RIT\n");
ZKAssign.blockUntilNoRIT(zkw);
LOG.debug("\n\nEnabling table\n");
TEST_UTIL.getHBaseAdmin().enableTable(table);
LOG.debug("\n\nWaiting for no more RIT\n");
ZKAssign.blockUntilNoRIT(zkw);
LOG.debug("\n\nVerifying there are " + numRegions + " assigned on cluster\n");
log("Waiting for no more RIT\n");
blockUntilNoRIT(zkw, master);
NavigableSet<String> regions = getAllOnlineRegions(cluster);
log("Verifying only catalog regions are assigned\n");
if (regions.size() != 2) {
for (String oregion : regions) log("Region still online: " + oregion);
}
assertEquals(2, regions.size());
log("Enabling table\n");
TEST_UTIL.getHBaseAdmin().enableTable(table);
log("Waiting for no more RIT\n");
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster\n");
regions = getAllOnlineRegions(cluster);
assertRegionsAssigned(cluster, regions);
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@ -93,7 +107,7 @@ public class TestRollingRestart {
restarted.waitForServerOnline();
log("Additional RS is online");
log("Waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@ -112,22 +126,23 @@ public class TestRollingRestart {
}
// Bring down the backup master
LOG.debug("\n\nStopping backup master\n\n");
log("Stopping backup master\n\n");
backupMaster.getMaster().stop("Stop of backup during rolling restart");
cluster.hbaseCluster.waitOnMaster(backupMaster);
// Bring down the primary master
LOG.debug("\n\nStopping primary master\n\n");
log("Stopping primary master\n\n");
activeMaster.getMaster().stop("Stop of active during rolling restart");
cluster.hbaseCluster.waitOnMaster(activeMaster);
// Start primary master
LOG.debug("\n\nRestarting primary master\n\n");
log("Restarting primary master\n\n");
activeMaster = cluster.startMaster();
cluster.waitForActiveAndReadyMaster();
master = activeMaster.getMaster();
// Start backup master
LOG.debug("\n\nRestarting backup master\n\n");
log("Restarting backup master\n\n");
backupMaster = cluster.startMaster();
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@ -148,7 +163,7 @@ public class TestRollingRestart {
log("Waiting for RS shutdown to be handled by master");
waitForRSShutdownToStartAndFinish(activeMaster, serverName);
log("RS shutdown done, waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
expectedNumRS--;
@ -159,7 +174,7 @@ public class TestRollingRestart {
expectedNumRS++;
log("Region server " + num + " is back online");
log("Waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@ -192,7 +207,7 @@ public class TestRollingRestart {
waitForRSShutdownToStartAndFinish(activeMaster,
metaServer.getRegionServer().getServerName());
log("Waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@ -208,7 +223,7 @@ public class TestRollingRestart {
waitForRSShutdownToStartAndFinish(activeMaster,
metaServer.getRegionServer().getServerName());
log("RS shutdown done, waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
assertEquals(expectedNumRS, cluster.getRegionServerThreads().size());
@ -219,7 +234,7 @@ public class TestRollingRestart {
cluster.startRegionServer().waitForServerOnline();
Thread.sleep(1000);
log("Waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
// Shutdown server hosting META
@ -232,7 +247,7 @@ public class TestRollingRestart {
waitForRSShutdownToStartAndFinish(activeMaster,
metaServer.getRegionServer().getServerName());
log("RS shutdown done, waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
@ -246,7 +261,7 @@ public class TestRollingRestart {
waitForRSShutdownToStartAndFinish(activeMaster,
metaServer.getRegionServer().getServerName());
log("RS shutdown done, waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
@ -260,7 +275,7 @@ public class TestRollingRestart {
waitForRSShutdownToStartAndFinish(activeMaster,
metaServer.getRegionServer().getServerName());
log("RS shutdown done, waiting for no more RIT");
ZKAssign.blockUntilNoRIT(zkw);
blockUntilNoRIT(zkw, master);
log("Verifying there are " + numRegions + " assigned on cluster");
assertRegionsAssigned(cluster, regions);
@ -280,6 +295,12 @@ public class TestRollingRestart {
TEST_UTIL.shutdownMiniCluster();
}
private void blockUntilNoRIT(ZooKeeperWatcher zkw, HMaster master)
throws KeeperException, InterruptedException {
ZKAssign.blockUntilNoRIT(zkw);
master.assignmentManager.waitUntilNoRegionsInTransition(60000);
}
private void waitForRSShutdownToStartAndFinish(MasterThread activeMaster,
String serverName) throws InterruptedException {
ServerManager sm = activeMaster.getMaster().getServerManager();
@ -298,7 +319,7 @@ public class TestRollingRestart {
}
private void log(String msg) {
LOG.debug("\n\n" + msg + "\n");
LOG.debug("\n\nTRR: " + msg + "\n");
}
private RegionServerThread getServerHostingMeta(MiniHBaseCluster cluster) {
@ -325,16 +346,25 @@ public class TestRollingRestart {
for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
numFound += rst.getRegionServer().getNumberOfOnlineRegions();
}
if (expectedRegions.size() != numFound) {
LOG.debug("Expected to find " + expectedRegions.size() + " but only found"
if (expectedRegions.size() > numFound) {
log("Expected to find " + expectedRegions.size() + " but only found"
+ " " + numFound);
NavigableSet<String> foundRegions = getAllOnlineRegions(cluster);
for (String region : expectedRegions) {
if (!foundRegions.contains(region)) {
LOG.debug("Missing region: " + region);
log("Missing region: " + region);
}
}
assertEquals(expectedRegions.size(), numFound);
} else if (expectedRegions.size() < numFound) {
int doubled = numFound - expectedRegions.size();
log("Expected to find " + expectedRegions.size() + " but found"
+ " " + numFound + " (" + doubled + " double assignments?)");
NavigableSet<String> doubleRegions = getDoubleAssignedRegions(cluster);
for (String region : doubleRegions) {
log("Region is double assigned: " + region);
}
assertEquals(expectedRegions.size(), numFound);
} else {
log("Success! Found expected number of " + numFound + " regions");
}
@ -350,4 +380,18 @@ public class TestRollingRestart {
return online;
}
private NavigableSet<String> getDoubleAssignedRegions(
MiniHBaseCluster cluster) {
NavigableSet<String> online = new TreeSet<String>();
NavigableSet<String> doubled = new TreeSet<String>();
for (RegionServerThread rst : cluster.getLiveRegionServerThreads()) {
for (HRegionInfo region : rst.getRegionServer().getOnlineRegions()) {
if(!online.add(region.getRegionNameAsString())) {
doubled.add(region.getRegionNameAsString());
}
}
}
return doubled;
}
}