HBASE-543, HBASE-1046, HBase-1051 A region's state is kept in several places in the master opening the possibility for race conditions

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@729186 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jim Kellerman 2008-12-24 01:06:54 +00:00
parent d47bdad4fd
commit 9220e66c92
12 changed files with 684 additions and 583 deletions

View File

@ -115,6 +115,8 @@ Release 0.19.0 - Unreleased
HBASE-1079 Dumb NPE in ServerCallable hides the RetriesExhausted exception HBASE-1079 Dumb NPE in ServerCallable hides the RetriesExhausted exception
HBASE-782 The DELETE key in the hbase shell deletes the wrong character HBASE-782 The DELETE key in the hbase shell deletes the wrong character
(Tim Sell via Stack) (Tim Sell via Stack)
HBASE-543, HBASE-1046, HBase-1051 A region's state is kept in several places
in the master opening the possibility for race conditions
IMPROVEMENTS IMPROVEMENTS
HBASE-901 Add a limit to key length, check key and value length on client side HBASE-901 Add a limit to key length, check key and value length on client side

View File

@ -177,7 +177,15 @@ public class HConnectionManager implements HConstants {
return this.pause * HConstants.RETRY_BACKOFF[ntries]; return this.pause * HConstants.RETRY_BACKOFF[ntries];
} }
public void unsetRootRegionLocation() {
this.rootRegionLocation = null;
}
public void setRootRegionLocation(HRegionLocation rootRegion) { public void setRootRegionLocation(HRegionLocation rootRegion) {
if (rootRegion == null) {
throw new IllegalArgumentException(
"Cannot set root region location to null.");
}
this.rootRegionLocation = rootRegion; this.rootRegionLocation = rootRegion;
} }

View File

@ -33,4 +33,10 @@ public interface ServerConnection extends HConnection {
* @param rootRegion * @param rootRegion
*/ */
public void setRootRegionLocation(HRegionLocation rootRegion); public void setRootRegionLocation(HRegionLocation rootRegion);
/**
* Unset the root region location in the connection. Called by
* ServerManager.processRegionClose.
*/
public void unsetRootRegionLocation();
} }

View File

@ -337,79 +337,85 @@ abstract class BaseScanner extends Chore implements HConstants {
final String serverName, final long startCode) final String serverName, final long startCode)
throws IOException { throws IOException {
// Skip region - if ... synchronized (regionManager) {
if(info.isOffline() // offline // Skip region - if ...
|| regionManager.isClosing(info.getRegionName())) { // queued for offline if(info.isOffline() // offline
|| regionManager.isOfflined(info.getRegionName())) { // queued for offline
regionManager.noLongerUnassigned(info); regionManager.removeRegion(info);
regionManager.noLongerPending(info.getRegionName());
return;
}
HServerInfo storedInfo = null;
boolean deadServer = false;
if (serverName.length() != 0) {
if (regionManager.isMarkedToClose(serverName, info.getRegionName())) {
// Skip if region is on kill list
if(LOG.isDebugEnabled()) {
LOG.debug("not assigning region (on kill list): " +
info.getRegionNameAsString());
}
return; return;
} }
HServerInfo storedInfo = null;
storedInfo = master.serverManager.getServerInfo(serverName); boolean deadServer = false;
deadServer = master.serverManager.isDead(serverName); if (serverName.length() != 0) {
}
/* if (regionManager.isOfflined(info.getRegionName())) {
* If the server is a dead server or its startcode is off -- either null // Skip if region is on kill list
* or doesn't match the start code for the address -- then add it to the if(LOG.isDebugEnabled()) {
* list of unassigned regions IF not already there (or pending open). LOG.debug("not assigning region (on kill list): " +
*/ info.getRegionNameAsString());
if (!deadServer && !regionManager.isUnassigned(info) &&
!regionManager.isPending(info.getRegionName())
&& (storedInfo == null || storedInfo.getStartCode() != startCode)) {
// The current assignment is invalid
if (LOG.isDebugEnabled()) {
LOG.debug("Current assignment of " +
info.getRegionNameAsString() +
" is not valid: serverInfo: " + storedInfo + ", passed startCode: " +
startCode + ", storedInfo.startCode: " +
((storedInfo != null)? storedInfo.getStartCode(): -1) +
", unassignedRegions: " +
regionManager.isUnassigned(info) +
", pendingRegions: " +
regionManager.isPending(info.getRegionName()));
}
// Recover the region server's log if there is one.
// This is only done from here if we are restarting and there is stale
// data in the meta region. Once we are on-line, dead server log
// recovery is handled by lease expiration and ProcessServerShutdown
if (!regionManager.isInitialMetaScanComplete() && serverName.length() != 0) {
StringBuilder dirName = new StringBuilder("log_");
dirName.append(serverName.replace(":", "_"));
Path logDir = new Path(master.rootdir, dirName.toString());
try {
if (master.fs.exists(logDir)) {
regionManager.splitLogLock.lock();
try {
HLog.splitLog(master.rootdir, logDir, master.fs,
master.getConfiguration());
} finally {
regionManager.splitLogLock.unlock();
}
} }
if (LOG.isDebugEnabled()) { return;
LOG.debug("Split " + logDir.toString());
}
} catch (IOException e) {
LOG.warn("unable to split region server log because: ", e);
throw e;
} }
storedInfo = master.serverManager.getServerInfo(serverName);
deadServer = master.serverManager.isDead(serverName);
}
/*
* If the server is a dead server or its startcode is off -- either null
* or doesn't match the start code for the address -- then add it to the
* list of unassigned regions IF not already there (or pending open).
*/
if ((deadServer ||
(storedInfo == null || storedInfo.getStartCode() != startCode)) &&
(!regionManager.isUnassigned(info) &&
!regionManager.isPending(info.getRegionName()) &&
!regionManager.isAssigned(info.getRegionName()))) {
// The current assignment is invalid
if (LOG.isDebugEnabled()) {
LOG.debug("Current assignment of " +
info.getRegionNameAsString() +
" is not valid." +
(storedInfo == null ? " Server '" + serverName + "' unknown." :
" serverInfo: " + storedInfo + ", passed startCode: " +
startCode + ", storedInfo.startCode: " + storedInfo.getStartCode()) +
" Region is not unassigned, assigned or pending");
}
// Recover the region server's log if there is one.
// This is only done from here if we are restarting and there is stale
// data in the meta region. Once we are on-line, dead server log
// recovery is handled by lease expiration and ProcessServerShutdown
if (!regionManager.isInitialMetaScanComplete() &&
serverName.length() != 0) {
StringBuilder dirName = new StringBuilder("log_");
dirName.append(serverName.replace(":", "_"));
Path logDir = new Path(master.rootdir, dirName.toString());
try {
if (master.fs.exists(logDir)) {
regionManager.splitLogLock.lock();
try {
HLog.splitLog(master.rootdir, logDir, master.fs,
master.getConfiguration());
} finally {
regionManager.splitLogLock.unlock();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Split " + logDir.toString());
}
} catch (IOException e) {
LOG.warn("unable to split region server log because: ", e);
throw e;
}
}
// Now get the region assigned
regionManager.setUnassigned(info, true);
} }
// Now get the region assigned
regionManager.setUnassigned(info);
} }
} }

View File

@ -23,14 +23,12 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.io.BatchUpdate; import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
/** Instantiated to enable or disable a table */ /** Instantiated to enable or disable a table */
@ -90,15 +88,18 @@ class ChangeTableState extends TableOperation {
LOG.debug("Updated columns in row: " + i.getRegionNameAsString()); LOG.debug("Updated columns in row: " + i.getRegionNameAsString());
} }
if (online) { synchronized (master.regionManager) {
// Bring offline regions on-line if (online) {
master.regionManager.noLongerClosing(i.getRegionName()); // Bring offline regions on-line
if (!master.regionManager.isUnassigned(i)) { if (!master.regionManager.isUnassigned(i) &&
master.regionManager.setUnassigned(i); !master.regionManager.isAssigned(i.getRegionName()) &&
!master.regionManager.isPending(i.getRegionName())) {
master.regionManager.setUnassigned(i, false);
}
} else {
// Prevent region from getting assigned.
master.regionManager.removeRegion(i);
} }
} else {
// Prevent region from getting assigned.
master.regionManager.noLongerUnassigned(i);
} }
} }
@ -106,37 +107,23 @@ class ChangeTableState extends TableOperation {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("processing regions currently being served"); LOG.debug("processing regions currently being served");
} }
for (Map.Entry<String, HashSet<HRegionInfo>> e: servedRegions.entrySet()) { synchronized (master.regionManager) {
String serverName = e.getKey(); for (Map.Entry<String, HashSet<HRegionInfo>> e: servedRegions.entrySet()) {
if (online) { String serverName = e.getKey();
LOG.debug("Already online"); if (online) {
continue; // Already being served LOG.debug("Already online");
} continue; // Already being served
// Cause regions being served to be taken off-line and disabled
Map<byte [], HRegionInfo> localKillList =
new TreeMap<byte [], HRegionInfo>(Bytes.BYTES_COMPARATOR);
for (HRegionInfo i: e.getValue()) {
if (LOG.isDebugEnabled()) {
LOG.debug("adding region " + i.getRegionNameAsString() + " to kill list");
} }
// this marks the regions to be closed
localKillList.put(i.getRegionName(), i); // Cause regions being served to be taken off-line and disabled
// this marks the regions to be offlined once they are closed
master.regionManager.markRegionForOffline(i.getRegionName()); for (HRegionInfo i: e.getValue()) {
} if (LOG.isDebugEnabled()) {
Map<byte [], HRegionInfo> killedRegions = LOG.debug("adding region " + i.getRegionNameAsString() + " to kill list");
master.regionManager.removeMarkedToClose(serverName); }
if (killedRegions != null) { // this marks the regions to be closed
localKillList.putAll(killedRegions); master.regionManager.setClosing(serverName, i, true);
}
if (localKillList.size() > 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("inserted local kill list into kill list for server " +
serverName);
} }
master.regionManager.markToCloseBulk(serverName, localKillList);
} }
} }
servedRegions.clear(); servedRegions.clear();

View File

@ -93,7 +93,8 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
static final Log LOG = LogFactory.getLog(HMaster.class.getName()); static final Log LOG = LogFactory.getLog(HMaster.class.getName());
public long getProtocolVersion(String protocol, long clientVersion) { public long getProtocolVersion(@SuppressWarnings("unused") String protocol,
@SuppressWarnings("unused") long clientVersion) {
return HBaseRPCProtocolVersion.versionID; return HBaseRPCProtocolVersion.versionID;
} }
@ -531,8 +532,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
/* /*
* HMasterRegionInterface * HMasterRegionInterface
*/ */
public MapWritable regionServerStartup(HServerInfo serverInfo) public MapWritable regionServerStartup(HServerInfo serverInfo) {
throws IOException {
// Set the address for now even tho it will not be persisted on // Set the address for now even tho it will not be persisted on
// the HRS side. // the HRS side.
String rsAddress = HBaseServer.getRemoteAddress(); String rsAddress = HBaseServer.getRemoteAddress();
@ -833,7 +833,7 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
LOG.info("Marking " + hri.getRegionNameAsString() + LOG.info("Marking " + hri.getRegionNameAsString() +
" as closed on " + servername + "; cleaning SERVER + STARTCODE; " + " as closed on " + servername + "; cleaning SERVER + STARTCODE; " +
"master will tell regionserver to close region on next heartbeat"); "master will tell regionserver to close region on next heartbeat");
this.regionManager.markToClose(servername, hri); this.regionManager.setClosing(servername, hri, false);
MetaRegion meta = this.regionManager.getMetaRegionForRow(regionname); MetaRegion meta = this.regionManager.getMetaRegionForRow(regionname);
HRegionInterface srvr = getMETAServer(meta); HRegionInterface srvr = getMETAServer(meta);
HRegion.cleanRegionInMETA(srvr, meta.getRegionName(), hri); HRegion.cleanRegionInMETA(srvr, meta.getRegionName(), hri);

View File

@ -154,10 +154,12 @@ class MetaScanner extends BaseScanner {
*/ */
synchronized boolean waitForMetaRegionsOrClose() { synchronized boolean waitForMetaRegionsOrClose() {
while (!master.closed.get()) { while (!master.closed.get()) {
if (regionManager.isInitialRootScanComplete() && synchronized (master.regionManager) {
regionManager.numMetaRegions() == if (regionManager.isInitialRootScanComplete() &&
regionManager.numOnlineMetaRegions()) { regionManager.numMetaRegions() ==
break; regionManager.numOnlineMetaRegions()) {
break;
}
} }
try { try {
wait(master.threadWakeFrequency); wait(master.threadWakeFrequency);

View File

@ -71,11 +71,11 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
// back on the toDoQueue // back on the toDoQueue
if (metaRegionAvailable()) { if (metaRegionAvailable()) {
// offline the region in meta and then note that we've offlined // offline the region in meta and then remove it from the
// the region. // set of regions in transition
HRegion.offlineRegionInMETA(server, metaRegionName, HRegion.offlineRegionInMETA(server, metaRegionName,
regionInfo); regionInfo);
master.regionManager.regionOfflined(regionInfo.getRegionName()); master.regionManager.removeRegion(regionInfo);
} }
return true; return true;
} }
@ -84,7 +84,7 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
} else if (reassignRegion) { } else if (reassignRegion) {
// we are reassigning the region eventually, so set it unassigned // we are reassigning the region eventually, so set it unassigned
master.regionManager.setUnassigned(regionInfo); master.regionManager.setUnassigned(regionInfo, false);
} }
return result == null ? true : result; return result == null ? true : result;

View File

@ -94,26 +94,28 @@ class ProcessRegionOpen extends ProcessRegionStatusChange {
this.historian.online(this.master.getConfiguration()); this.historian.online(this.master.getConfiguration());
} }
this.historian.addRegionOpen(regionInfo, serverAddress); this.historian.addRegionOpen(regionInfo, serverAddress);
if (isMetaTable) { synchronized (master.regionManager) {
// It's a meta region. if (isMetaTable) {
MetaRegion m = new MetaRegion(new HServerAddress(serverAddress), // It's a meta region.
regionInfo.getRegionName(), regionInfo.getStartKey()); MetaRegion m = new MetaRegion(new HServerAddress(serverAddress),
if (!master.regionManager.isInitialMetaScanComplete()) { regionInfo.getRegionName(), regionInfo.getStartKey());
// Put it on the queue to be scanned for the first time. if (!master.regionManager.isInitialMetaScanComplete()) {
LOG.debug("Adding " + m.toString() + " to regions to scan"); // Put it on the queue to be scanned for the first time.
master.regionManager.addMetaRegionToScan(m); LOG.debug("Adding " + m.toString() + " to regions to scan");
} else { master.regionManager.addMetaRegionToScan(m);
// Add it to the online meta regions } else {
LOG.debug("Adding to onlineMetaRegions: " + m.toString()); // Add it to the online meta regions
master.regionManager.putMetaRegionOnline(m); LOG.debug("Adding to onlineMetaRegions: " + m.toString());
// Interrupting the Meta Scanner sleep so that it can master.regionManager.putMetaRegionOnline(m);
// process regions right away // Interrupting the Meta Scanner sleep so that it can
master.regionManager.metaScannerThread.interrupt(); // process regions right away
master.regionManager.metaScannerThread.interrupt();
}
} }
// If updated successfully, remove from pending list.
master.regionManager.removeRegion(regionInfo);
return true;
} }
// If updated successfully, remove from pending list.
master.regionManager.noLongerPending(regionInfo.getRegionName());
return true;
} }
}.doWithRetries(); }.doWithRetries();
return result == null ? true : result; return result == null ? true : result;

View File

@ -113,7 +113,7 @@ class ProcessServerShutdown extends RegionServerOperation {
// region had been on shutdown server (could be null because we // region had been on shutdown server (could be null because we
// missed edits in hlog because hdfs does not do write-append). // missed edits in hlog because hdfs does not do write-append).
String serverName = Writables.cellToString(values.get(COL_SERVER)); String serverName = Writables.cellToString(values.get(COL_SERVER));
if (serverName.length() > 0 && if (serverName != null && serverName.length() > 0 &&
deadServerName.compareTo(serverName) != 0) { deadServerName.compareTo(serverName) != 0) {
// This isn't the server you're looking for - move along // This isn't the server you're looking for - move along
continue; continue;
@ -130,31 +130,33 @@ class ProcessServerShutdown extends RegionServerOperation {
continue; continue;
} }
if (info.isMetaTable()) { synchronized (master.regionManager) {
if (LOG.isDebugEnabled()) { if (info.isMetaTable()) {
LOG.debug("removing meta region " + if (LOG.isDebugEnabled()) {
Bytes.toString(info.getRegionName()) + LOG.debug("removing meta region " +
Bytes.toString(info.getRegionName()) +
" from online meta regions"); " from online meta regions");
}
master.regionManager.offlineMetaRegion(info.getStartKey());
} }
master.regionManager.offlineMetaRegion(info.getStartKey());
}
ToDoEntry todo = new ToDoEntry(row, info); ToDoEntry todo = new ToDoEntry(row, info);
toDoList.add(todo); toDoList.add(todo);
if (master.regionManager.isMarkedToClose(deadServerName, info.getRegionName())) { if (master.regionManager.isOfflined(info.getRegionName()) ||
master.regionManager.noLongerMarkedToClose(deadServerName, info.getRegionName()); info.isOffline()) {
master.regionManager.noLongerUnassigned(info); master.regionManager.removeRegion(info);
// Mark region offline // Mark region offline
todo.regionOffline = true; if (!info.isOffline()) {
} else { todo.regionOffline = true;
if (!info.isOffline() && !info.isSplit()) { }
// Get region reassigned } else {
regions.add(info); if (!info.isOffline() && !info.isSplit()) {
// Get region reassigned
regions.add(info);
}
} }
} }
// If it was pending, remove.
master.regionManager.noLongerPending(info.getRegionName());
} }
} finally { } finally {
if(scannerId != -1L) { if(scannerId != -1L) {
@ -184,7 +186,7 @@ class ProcessServerShutdown extends RegionServerOperation {
// Get regions reassigned // Get regions reassigned
for (HRegionInfo info: regions) { for (HRegionInfo info: regions) {
master.regionManager.setUnassigned(info); master.regionManager.setUnassigned(info, true);
} }
} }
@ -252,11 +254,11 @@ class ProcessServerShutdown extends RegionServerOperation {
} }
if (this.rootRegionServer && !this.rootRegionReassigned) { if (this.rootRegionServer && !this.rootRegionReassigned) {
// avoid multiple root region reassignment
this.rootRegionReassigned = true;
// The server that died was serving the root region. Now that the log // The server that died was serving the root region. Now that the log
// has been split, get it reassigned. // has been split, get it reassigned.
master.regionManager.reassignRootRegion(); master.regionManager.reassignRootRegion();
// avoid multiple root region reassignment
this.rootRegionReassigned = true;
// When we call rootAvailable below, it will put us on the delayed // When we call rootAvailable below, it will put us on the delayed
// to do queue to allow some time to pass during which the root // to do queue to allow some time to pass during which the root
// region will hopefully get reassigned. // region will hopefully get reassigned.
@ -307,7 +309,6 @@ class ProcessServerShutdown extends RegionServerOperation {
Bytes.toString(r.getRegionName()) + " on " + r.getServer()); Bytes.toString(r.getRegionName()) + " on " + r.getServer());
} }
} }
master.regionManager.allRegionsClosed(deadServerName);
master.serverManager.removeDeadServer(deadServerName); master.serverManager.removeDeadServer(deadServerName);
return true; return true;
} }

View File

@ -34,8 +34,6 @@ import java.util.Set;
import java.util.SortedMap; import java.util.SortedMap;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.Collections; import java.util.Collections;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
@ -80,45 +78,23 @@ class RegionManager implements HConstants {
private static final byte[] OVERLOADED = Bytes.toBytes("Overloaded"); private static final byte[] OVERLOADED = Bytes.toBytes("Overloaded");
/** /*
* The 'unassignedRegions' table maps from a HRegionInfo to a timestamp that * Map of region name to RegionState for regions that are in transition such as
* indicates the last time we *tried* to assign the region to a RegionServer.
* If the timestamp is out of date, then we can try to reassign it.
* *
* We fill 'unassignedRecords' by scanning ROOT and META tables, learning the * unassigned -> assigned -> pending -> open
* set of all known valid regions. * closing -> closed -> offline
* closing -> closed -> unassigned -> assigned -> pending -> open
*
* At the end of a transition, removeRegion is used to remove the region from
* the map (since it is no longer in transition)
*
* Note: Needs to be SortedMap so we can specify a comparator
* *
* <p>Items are removed from this list when a region server reports in that
* the region has been deployed.
*
* TODO: Need to be a sorted map?
*/ */
private final SortedMap<HRegionInfo, Long> unassignedRegions = private final SortedMap<byte[], RegionState> regionsInTransition =
Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>()); Collections.synchronizedSortedMap(
new TreeMap<byte[], RegionState>(Bytes.BYTES_COMPARATOR));
/**
* Regions that have been assigned, and the server has reported that it has
* started serving it, but that we have not yet recorded in the meta table.
*/
private final Set<byte []> pendingRegions =
Collections.synchronizedSet(new TreeSet<byte []>(Bytes.BYTES_COMPARATOR));
/**
* List of regions that are going to be closed.
*/
private final Map<String, Map<byte [], HRegionInfo>> regionsToClose =
new ConcurrentHashMap<String, Map<byte [], HRegionInfo>>();
/** Regions that are in the process of being closed */
private final Set<byte []> closingRegions =
Collections.synchronizedSet(new TreeSet<byte []>(Bytes.BYTES_COMPARATOR));
/**
* Set of regions that, once closed, should be marked as offline so that they
* are not reassigned.
*/
private final Set<byte []> regionsToOffline =
Collections.synchronizedSet(new TreeSet<byte []>(Bytes.BYTES_COMPARATOR));
// How many regions to assign a server at a time. // How many regions to assign a server at a time.
private final int maxAssignInOneGo; private final int maxAssignInOneGo;
@ -127,12 +103,12 @@ class RegionManager implements HConstants {
private final float slop; private final float slop;
/** Set of regions to split. */ /** Set of regions to split. */
private final Map<byte[],Pair<HRegionInfo,HServerAddress>> regionsToSplit = private final SortedMap<byte[],Pair<HRegionInfo,HServerAddress>> regionsToSplit =
Collections.synchronizedSortedMap( Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>> new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR)); (Bytes.BYTES_COMPARATOR));
/** Set of regions to compact. */ /** Set of regions to compact. */
private final Map<byte[],Pair<HRegionInfo,HServerAddress>> regionsToCompact = private final SortedMap<byte[],Pair<HRegionInfo,HServerAddress>> regionsToCompact =
Collections.synchronizedSortedMap( Collections.synchronizedSortedMap(
new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>> new TreeMap<byte[],Pair<HRegionInfo,HServerAddress>>
(Bytes.BYTES_COMPARATOR)); (Bytes.BYTES_COMPARATOR));
@ -160,21 +136,31 @@ class RegionManager implements HConstants {
Threads.setDaemonThreadRunning(metaScannerThread, Threads.setDaemonThreadRunning(metaScannerThread,
"RegionManager.metaScanner"); "RegionManager.metaScanner");
} }
void unsetRootRegion() { void unsetRootRegion() {
rootRegionLocation.set(null); synchronized (regionsInTransition) {
rootRegionLocation.set(null);
regionsInTransition.remove(HRegionInfo.ROOT_REGIONINFO.getRegionName());
}
} }
void reassignRootRegion() { void reassignRootRegion() {
unsetRootRegion(); unsetRootRegion();
if (!master.shutdownRequested) { if (!master.shutdownRequested) {
unassignedRegions.put(HRegionInfo.ROOT_REGIONINFO, ZERO_L); synchronized (regionsInTransition) {
RegionState s = new RegionState(HRegionInfo.ROOT_REGIONINFO);
s.setUnassigned();
regionsInTransition.put(HRegionInfo.ROOT_REGIONINFO.getRegionName(), s);
}
} }
} }
/* /*
* Assigns regions to region servers attempting to balance the load across * Assigns regions to region servers attempting to balance the load across
* all region servers * all region servers
*
* Note that no synchronization is necessary as the caller
* (ServerManager.processMsgs) already owns the monitor for the RegionManager.
* *
* @param info * @param info
* @param serverName * @param serverName
@ -183,53 +169,51 @@ class RegionManager implements HConstants {
void assignRegions(HServerInfo info, String serverName, void assignRegions(HServerInfo info, String serverName,
HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) { HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
HServerLoad thisServersLoad = info.getLoad(); HServerLoad thisServersLoad = info.getLoad();
synchronized (unassignedRegions) { // figure out what regions need to be assigned and aren't currently being
// We need to hold a lock on assign attempts while we figure out what to // worked on elsewhere.
// do so that multiple threads do not execute this method in parallel Set<RegionState> regionsToAssign = regionsAwaitingAssignment();
// resulting in assigning the same region to multiple servers. if (regionsToAssign.size() == 0) {
// There are no regions waiting to be assigned.
// figure out what regions need to be assigned and aren't currently being if (!inSafeMode()) {
// worked on elsewhere. // We only do load balancing once all regions are assigned.
Set<HRegionInfo> regionsToAssign = regionsAwaitingAssignment(); // This prevents churn while the cluster is starting up.
if (regionsToAssign.size() == 0) { double avgLoad = master.serverManager.getAverageLoad();
// There are no regions waiting to be assigned. double avgLoadWithSlop = avgLoad +
if (!inSafeMode()) { ((this.slop != 0)? avgLoad * this.slop: avgLoad);
// We only do load balancing once all regions are assigned. if (avgLoad > 2.0 &&
// This prevents churn while the cluster is starting up. thisServersLoad.getNumberOfRegions() > avgLoadWithSlop) {
double avgLoad = master.serverManager.getAverageLoad(); if (LOG.isDebugEnabled()) {
double avgLoadWithSlop = avgLoad + LOG.debug("Server " + serverName +
((this.slop != 0)? avgLoad * this.slop: avgLoad);
if (avgLoad > 2.0 &&
thisServersLoad.getNumberOfRegions() > avgLoadWithSlop) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server " + serverName +
" is overloaded. Server load: " + " is overloaded. Server load: " +
thisServersLoad.getNumberOfRegions() + " avg: " + avgLoad + thisServersLoad.getNumberOfRegions() + " avg: " + avgLoad +
", slop: " + this.slop); ", slop: " + this.slop);
}
unassignSomeRegions(thisServersLoad, avgLoad, mostLoadedRegions,
returnMsgs);
} }
unassignSomeRegions(serverName, thisServersLoad,
avgLoad, mostLoadedRegions, returnMsgs);
} }
}
} else {
// if there's only one server, just give it all the regions
if (master.serverManager.numServers() == 1) {
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
} else { } else {
// if there's only one server, just give it all the regions // otherwise, give this server a few regions taking into account the
if (master.serverManager.numServers() == 1) { // load of all the other servers.
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs); assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
} else { serverName, returnMsgs);
// otherwise, give this server a few regions taking into account the
// load of all the other servers.
assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
serverName, returnMsgs);
}
} }
} }
} }
/** /*
* Make region assignments taking into account multiple servers' loads. * Make region assignments taking into account multiple servers' loads.
*
* Note that no synchronization is needed while we iterate over
* regionsInTransition because this method is only called by assignRegions
* whose caller owns the monitor for RegionManager
*/ */
private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad, private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad,
final Set<HRegionInfo> regionsToAssign, final String serverName, final Set<RegionState> regionsToAssign, final String serverName,
final ArrayList<HMsg> returnMsgs) { final ArrayList<HMsg> returnMsgs) {
int nRegionsToAssign = regionsToAssign.size(); int nRegionsToAssign = regionsToAssign.size();
@ -274,14 +258,12 @@ class RegionManager implements HConstants {
nregions = this.maxAssignInOneGo; nregions = this.maxAssignInOneGo;
} }
long now = System.currentTimeMillis(); for (RegionState s: regionsToAssign) {
for (HRegionInfo regionInfo: regionsToAssign) { LOG.info("assigning region " + Bytes.toString(s.getRegionName())+
LOG.info("assigning region " +
Bytes.toString(regionInfo.getRegionName())+
" to server " + serverName); " to server " + serverName);
unassignedRegions.put(regionInfo, Long.valueOf(now)); s.setAssigned(serverName);
this.historian.addRegionAssignment(regionInfo, serverName); this.historian.addRegionAssignment(s.getRegionInfo(), serverName);
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, regionInfo)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, s.getRegionInfo()));
if (--nregions <= 0) { if (--nregions <= 0) {
break; break;
} }
@ -326,38 +308,38 @@ class RegionManager implements HConstants {
return nRegions; return nRegions;
} }
/** /*
* Get the set of regions that should be assignable in this pass. * Get the set of regions that should be assignable in this pass.
*
* Note that no synchronization on regionsInTransition is needed because the
* only caller (assignRegions) whose caller owns the monitor for RegionManager
*/ */
private Set<HRegionInfo> regionsAwaitingAssignment() { private Set<RegionState> regionsAwaitingAssignment() {
long now = System.currentTimeMillis();
// set of regions we want to assign to this server // set of regions we want to assign to this server
Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>(); Set<RegionState> regionsToAssign = new HashSet<RegionState>();
// Look over the set of regions that aren't currently assigned to // Look over the set of regions that aren't currently assigned to
// determine which we should assign to this server. // determine which we should assign to this server.
synchronized (unassignedRegions) { //must synchronize when iterating for (RegionState s: regionsInTransition.values()) {
for (Map.Entry<HRegionInfo, Long> e: unassignedRegions.entrySet()) { HRegionInfo i = s.getRegionInfo();
HRegionInfo i = e.getKey(); if (i == null) {
if (numberOfMetaRegions.get() != onlineMetaRegions.size() && continue;
!i.isMetaRegion()) { }
// Can't assign user regions until all meta regions have been assigned if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
// and are on-line !i.isMetaRegion()) {
continue; // Can't assign user regions until all meta regions have been assigned
} // and are on-line
// If the last attempt to open this region was pretty recent, then we continue;
// don't want to try and assign it. }
long diff = now - e.getValue().longValue(); if (!s.isAssigned() && !s.isClosing() && !s.isPending()) {
if (diff > master.maxRegionOpenTime) { s.setUnassigned();
regionsToAssign.add(e.getKey()); regionsToAssign.add(s);
}
} }
} }
return regionsToAssign; return regionsToAssign;
} }
/** /*
* Figure out the load that is next highest amongst all regionservers. Also, * Figure out the load that is next highest amongst all regionservers. Also,
* return how many servers exist at that load. * return how many servers exist at that load.
*/ */
@ -392,31 +374,37 @@ class RegionManager implements HConstants {
/* /*
* Assign all to the only server. An unlikely case but still possible. * Assign all to the only server. An unlikely case but still possible.
*
* Note that no synchronization is needed on regionsInTransition while
* iterating on it because the only caller is assignRegions whose caller owns
* the monitor for RegionManager
*
* @param regionsToAssign * @param regionsToAssign
* @param serverName * @param serverName
* @param returnMsgs * @param returnMsgs
*/ */
private void assignRegionsToOneServer(final Set<HRegionInfo> regionsToAssign, private void assignRegionsToOneServer(final Set<RegionState> regionsToAssign,
final String serverName, final ArrayList<HMsg> returnMsgs) { final String serverName, final ArrayList<HMsg> returnMsgs) {
long now = System.currentTimeMillis(); for (RegionState s: regionsToAssign) {
for (HRegionInfo regionInfo: regionsToAssign) { LOG.info("assigning region " + Bytes.toString(s.getRegionName()) +
LOG.info("assigning region " +
Bytes.toString(regionInfo.getRegionName()) +
" to the only server " + serverName); " to the only server " + serverName);
unassignedRegions.put(regionInfo, Long.valueOf(now)); s.setAssigned(serverName);
this.historian.addRegionAssignment(regionInfo, serverName); this.historian.addRegionAssignment(s.getRegionInfo(), serverName);
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, regionInfo)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_OPEN, s.getRegionInfo()));
} }
} }
/** /*
* The server checking in right now is overloaded. We will tell it to close * The server checking in right now is overloaded. We will tell it to close
* some or all of its most loaded regions, allowing it to reduce its load. * some or all of its most loaded regions, allowing it to reduce its load.
* The closed regions will then get picked up by other underloaded machines. * The closed regions will then get picked up by other underloaded machines.
*
* Note that no synchronization is needed because the only caller
* (assignRegions) whose caller owns the monitor for RegionManager
*/ */
private synchronized void unassignSomeRegions(final HServerLoad load, private void unassignSomeRegions(final String serverName,
final double avgLoad, final HRegionInfo[] mostLoadedRegions, final HServerLoad load, final double avgLoad,
ArrayList<HMsg> returnMsgs) { final HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {
int numRegionsToClose = load.getNumberOfRegions() - (int)Math.ceil(avgLoad); int numRegionsToClose = load.getNumberOfRegions() - (int)Math.ceil(avgLoad);
LOG.debug("Choosing to reassign " + numRegionsToClose LOG.debug("Choosing to reassign " + numRegionsToClose
@ -425,7 +413,7 @@ class RegionManager implements HConstants {
int regionIdx = 0; int regionIdx = 0;
int regionsClosed = 0; int regionsClosed = 0;
int skippedClosing = 0; int skipped = 0;
while (regionsClosed < numRegionsToClose && while (regionsClosed < numRegionsToClose &&
regionIdx < mostLoadedRegions.length) { regionIdx < mostLoadedRegions.length) {
HRegionInfo currentRegion = mostLoadedRegions[regionIdx]; HRegionInfo currentRegion = mostLoadedRegions[regionIdx];
@ -435,8 +423,10 @@ class RegionManager implements HConstants {
continue; continue;
} }
if (isClosing(currentRegion.getRegionName())) { byte[] regionName = currentRegion.getRegionName();
skippedClosing++; if (isClosing(regionName) || isUnassigned(currentRegion) ||
isAssigned(regionName) || isPending(regionName)) {
skipped++;
continue; continue;
} }
@ -446,11 +436,11 @@ class RegionManager implements HConstants {
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, currentRegion, returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, currentRegion,
OVERLOADED)); OVERLOADED));
// mark the region as closing // mark the region as closing
setClosing(currentRegion.getRegionName()); setClosing(serverName, currentRegion, false);
// increment the count of regions we've marked // increment the count of regions we've marked
regionsClosed++; regionsClosed++;
} }
LOG.info("Skipped " + skippedClosing + " region(s) as already closing"); LOG.info("Skipped " + skipped + " region(s) that are in transition states");
} }
/** /**
@ -503,12 +493,10 @@ class RegionManager implements HConstants {
* @return true if we found meta regions, false if we're closing. * @return true if we found meta regions, false if we're closing.
*/ */
public boolean areAllMetaRegionsOnline() { public boolean areAllMetaRegionsOnline() {
boolean result = false; synchronized (onlineMetaRegions) {
if (rootRegionLocation.get() != null && return (rootRegionLocation.get() != null &&
numberOfMetaRegions.get() == onlineMetaRegions.size()) { numberOfMetaRegions.get() == onlineMetaRegions.size());
result = true;
} }
return result;
} }
/** /**
@ -528,7 +516,7 @@ class RegionManager implements HConstants {
return onlineMetaRegions.get(newRegion.getRegionName()); return onlineMetaRegions.get(newRegion.getRegionName());
} }
return onlineMetaRegions.get(onlineMetaRegions.headMap( return onlineMetaRegions.get(onlineMetaRegions.headMap(
newRegion.getTableDesc().getName()).lastKey()); newRegion.getTableDesc().getName()).lastKey());
} }
} }
} }
@ -568,6 +556,15 @@ class RegionManager implements HConstants {
return metaRegions; return metaRegions;
} }
/**
* Remove a region from the region state map.
*
* @param info
*/
public void removeRegion(HRegionInfo info) {
regionsInTransition.remove(info.getRegionName());
}
/** /**
* Get metaregion that would host passed in row. * Get metaregion that would host passed in row.
* @param row Row need to know all the meta regions for * @param row Row need to know all the meta regions for
@ -611,7 +608,7 @@ class RegionManager implements HConstants {
region.getLog().closeAndDelete(); region.getLog().closeAndDelete();
// 5. Get it assigned to a server // 5. Get it assigned to a server
unassignedRegions.put(info, ZERO_L); setUnassigned(info, true);
} }
/** /**
@ -667,7 +664,13 @@ class RegionManager implements HConstants {
* it happens to be between states. * it happens to be between states.
*/ */
public boolean isUnassigned(HRegionInfo info) { public boolean isUnassigned(HRegionInfo info) {
return unassignedRegions.containsKey(info); synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(info.getRegionName());
if (s != null) {
return s.isUnassigned();
}
}
return false;
} }
/** /**
@ -676,18 +679,57 @@ class RegionManager implements HConstants {
* @return true if pending, false otherwise * @return true if pending, false otherwise
*/ */
public boolean isPending(byte [] regionName) { public boolean isPending(byte [] regionName) {
return pendingRegions.contains(regionName); synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
return s.isPending();
}
}
return false;
} }
/**
* @param regionName
* @return true if region has been assigned
*/
public boolean isAssigned(byte[] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
return s.isAssigned() || s.isPending();
}
}
return false;
}
/**
* @param regionName
* @return true if region is marked to be offlined.
*/
public boolean isOfflined(byte[] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
return s.isOfflined();
}
}
return false;
}
/** /**
* Set a region to unassigned * Set a region to unassigned
* @param info Region to set unassigned * @param info Region to set unassigned
* @param force if true mark region unassigned whatever its current state
*/ */
public void setUnassigned(HRegionInfo info) { public void setUnassigned(HRegionInfo info, boolean force) {
synchronized(this.unassignedRegions) { synchronized(this.regionsInTransition) {
if (!this.unassignedRegions.containsKey(info) && RegionState s = regionsInTransition.get(info.getRegionName());
!this.pendingRegions.contains(info.getRegionName())) { if (s == null) {
this.unassignedRegions.put(info, ZERO_L); s = new RegionState(info);
regionsInTransition.put(info.getRegionName(), s);
}
if (force || (!s.isAssigned() && !s.isPending())) {
s.setUnassigned();
} }
} }
} }
@ -697,68 +739,11 @@ class RegionManager implements HConstants {
* @param regionName * @param regionName
*/ */
public void setPending(byte [] regionName) { public void setPending(byte [] regionName) {
pendingRegions.add(regionName); synchronized (regionsInTransition) {
} RegionState s = regionsInTransition.get(regionName);
if (s != null) {
/** s.setPending();
* Unset region's pending status
* @param regionName
*/
public void noLongerPending(byte [] regionName) {
pendingRegions.remove(regionName);
}
/**
* Extend the update assignment deadline for a region.
* @param info Region whose deadline you want to extend
*/
public void updateAssignmentDeadline(HRegionInfo info) {
synchronized (unassignedRegions) {
// Region server is reporting in that its working on region open
// (We can get more than one of these messages if region is replaying
// a bunch of edits and taking a while to open).
// Extend region open time by max region open time.
this.unassignedRegions.put(info,
Long.valueOf(System.currentTimeMillis() + this.master.maxRegionOpenTime));
}
}
/**
* Unset a region's unassigned status
* @param info Region you want to take off the unassigned list
*/
public void noLongerUnassigned(HRegionInfo info) {
unassignedRegions.remove(info);
}
/**
* Mark a region to be closed. Server manager will inform hosting region server
* to close the region at its next opportunity.
* @param serverName address info of server
* @param info region to close
*/
public void markToClose(final String serverName, final HRegionInfo info) {
Map<byte [], HRegionInfo> toclose =
new TreeMap<byte [], HRegionInfo>(Bytes.BYTES_COMPARATOR);
toclose.put(info.getRegionName(), info);
markToCloseBulk(serverName, toclose);
}
/**
* Mark a bunch of regions as to close at once for a server
* @param serverName address info of server
* @param map map of region names to region infos of regions to close
*/
public void markToCloseBulk(final String serverName,
final Map<byte [], HRegionInfo> map) {
synchronized (regionsToClose) {
Map<byte [], HRegionInfo> regions = regionsToClose.get(serverName);
if (regions != null) {
regions.putAll(map);
} else {
regions = map;
} }
regionsToClose.put(serverName, regions);
} }
} }
@ -767,74 +752,71 @@ class RegionManager implements HConstants {
* given server * given server
* *
* @param serverName * @param serverName
* @return map of region names to region infos to close * @return set of infos to close
*/ */
public Map<byte [], HRegionInfo> removeMarkedToClose(String serverName) { public Set<HRegionInfo> getMarkedToClose(String serverName) {
return regionsToClose.remove(serverName); Set<HRegionInfo> result = new HashSet<HRegionInfo>();
} synchronized (regionsInTransition) {
for (RegionState s: regionsInTransition.values()) {
/** if (s.isClosing() && !s.isClosed() &&
* Check if a region is marked as to close s.getServerName().compareTo(serverName) == 0) {
* @param serverName address info of server result.add(s.getRegionInfo());
* @param regionName name of the region we might want to close }
* @return true if the region is marked to close, false otherwise
*/
public boolean isMarkedToClose(String serverName, byte [] regionName) {
synchronized (regionsToClose) {
Map<byte [], HRegionInfo> serverToClose = regionsToClose.get(serverName);
return (serverToClose != null && serverToClose.containsKey(regionName));
}
}
/**
* Mark a region as no longer waiting to be closed. Either it was closed or
* we don't want to close it anymore for some reason.
* @param serverName address info of server
* @param regionName name of the region
*/
public void noLongerMarkedToClose(String serverName, byte [] regionName) {
synchronized (regionsToClose) {
Map<byte [], HRegionInfo> serverToClose = regionsToClose.get(serverName);
if (serverToClose != null) {
serverToClose.remove(regionName);
} }
} }
return result;
} }
/**
* Called when all regions for a particular server have been closed
*
* @param serverName
*/
public void allRegionsClosed(String serverName) {
regionsToClose.remove(serverName);
}
/** /**
* Check if a region is closing * Check if a region is closing
* @param regionName * @param regionName
* @return true if the region is marked as closing, false otherwise * @return true if the region is marked as closing, false otherwise
*/ */
public boolean isClosing(byte [] regionName) { public boolean isClosing(byte [] regionName) {
return closingRegions.contains(regionName); synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
return s.isClosing();
}
}
return false;
} }
/**
* Set a region as no longer closing (closed?)
* @param regionName
*/
public void noLongerClosing(byte [] regionName) {
closingRegions.remove(regionName);
}
/** /**
* Mark a region as closing * Mark a region as closing
* @param regionName * @param serverName
* @param regionInfo
* @param setOffline
*/ */
public void setClosing(byte [] regionName) { public void setClosing(String serverName, HRegionInfo regionInfo,
closingRegions.add(regionName); boolean setOffline) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionInfo.getRegionName());
if (s != null) {
if (!s.isClosing()) {
throw new IllegalStateException(
"Cannot transition to closing from any other state. Region: " +
Bytes.toString(regionInfo.getRegionName()));
}
return;
}
s = new RegionState(regionInfo);
regionsInTransition.put(regionInfo.getRegionName(), s);
s.setClosing(serverName, setOffline);
}
} }
/**
* @param regionName
*/
public void setClosed(byte[] regionName) {
synchronized (regionsInTransition) {
RegionState s = regionsInTransition.get(regionName);
if (s != null) {
s.setClosed();
}
}
}
/** /**
* Add a meta region to the scan queue * Add a meta region to the scan queue
* @param m MetaRegion that needs to get scanned * @param m MetaRegion that needs to get scanned
@ -843,31 +825,6 @@ class RegionManager implements HConstants {
metaScannerThread.addMetaRegionToScan(m); metaScannerThread.addMetaRegionToScan(m);
} }
/**
* Note that a region should be offlined as soon as its closed.
* @param regionName
*/
public void markRegionForOffline(byte [] regionName) {
regionsToOffline.add(regionName);
}
/**
* Check if a region is marked for offline
* @param regionName
* @return true if marked for offline, false otherwise
*/
public boolean isMarkedForOffline(byte [] regionName) {
return regionsToOffline.contains(regionName);
}
/**
* Region was offlined as planned, remove it from the list to offline
* @param regionName
*/
public void regionOfflined(byte [] regionName) {
regionsToOffline.remove(regionName);
}
/** /**
* Check if the initial root scan has been completed. * Check if the initial root scan has been completed.
* @return true if scan completed, false otherwise * @return true if scan completed, false otherwise
@ -890,8 +847,7 @@ class RegionManager implements HConstants {
*/ */
public boolean inSafeMode() { public boolean inSafeMode() {
if (safeMode) { if (safeMode) {
if(isInitialMetaScanComplete() && unassignedRegions.size() == 0 && if(isInitialMetaScanComplete() && regionsInTransition.size() == 0) {
pendingRegions.size() == 0) {
safeMode = false; safeMode = false;
LOG.info("exiting safe mode"); LOG.info("exiting safe mode");
} else { } else {
@ -1017,30 +973,174 @@ class RegionManager implements HConstants {
*/ */
public void applyActions(HServerInfo serverInfo, ArrayList<HMsg> returnMsgs) { public void applyActions(HServerInfo serverInfo, ArrayList<HMsg> returnMsgs) {
HServerAddress addr = serverInfo.getServerAddress(); HServerAddress addr = serverInfo.getServerAddress();
Iterator<Pair<HRegionInfo,HServerAddress>> i = Iterator<Pair<HRegionInfo, HServerAddress>> i =
regionsToCompact.values().iterator(); regionsToCompact.values().iterator();
while (i.hasNext()) { synchronized (regionsToCompact) {
Pair<HRegionInfo,HServerAddress> pair = i.next(); while (i.hasNext()) {
if (addr.equals(pair.getSecond())) { Pair<HRegionInfo,HServerAddress> pair = i.next();
if (LOG.isDebugEnabled()) { if (addr.equals(pair.getSecond())) {
LOG.debug("sending MSG_REGION_COMPACT " + pair.getFirst() + " to " + if (LOG.isDebugEnabled()) {
addr); LOG.debug("sending MSG_REGION_COMPACT " + pair.getFirst() + " to " +
addr);
}
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_COMPACT, pair.getFirst()));
i.remove();
} }
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_COMPACT, pair.getFirst()));
i.remove();
} }
} }
i = regionsToSplit.values().iterator(); i = regionsToSplit.values().iterator();
while (i.hasNext()) { synchronized (regionsToSplit) {
Pair<HRegionInfo,HServerAddress> pair = i.next(); while (i.hasNext()) {
if (addr.equals(pair.getSecond())) { Pair<HRegionInfo,HServerAddress> pair = i.next();
if (LOG.isDebugEnabled()) { if (addr.equals(pair.getSecond())) {
LOG.debug("sending MSG_REGION_SPLIT " + pair.getFirst() + " to " + if (LOG.isDebugEnabled()) {
addr); LOG.debug("sending MSG_REGION_SPLIT " + pair.getFirst() + " to " +
addr);
}
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_SPLIT, pair.getFirst()));
i.remove();
} }
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_SPLIT, pair.getFirst()));
i.remove();
} }
} }
} }
private static class RegionState implements Comparable<RegionState> {
private final byte[] regionName;
private HRegionInfo regionInfo = null;
private boolean unassigned = false;
private boolean assigned = false;
private boolean pending = false;
private boolean closing = false;
private boolean closed = false;
private boolean offlined = false;
private String serverName = null;
RegionState(byte[] regionName) {
this.regionName = regionName;
}
RegionState(HRegionInfo info) {
this.regionName = info.getRegionName();
this.regionInfo = info;
}
byte[] getRegionName() {
return regionName;
}
synchronized HRegionInfo getRegionInfo() {
return this.regionInfo;
}
synchronized String getServerName() {
return this.serverName;
}
synchronized boolean isUnassigned() {
return unassigned;
}
/*
* Note: callers of this method (reassignRootRegion,
* regionsAwaitingAssignment, setUnassigned) ensure that this method is not
* called unless it is safe to do so.
*/
synchronized void setUnassigned() {
this.unassigned = true;
this.assigned = false;
this.pending = false;
this.closing = false;
this.serverName = null;
}
synchronized boolean isAssigned() {
return assigned;
}
synchronized void setAssigned(String serverName) {
if (!this.unassigned) {
throw new IllegalStateException(
"Cannot assign a region that is not currently unassigned. Region: " +
Bytes.toString(regionName));
}
this.unassigned = false;
this.assigned = true;
this.pending = false;
this.closing = false;
this.serverName = serverName;
}
synchronized boolean isPending() {
return pending;
}
synchronized void setPending() {
if (!assigned) {
throw new IllegalStateException(
"Cannot set a region as pending if it has not been assigned. Region: " +
Bytes.toString(regionName));
}
this.unassigned = false;
this.assigned = false;
this.pending = true;
this.closing = false;
}
synchronized boolean isClosing() {
return closing;
}
synchronized void setClosing(String serverName, boolean setOffline) {
this.unassigned = false;
this.assigned = false;
this.pending = false;
this.closing = true;
this.offlined = setOffline;
this.serverName = serverName;
}
synchronized boolean isClosed() {
return this.closed;
}
synchronized void setClosed() {
if (!closing) {
throw new IllegalStateException(
"Cannot set a region to be closed if it was not already marked as" +
" closing. Region: " + Bytes.toString(regionName));
}
this.closed = true;
}
synchronized boolean isOfflined() {
return this.offlined;
}
@Override
public synchronized String toString() {
return "region name: " + Bytes.toString(this.regionName) +
", isUnassigned: " + this.unassigned + ", isAssigned: " +
this.assigned + ", isPending: " + this.pending + ", isClosing: " +
this.closing + ", isClosed: " + this.closed + ", isOfflined: " +
this.offlined;
}
@Override
public boolean equals(Object o) {
return this.compareTo((RegionState) o) == 0;
}
@Override
public int hashCode() {
return Bytes.toString(regionName).hashCode();
}
@Override
public int compareTo(RegionState o) {
if (o == null) {
return 1;
}
return Bytes.compareTo(this.regionName, o.getRegionName());
}
}
} }

View File

@ -260,14 +260,17 @@ class ServerManager implements HConstants {
for (int i = 1; i < msgs.length; i++) { for (int i = 1; i < msgs.length; i++) {
LOG.info("Processing " + msgs[i] + " from " + serverName); LOG.info("Processing " + msgs[i] + " from " + serverName);
HRegionInfo info = msgs[i].getRegionInfo(); HRegionInfo info = msgs[i].getRegionInfo();
if (info.isRootRegion()) { synchronized (master.regionManager) {
master.regionManager.reassignRootRegion(); if (info.isRootRegion()) {
} else if (info.isMetaTable()) { master.regionManager.reassignRootRegion();
master.regionManager.offlineMetaRegion(info.getStartKey()); } else if (info.isMetaTable()) {
} master.regionManager.offlineMetaRegion(info.getStartKey());
if (!master.regionManager.isMarkedToClose( }
serverName, info.getRegionName())) { if (!master.regionManager.isOfflined(info.getRegionName())) {
master.regionManager.setUnassigned(info); master.regionManager.setUnassigned(info, true);
} else {
master.regionManager.removeRegion(info);
}
} }
} }
} }
@ -334,8 +337,6 @@ class ServerManager implements HConstants {
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[])
throws IOException { throws IOException {
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>(); ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
Map<byte [], HRegionInfo> regionsToKill =
master.regionManager.removeMarkedToClose(serverName);
if (serverInfo.getServerAddress() == null) { if (serverInfo.getServerAddress() == null) {
throw new NullPointerException("Server address cannot be null; " + throw new NullPointerException("Server address cannot be null; " +
"hbase-958 debugging"); "hbase-958 debugging");
@ -346,7 +347,6 @@ class ServerManager implements HConstants {
LOG.info("Received " + incomingMsgs[i] + " from " + serverName); LOG.info("Received " + incomingMsgs[i] + " from " + serverName);
switch (incomingMsgs[i].getType()) { switch (incomingMsgs[i].getType()) {
case MSG_REPORT_PROCESS_OPEN: case MSG_REPORT_PROCESS_OPEN:
master.regionManager.updateAssignmentDeadline(region);
break; break;
case MSG_REPORT_OPEN: case MSG_REPORT_OPEN:
@ -354,7 +354,7 @@ class ServerManager implements HConstants {
break; break;
case MSG_REPORT_CLOSE: case MSG_REPORT_CLOSE:
processRegionClose(serverInfo, region); processRegionClose(region);
break; break;
case MSG_REPORT_SPLIT: case MSG_REPORT_SPLIT:
@ -369,22 +369,21 @@ class ServerManager implements HConstants {
} }
} }
// Tell the region server to close regions that we have marked for closing. synchronized (master.regionManager) {
if (regionsToKill != null) { // Tell the region server to close regions that we have marked for closing.
for (HRegionInfo i: regionsToKill.values()) { for (HRegionInfo i: master.regionManager.getMarkedToClose(serverName)) {
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i)); returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i));
// Transition the region from toClose to closing state // Transition the region from toClose to closing state
master.regionManager.setClosing(i.getRegionName()); master.regionManager.setClosed(i.getRegionName());
} }
// Figure out what the RegionServer ought to do, and write back.
master.regionManager.assignRegions(serverInfo, serverName,
mostLoadedRegions, returnMsgs);
// Send any pending table actions.
master.regionManager.applyActions(serverInfo, returnMsgs);
} }
// Figure out what the RegionServer ought to do, and write back.
master.regionManager.assignRegions(serverInfo, serverName,
mostLoadedRegions, returnMsgs);
// Send any pending table actions.
master.regionManager.applyActions(serverInfo, returnMsgs);
return returnMsgs.toArray(new HMsg[returnMsgs.size()]); return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
} }
@ -401,21 +400,23 @@ class ServerManager implements HConstants {
private void processSplitRegion(String serverName, HServerInfo serverInfo, private void processSplitRegion(String serverName, HServerInfo serverInfo,
HRegionInfo region, HMsg splitA, HMsg splitB, ArrayList<HMsg> returnMsgs) { HRegionInfo region, HMsg splitA, HMsg splitB, ArrayList<HMsg> returnMsgs) {
// Cancel any actions pending for the affected region. synchronized (master.regionManager) {
// This prevents the master from sending a SPLIT message if the table // Cancel any actions pending for the affected region.
// has already split by the region server. // This prevents the master from sending a SPLIT message if the table
master.regionManager.endActions(region.getRegionName()); // has already split by the region server.
master.regionManager.endActions(region.getRegionName());
HRegionInfo newRegionA = splitA.getRegionInfo(); HRegionInfo newRegionA = splitA.getRegionInfo();
master.regionManager.setUnassigned(newRegionA); master.regionManager.setUnassigned(newRegionA, false);
HRegionInfo newRegionB = splitB.getRegionInfo(); HRegionInfo newRegionB = splitB.getRegionInfo();
master.regionManager.setUnassigned(newRegionB); master.regionManager.setUnassigned(newRegionB, false);
if (region.isMetaTable()) { if (region.isMetaTable()) {
// A meta region has split. // A meta region has split.
master.regionManager.offlineMetaRegion(region.getStartKey()); master.regionManager.offlineMetaRegion(region.getStartKey());
master.regionManager.incrementNumMetaRegions(); master.regionManager.incrementNumMetaRegions();
}
} }
} }
@ -424,119 +425,105 @@ class ServerManager implements HConstants {
HRegionInfo region, ArrayList<HMsg> returnMsgs) HRegionInfo region, ArrayList<HMsg> returnMsgs)
throws IOException { throws IOException {
boolean duplicateAssignment = false; boolean duplicateAssignment = false;
if (!master.regionManager.isUnassigned(region)) { synchronized (master.regionManager) {
if (region.isRootRegion()) { if (!master.regionManager.isUnassigned(region) &&
// Root region !master.regionManager.isAssigned(region.getRegionName())) {
HServerAddress rootServer = master.getRootRegionLocation(); if (region.isRootRegion()) {
if (rootServer != null) { // Root region
if (rootServer.toString().compareTo(serverName) == 0) { HServerAddress rootServer = master.getRootRegionLocation();
// A duplicate open report from the correct server if (rootServer != null) {
if (rootServer.toString().compareTo(serverName) == 0) {
// A duplicate open report from the correct server
return;
}
// We received an open report on the root region, but it is
// assigned to a different server
duplicateAssignment = true;
}
} else {
// Not root region. If it is not a pending region, then we are
// going to treat it as a duplicate assignment, although we can't
// tell for certain that's the case.
if (master.regionManager.isPending(region.getRegionName())) {
// A duplicate report from the correct server
return; return;
} }
// We received an open report on the root region, but it is
// assigned to a different server
duplicateAssignment = true; duplicateAssignment = true;
} }
} else {
// Not root region. If it is not a pending region, then we are
// going to treat it as a duplicate assignment, although we can't
// tell for certain that's the case.
if (master.regionManager.isPending(region.getRegionName())) {
// A duplicate report from the correct server
return;
}
duplicateAssignment = true;
} }
}
if (duplicateAssignment) { if (duplicateAssignment) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("region server " + serverInfo.getServerAddress().toString() LOG.debug("region server " + serverInfo.getServerAddress().toString()
+ " should not have opened region " + region.getRegionName()); + " should not have opened region " + region.getRegionName());
}
// This Region should not have been opened.
// Ask the server to shut it down, but don't report it as closed.
// Otherwise the HMaster will think the Region was closed on purpose,
// and then try to reopen it elsewhere; that's not what we want.
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE_WITHOUT_REPORT,
region, "Duplicate assignment".getBytes()));
} else {
// it was assigned, and it's not a duplicate assignment, so take it out
// of the unassigned list.
master.regionManager.noLongerUnassigned(region);
if (region.isRootRegion()) {
// Store the Root Region location (in memory)
HServerAddress rootServer = serverInfo.getServerAddress();
master.connection.setRootRegionLocation(
new HRegionLocation(region, rootServer));
master.regionManager.setRootRegionLocation(rootServer);
} else {
// Note that the table has been assigned and is waiting for the
// meta table to be updated.
master.regionManager.setPending(region.getRegionName());
// Queue up an update to note the region location.
try {
master.toDoQueue.put(
new ProcessRegionOpen(master, serverInfo, region));
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into toDoQueue was interrupted.", e);
} }
}
// This Region should not have been opened.
// Ask the server to shut it down, but don't report it as closed.
// Otherwise the HMaster will think the Region was closed on purpose,
// and then try to reopen it elsewhere; that's not what we want.
returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE_WITHOUT_REPORT,
region, "Duplicate assignment".getBytes()));
} else {
if (region.isRootRegion()) {
// it was assigned, and it's not a duplicate assignment, so take it out
// of the unassigned list.
master.regionManager.removeRegion(region);
// Store the Root Region location (in memory)
HServerAddress rootServer = serverInfo.getServerAddress();
master.connection.setRootRegionLocation(
new HRegionLocation(region, rootServer));
master.regionManager.setRootRegionLocation(rootServer);
} else {
// Note that the table has been assigned and is waiting for the
// meta table to be updated.
master.regionManager.setPending(region.getRegionName());
// Queue up an update to note the region location.
try {
master.toDoQueue.put(
new ProcessRegionOpen(master, serverInfo, region));
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into toDoQueue was interrupted.", e);
}
}
}
} }
} }
private void processRegionClose( private void processRegionClose(HRegionInfo region) {
@SuppressWarnings("unused") HServerInfo serverInfo, HRegionInfo region) { synchronized (master.regionManager) {
if (region.isRootRegion()) { if (region.isRootRegion()) {
// Root region // Root region
if (region.isOffline()) { master.connection.unsetRootRegionLocation();
// Can't proceed without root region. Shutdown. master.regionManager.unsetRootRegion();
LOG.fatal("root region is marked offline"); if (region.isOffline()) {
master.shutdown(); // Can't proceed without root region. Shutdown.
} LOG.fatal("root region is marked offline");
master.connection.setRootRegionLocation(null); master.shutdown();
master.regionManager.reassignRootRegion(); return;
}
} else { } else if (region.isMetaTable()) {
boolean reassignRegion = !region.isOffline();
boolean offlineRegion = false;
// either this region is being closed because it was marked to close, or
// the region server is going down peacefully. in either case, we should
// at least try to remove it from the closing list.
master.regionManager.noLongerClosing(region.getRegionName());
// if the region is marked to be offlined, we don't want to reassign it.
if (master.regionManager.isMarkedForOffline(region.getRegionName())) {
reassignRegion = false;
offlineRegion = true;
}
if (region.isMetaTable()) {
// Region is part of the meta table. Remove it from onlineMetaRegions // Region is part of the meta table. Remove it from onlineMetaRegions
master.regionManager.offlineMetaRegion(region.getStartKey()); master.regionManager.offlineMetaRegion(region.getStartKey());
} }
// if the region is already on the unassigned list, let's remove it. this boolean offlineRegion =
// is safe because if it's going to be reassigned, it'll get added again master.regionManager.isOfflined(region.getRegionName());
// shortly. if it's not going to get reassigned, then we need to make boolean reassignRegion = !region.isOffline() && !offlineRegion;
// sure it's not on the unassigned list, because that would contend with
// the ProcessRegionClose going on asynchronously.
master.regionManager.noLongerUnassigned(region);
// NOTE: we cannot put the region into unassignedRegions as that // NOTE: If the region was just being closed and not offlined, we cannot
// changes the ordering of the messages we've received. In // mark the region unassignedRegions as that changes the ordering of
// this case, a close could be processed before an open // the messages we've received. In this case, a close could be
// resulting in the master not agreeing on the region's // processed before an open resulting in the master not agreeing on
// state. // the region's state.
try { try {
master.toDoQueue.put(new ProcessRegionClose(master, region, master.toDoQueue.put(new ProcessRegionClose(master, region, offlineRegion,
offlineRegion, reassignRegion)); reassignRegion));
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException( throw new RuntimeException("Putting into toDoQueue was interrupted.", e);
"Putting into toDoQueue was interrupted.", e);
} }
} }
} }
@ -545,9 +532,9 @@ class ServerManager implements HConstants {
private boolean cancelLease(final String serverName) { private boolean cancelLease(final String serverName) {
boolean leaseCancelled = false; boolean leaseCancelled = false;
HServerInfo info = serversToServerInfo.remove(serverName); HServerInfo info = serversToServerInfo.remove(serverName);
// Only cancel lease and update load information once.
// This method can be called a couple of times during shutdown.
if (info != null) { if (info != null) {
// Only cancel lease and update load information once.
// This method can be called a couple of times during shutdown.
if (master.getRootRegionLocation() != null && if (master.getRootRegionLocation() != null &&
info.getServerAddress().equals(master.getRootRegionLocation())) { info.getServerAddress().equals(master.getRootRegionLocation())) {
master.regionManager.reassignRootRegion(); master.regionManager.reassignRootRegion();