HADOOP-1747 On a cluster, on restart, regions multiply assigned

M src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    Removed some empty lines so I can squeeze more code into a screenful.
    (assignedRegions): Factored out some code into own methods so
    this method is made a bit shorter.  Added early returns near
    top -- if nothing to assign, etc. -- so less nesting.
    Added fix: Instead of iterating over unassignedRegions after
    all the loadings have been calculated, instead iterate over
    the locally calculated  map, regionsToAssign (Otherwise, we
    were running over the same territory each time through the
    loop and were thus giving out same region multiple times).
    (regionsPerServer, assignRegionsToOneServer,
      getRegionsToAssign): Added.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@568404 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2007-08-22 04:05:08 +00:00
parent e41859593b
commit ec2d29c902
2 changed files with 165 additions and 167 deletions

View File

@ -14,6 +14,7 @@ Trunk (unreleased changes)
shutdown message shutdown message
HADOOP-1729 Recent renaming or META tables breaks hbase shell HADOOP-1729 Recent renaming or META tables breaks hbase shell
HADOOP-1730 unexpected null value causes META scanner to exit (silently) HADOOP-1730 unexpected null value causes META scanner to exit (silently)
HADOOP-1747 On a cluster, on restart, regions multiply assigned
IMPROVEMENTS IMPROVEMENTS
HADOOP-1737 Make HColumnDescriptor data publically members settable HADOOP-1737 Make HColumnDescriptor data publically members settable

View File

@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.io.BatchUpdate;
public class HMaster implements HConstants, HMasterInterface, public class HMaster implements HConstants, HMasterInterface,
HMasterRegionInterface, Runnable { HMasterRegionInterface, Runnable {
/** {@inheritDoc} */
public long getProtocolVersion(String protocol, public long getProtocolVersion(String protocol,
@SuppressWarnings("unused") long clientVersion) throws IOException { @SuppressWarnings("unused") long clientVersion) throws IOException {
@ -189,7 +188,6 @@ HMasterRegionInterface, Runnable {
// Array to hold list of split parents found. Scan adds to list. After // Array to hold list of split parents found. Scan adds to list. After
// scan we go check if parents can be removed. // scan we go check if parents can be removed.
Map<HRegionInfo, SortedMap<Text, byte[]>> splitParents = Map<HRegionInfo, SortedMap<Text, byte[]>> splitParents =
new HashMap<HRegionInfo, SortedMap<Text, byte[]>>(); new HashMap<HRegionInfo, SortedMap<Text, byte[]>>();
try { try {
@ -211,17 +209,14 @@ HMasterRegionInterface, Runnable {
results.put(key.getColumn(), results.put(key.getColumn(),
((ImmutableBytesWritable) e.getValue()).get()); ((ImmutableBytesWritable) e.getValue()).get());
} }
HRegionInfo info = (HRegionInfo) Writables.getWritable( HRegionInfo info = (HRegionInfo) Writables.getWritable(
results.get(COL_REGIONINFO), new HRegionInfo()); results.get(COL_REGIONINFO), new HRegionInfo());
String serverName = Writables.bytesToString(results.get(COL_SERVER)); String serverName = Writables.bytesToString(results.get(COL_SERVER));
long startCode = Writables.bytesToLong(results.get(COL_STARTCODE)); long startCode = Writables.bytesToLong(results.get(COL_STARTCODE));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug(Thread.currentThread().getName() + " scanner: " + LOG.debug(Thread.currentThread().getName() + " scanner: " +
Long.valueOf(scannerId) + " regioninfo: {" + info.toString() + Long.valueOf(scannerId) + " regioninfo: {" + info.toString() +
"}, server: " + serverName + ", startCode: " + startCode); "}, server: " + serverName + ", startCode: " + startCode);
} }
// Note Region has been assigned. // Note Region has been assigned.
@ -467,7 +462,6 @@ HMasterRegionInterface, Runnable {
try { try {
// Don't interrupt us while we're working // Don't interrupt us while we're working
synchronized(rootScannerLock) { synchronized(rootScannerLock) {
scanRegion(new MetaRegion(rootRegionLocation.get(), scanRegion(new MetaRegion(rootRegionLocation.get(),
HGlobals.rootRegionInfo.regionName, null)); HGlobals.rootRegionInfo.regionName, null));
@ -478,7 +472,6 @@ HMasterRegionInterface, Runnable {
try { try {
e = RemoteExceptionHandler.decodeRemoteException( e = RemoteExceptionHandler.decodeRemoteException(
(RemoteException) e); (RemoteException) e);
} catch (IOException ex) { } catch (IOException ex) {
e = ex; e = ex;
} }
@ -717,6 +710,9 @@ HMasterRegionInterface, Runnable {
* *
* We fill 'unassignedRecords' by scanning ROOT and META tables, learning the * We fill 'unassignedRecords' by scanning ROOT and META tables, learning the
* set of all known valid regions. * set of all known valid regions.
*
* <p>Items are removed from this list when a region server reports in that
* the region has been deployed.
*/ */
SortedMap<Text, HRegionInfo> unassignedRegions; SortedMap<Text, HRegionInfo> unassignedRegions;
@ -790,7 +786,6 @@ HMasterRegionInterface, Runnable {
this.rand = new Random(); this.rand = new Random();
// Make sure the root directory exists! // Make sure the root directory exists!
if(! fs.exists(dir)) { if(! fs.exists(dir)) {
fs.mkdirs(dir); fs.mkdirs(dir);
} }
@ -1157,8 +1152,8 @@ HMasterRegionInterface, Runnable {
onlineMetaRegions.remove(info.getStartKey()); onlineMetaRegions.remove(info.getStartKey());
} }
unassignedRegions.put(info.regionName, info); this.unassignedRegions.put(info.regionName, info);
assignAttempts.put(info.regionName, Long.valueOf(0L)); this.assignAttempts.put(info.regionName, Long.valueOf(0L));
} }
} }
} }
@ -1323,18 +1318,17 @@ HMasterRegionInterface, Runnable {
region.regionName); region.regionName);
// Remove from unassigned list so we don't assign it to someone else // Remove from unassigned list so we don't assign it to someone else
this.unassignedRegions.remove(region.regionName);
unassignedRegions.remove(region.regionName); this.assignAttempts.remove(region.regionName);
assignAttempts.remove(region.regionName);
if (region.regionName.compareTo( if (region.regionName.compareTo(
HGlobals.rootRegionInfo.regionName) == 0) { HGlobals.rootRegionInfo.regionName) == 0) {
// Store the Root Region location (in memory) // Store the Root Region location (in memory)
synchronized (rootRegionLocation) { synchronized (rootRegionLocation) {
rootRegionLocation.set(new HServerAddress(info.getServerAddress())); this.rootRegionLocation.
rootRegionLocation.notifyAll(); set(new HServerAddress(info.getServerAddress()));
this.rootRegionLocation.notifyAll();
} }
break; break;
} }
@ -1436,12 +1430,11 @@ HMasterRegionInterface, Runnable {
} }
// Figure out what the RegionServer ought to do, and write back. // Figure out what the RegionServer ought to do, and write back.
assignRegions(info, serverName, returnMsgs); assignRegions(info, serverName, returnMsgs);
return returnMsgs.toArray(new HMsg[returnMsgs.size()]); return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
} }
/** /*
* Assigns regions to region servers attempting to balance the load across * Assigns regions to region servers attempting to balance the load across
* all region servers * all region servers
* *
@ -1451,148 +1444,154 @@ HMasterRegionInterface, Runnable {
*/ */
private void assignRegions(HServerInfo info, String serverName, private void assignRegions(HServerInfo info, String serverName,
ArrayList<HMsg> returnMsgs) { ArrayList<HMsg> returnMsgs) {
TreeSet<Text> regionsToAssign = getRegionsToAssign();
int nRegionsToAssign = regionsToAssign.size();
if (nRegionsToAssign <= 0) {
// No regions to assign. Return.
return;
}
if (this.serversToServerInfo.size() == 1) {
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
// Finished. Return.
return;
}
// Multiple servers in play.
// We need to allocate regions only to most lightly loaded servers.
HServerLoad thisServersLoad = info.getLoad();
synchronized (this.serversToServerInfo) {
int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
nRegionsToAssign -= nregions;
if (nRegionsToAssign > 0) {
// We still have more regions to assign. See how many we can assign
// before this server becomes more heavily loaded than the next
// most heavily loaded server.
SortedMap<HServerLoad, Set<String>> heavyServers =
this.loadToServers.tailMap(thisServersLoad);
int nservers = 0;
HServerLoad heavierLoad = null;
for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
Set<String> servers = e.getValue();
nservers += servers.size();
if (e.getKey().compareTo(thisServersLoad) == 0) {
// This is the load factor of the server we are considering
nservers -= 1;
continue;
}
// If we get here, we are at the first load entry that is a
// heavier load than the server we are considering
heavierLoad = e.getKey();
break;
}
nregions = 0;
if (heavierLoad != null) {
// There is a more heavily loaded server
for (HServerLoad load =
new HServerLoad(thisServersLoad.getNumberOfRequests(),
thisServersLoad.getNumberOfRegions());
load.compareTo(heavierLoad) <= 0 &&
nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1),
nregions++) {
// continue;
}
}
if (nregions < nRegionsToAssign) {
// There are some more heavily loaded servers
// but we can't assign all the regions to this server.
if (nservers > 0) {
// There are other servers that can share the load.
// Split regions that need assignment across the servers.
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * nservers));
} else {
// No other servers with same load.
// Split regions over all available servers
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * serversToServerInfo.size()));
}
} else {
// Assign all regions to this server
nregions = nRegionsToAssign;
}
long now = System.currentTimeMillis();
for (Text regionName: regionsToAssign) {
HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
LOG.info("assigning region " + regionName + " to server " +
serverName);
this.assignAttempts.put(regionName, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
if (--nregions <= 0) {
break;
}
}
}
}
}
/*
* @param nRegionsToAssign
* @param thisServersLoad
* @return How many regions we can assign to more lightly loaded servers
*/
private int regionsPerServer(final int nRegionsToAssign,
final HServerLoad thisServersLoad) {
SortedMap<HServerLoad, Set<String>> lightServers =
this.loadToServers.headMap(thisServersLoad);
int nRegions = 0;
for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
HServerLoad lightLoad = new HServerLoad(e.getKey()
.getNumberOfRequests(), e.getKey().getNumberOfRegions());
do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
nRegions += 1;
} while (lightLoad.compareTo(thisServersLoad) <= 0
&& nRegions < nRegionsToAssign);
nRegions *= e.getValue().size();
if (nRegions >= nRegionsToAssign) {
break;
}
}
return nRegions;
}
/*
* Assign all to the only server. An unlikely case but still possible. @param
* regionsToAssign @param serverName @param returnMsgs
*/
private void assignRegionsToOneServer(final TreeSet<Text> regionsToAssign,
final String serverName, final ArrayList<HMsg> returnMsgs) {
long now = System.currentTimeMillis();
for (Text regionName: regionsToAssign) {
HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
LOG.info("assigning region " + regionName + " to the only server " +
serverName);
this.assignAttempts.put(regionName, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
}
}
/*
* @return List of regions to assign.
*/
private TreeSet<Text> getRegionsToAssign() {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
TreeSet<Text> regionsToAssign = new TreeSet<Text>(); TreeSet<Text> regionsToAssign = new TreeSet<Text>();
for (Map.Entry<Text, Long> e: assignAttempts.entrySet()) { for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
if (now - e.getValue() > maxRegionOpenTime) { long diff = now - e.getValue().longValue();
if (diff > this.maxRegionOpenTime) {
regionsToAssign.add(e.getKey()); regionsToAssign.add(e.getKey());
} }
} }
int nRegionsToAssign = regionsToAssign.size(); return regionsToAssign;
if (nRegionsToAssign > 0) {
if (serversToServerInfo.size() == 1) {
// Only one server. An unlikely case but still possible.
// Assign all unassigned regions to it.
for (Text regionName: regionsToAssign) {
HRegionInfo regionInfo = unassignedRegions.get(regionName);
LOG.info("assigning region " + regionName + " to server " +
serverName);
assignAttempts.put(regionName, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
}
} else {
// Multiple servers in play.
// We need to allocate regions only to most lightly loaded servers.
HServerLoad thisServersLoad = info.getLoad();
synchronized (serversToServerInfo) {
SortedMap<HServerLoad, Set<String>> lightServers =
loadToServers.headMap(thisServersLoad);
// How many regions we can assign to more lightly loaded servers?
int nregions = 0;
for (Map.Entry<HServerLoad, Set<String>> e: lightServers.entrySet()) {
HServerLoad lightLoad =
new HServerLoad(e.getKey().getNumberOfRequests(),
e.getKey().getNumberOfRegions());
do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
nregions += 1;
} while (lightLoad.compareTo(thisServersLoad) <= 0 &&
nregions < nRegionsToAssign);
nregions *= e.getValue().size();
if (nregions >= nRegionsToAssign) {
break;
}
}
nRegionsToAssign -= nregions;
if (nRegionsToAssign > 0) {
// We still have more regions to assign. See how many we can assign
// before this server becomes more heavily loaded than the next
// most heavily loaded server.
SortedMap<HServerLoad, Set<String>> heavyServers =
loadToServers.tailMap(thisServersLoad);
int nservers = 0;
HServerLoad heavierLoad = null;
for (Map.Entry<HServerLoad, Set<String>> e:
heavyServers.entrySet()) {
Set<String> servers = e.getValue();
nservers += servers.size();
if (e.getKey().compareTo(thisServersLoad) == 0) {
// This is the load factor of the server we are considering
nservers -= 1;
continue;
}
// If we get here, we are at the first load entry that is a
// heavier load than the server we are considering
heavierLoad = e.getKey();
break;
}
nregions = 0;
if (heavierLoad != null) {
// There is a more heavily loaded server
for (HServerLoad load =
new HServerLoad(thisServersLoad.getNumberOfRequests(),
thisServersLoad.getNumberOfRegions());
load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1),
nregions++) {
}
}
if (nregions < nRegionsToAssign) {
// There are some more heavily loaded servers
// but we can't assign all the regions to this server.
if (nservers > 0) {
// There are other servers that can share the load.
// Split regions that need assignment across the servers.
nregions =
(int) Math.ceil((1.0 * nRegionsToAssign) / (1.0 * nservers));
} else {
// No other servers with same load.
// Split regions over all available servers
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * serversToServerInfo.size()));
}
} else {
// Assign all regions to this server
nregions = nRegionsToAssign;
}
for (Map.Entry<Text, HRegionInfo> e: unassignedRegions.entrySet()) {
Text regionName = e.getKey();
HRegionInfo regionInfo = e.getValue();
LOG.info("assigning region " + regionName + " to server " +
serverName);
assignAttempts.put(regionName, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
if (--nregions <= 0) {
break;
}
}
}
}
}
}
} }
/* /*
@ -2114,7 +2113,8 @@ HMasterRegionInterface, Runnable {
private HServerAddress serverAddress; private HServerAddress serverAddress;
private byte [] startCode; private byte [] startCode;
PendingOpenReport(HServerInfo info, HRegionInfo region) throws IOException { PendingOpenReport(HServerInfo info, HRegionInfo region)
throws IOException {
if (region.tableDesc.getName().equals(META_TABLE_NAME)) { if (region.tableDesc.getName().equals(META_TABLE_NAME)) {
// The region which just came on-line is a META region. // The region which just came on-line is a META region.
// We need to look in the ROOT region for its information. // We need to look in the ROOT region for its information.
@ -2147,7 +2147,6 @@ HMasterRegionInterface, Runnable {
this.serverAddress.toString()); this.serverAddress.toString());
// Register the newly-available Region's location. // Register the newly-available Region's location.
Text metaRegionName; Text metaRegionName;
HRegionInterface server; HRegionInterface server;
if (rootRegion) { if (rootRegion) {
@ -2163,7 +2162,6 @@ HMasterRegionInterface, Runnable {
} }
metaRegionName = HGlobals.rootRegionInfo.regionName; metaRegionName = HGlobals.rootRegionInfo.regionName;
server = connection.getHRegionConnection(rootRegionLocation.get()); server = connection.getHRegionConnection(rootRegionLocation.get());
} else { } else {
if (!rootScanned || if (!rootScanned ||
numberOfMetaRegions.get() != onlineMetaRegions.size()) { numberOfMetaRegions.get() != onlineMetaRegions.size()) {
@ -2185,7 +2183,6 @@ HMasterRegionInterface, Runnable {
MetaRegion r = null; MetaRegion r = null;
if (onlineMetaRegions.containsKey(region.getRegionName())) { if (onlineMetaRegions.containsKey(region.getRegionName())) {
r = onlineMetaRegions.get(region.getRegionName()); r = onlineMetaRegions.get(region.getRegionName());
} else { } else {
r = onlineMetaRegions.get(onlineMetaRegions.headMap( r = onlineMetaRegions.get(onlineMetaRegions.headMap(
region.getRegionName()).lastKey()); region.getRegionName()).lastKey());
@ -2194,15 +2191,15 @@ HMasterRegionInterface, Runnable {
server = connection.getHRegionConnection(r.server); server = connection.getHRegionConnection(r.server);
} }
LOG.info("updating row " + region.getRegionName() + " in table " + LOG.info("updating row " + region.getRegionName() + " in table " +
metaRegionName); metaRegionName + " with startcode " +
Writables.bytesToLong(this.startCode) + " and server "+
serverAddress.toString());
try { try {
BatchUpdate b = new BatchUpdate(); BatchUpdate b = new BatchUpdate();
long lockid = b.startUpdate(region.getRegionName()); long lockid = b.startUpdate(region.getRegionName());
b.put(lockid, COL_SERVER, b.put(lockid, COL_SERVER,
Writables.stringToBytes(serverAddress.toString())); Writables.stringToBytes(serverAddress.toString()));
b.put(lockid, COL_STARTCODE, startCode); b.put(lockid, COL_STARTCODE, startCode);
server.batchUpdate(metaRegionName, System.currentTimeMillis(), b); server.batchUpdate(metaRegionName, System.currentTimeMillis(), b);
@ -2376,8 +2373,8 @@ HMasterRegionInterface, Runnable {
// 5. Get it assigned to a server // 5. Get it assigned to a server
unassignedRegions.put(regionName, info); this.unassignedRegions.put(regionName, info);
assignAttempts.put(regionName, Long.valueOf(0L)); this.assignAttempts.put(regionName, Long.valueOf(0L));
} finally { } finally {
synchronized (tableInCreation) { synchronized (tableInCreation) {