HADOOP-2276 Address regression caused by HADOOP-2274; fix HADOOP-2173 (when the master times out a region server's lease, the region server may not restart)
git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk/src/contrib/hbase@598535 13f79535-47bb-0310-9956-ffa450edef68
parent 0eb95f5c54
commit 3786f399a6
@@ -34,6 +34,9 @@ Trunk (unreleased changes)
    HADOOP-2274 Excess synchronization introduced by HADOOP-2139 negatively
                impacts performance
    HADOOP-2196 Fix how hbase sits in hadoop 'package' product
+   HADOOP-2276 Address regression caused by HADOOP-2274, fix HADOOP-2173 (When
+               the master times out a region servers lease, the region server
+               may not restart)
 
  IMPROVEMENTS
    HADOOP-2401 Add convenience put method that takes writable
@@ -1606,100 +1606,105 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
   private void assignRegions(HServerInfo info, String serverName,
     ArrayList<HMsg> returnMsgs) {
 
-    long now = System.currentTimeMillis();
-    Set<Text> regionsToAssign = new HashSet<Text>();
     synchronized (this.assignAttempts) {
+
+      // We need to hold a lock on assign attempts while we figure out what to
+      // do so that multiple threads do not execute this method in parallel
+      // resulting in assigning the same region to multiple servers.
+
+      long now = System.currentTimeMillis();
+      Set<Text> regionsToAssign = new HashSet<Text>();
       for (Map.Entry<Text, Long> e: this.assignAttempts.entrySet()) {
         long diff = now - e.getValue().longValue();
         if (diff > this.maxRegionOpenTime) {
           regionsToAssign.add(e.getKey());
         }
       }
-    }
       int nRegionsToAssign = regionsToAssign.size();
       if (nRegionsToAssign <= 0) {
         // No regions to assign. Return.
         return;
       }
 
       if (this.serversToServerInfo.size() == 1) {
         assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
         // Finished. Return.
         return;
       }
 
       // Multiple servers in play.
       // We need to allocate regions only to most lightly loaded servers.
       HServerLoad thisServersLoad = info.getLoad();
       int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
       nRegionsToAssign -= nregions;
       if (nRegionsToAssign > 0) {
         // We still have more regions to assign. See how many we can assign
         // before this server becomes more heavily loaded than the next
         // most heavily loaded server.
         SortedMap<HServerLoad, Set<String>> heavyServers =
           new TreeMap<HServerLoad, Set<String>>();
         synchronized (this.loadToServers) {
           heavyServers.putAll(this.loadToServers.tailMap(thisServersLoad));
         }
         int nservers = 0;
         HServerLoad heavierLoad = null;
         for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
           Set<String> servers = e.getValue();
           nservers += servers.size();
           if (e.getKey().compareTo(thisServersLoad) == 0) {
             // This is the load factor of the server we are considering
             nservers -= 1;
             continue;
           }
 
           // If we get here, we are at the first load entry that is a
           // heavier load than the server we are considering
           heavierLoad = e.getKey();
           break;
         }
 
         nregions = 0;
         if (heavierLoad != null) {
           // There is a more heavily loaded server
           for (HServerLoad load =
             new HServerLoad(thisServersLoad.getNumberOfRequests(),
               thisServersLoad.getNumberOfRegions());
             load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
             load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
             // continue;
           }
         }
 
         if (nregions < nRegionsToAssign) {
           // There are some more heavily loaded servers
           // but we can't assign all the regions to this server.
           if (nservers > 0) {
             // There are other servers that can share the load.
             // Split regions that need assignment across the servers.
             nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
                 / (1.0 * nservers));
           } else {
             // No other servers with same load.
             // Split regions over all available servers
             nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
                 / (1.0 * serversToServerInfo.size()));
           }
         } else {
           // Assign all regions to this server
           nregions = nRegionsToAssign;
         }
 
         now = System.currentTimeMillis();
         for (Text regionName: regionsToAssign) {
           HRegionInfo regionInfo = this.unassignedRegions.get(regionName);
           LOG.info("assigning region " + regionName + " to server " +
             serverName);
           this.assignAttempts.put(regionName, Long.valueOf(now));
           returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
           if (--nregions <= 0) {
             break;
           }
         }
       }
+    }
   }
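The subtle part of assignRegions() above is deciding how many of the timed-out regions to offer the server that is reporting in: regionsPerServer() gives it a base share, the loop over heavyServers finds the next more heavily loaded server, and any remainder is split with Math.ceil. The sketch below illustrates that decision structure only; it is not the actual HMaster code. It treats a server's load as a plain region count (the real HServerLoad also tracks request counts), and regionsToOffer, nToAssign, nextHeavierRegions and nOtherServers are hypothetical names.

final class RegionShareSketch {

  /**
   * Illustration only. nToAssign is what remains after the server's base
   * share; nextHeavierRegions is the region count of the next more heavily
   * loaded server, or null if there is none; nOtherServers is the number of
   * other candidate servers found while scanning the load map.
   */
  static int regionsToOffer(int nToAssign, int thisServerRegions,
      Integer nextHeavierRegions, int nOtherServers, int nServersTotal) {
    int nregions = 0;
    if (nextHeavierRegions != null) {
      // Count how many regions this server can take before it would be
      // carrying more than the next heavier server.
      for (int load = thisServerRegions;
           load <= nextHeavierRegions && nregions < nToAssign;
           load++, nregions++) {
        // just counting
      }
    }
    if (nregions < nToAssign) {
      // Cannot hand everything to this server now; spread the rest over the
      // other candidates, or over every server if no candidates were found.
      int share = (nOtherServers > 0) ? nOtherServers : nServersTotal;
      nregions = (int) Math.ceil((1.0 * nToAssign) / (1.0 * share));
    } else {
      // The heavier server leaves enough headroom: offer them all.
      nregions = nToAssign;
    }
    return nregions;
  }

  public static void main(String[] args) {
    // 7 leftover regions, no heavier server, 3 other candidate servers out
    // of 5 total: the reporting server is offered ceil(7/3) = 3 regions now,
    // and the rest wait until other servers report in.
    System.out.println(regionsToOffer(7, 10, null, 3, 5));
  }
}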
@@ -2092,14 +2097,18 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
     if (numberOfMetaRegions.get() != onlineMetaRegions.size()) {
       // We can't proceed because not all of the meta regions are online.
       // We can't block either because that would prevent the meta region
-      // online message from being processed. So return false to have this
-      // operation requeued.
+      // online message from being processed. In order to prevent spinning
+      // in the run queue, put this request on the delay queue to give
+      // other threads the opportunity to get the meta regions on-line.
       if (LOG.isDebugEnabled()) {
         LOG.debug(
           "Requeuing shutdown because not all meta regions are online");
       }
-      return false;
+      this.expire = System.currentTimeMillis() + leaseTimeout / 2;
+      shutdownQueue.put(this);
+
+      // Return true so run() does not put us back on the msgQueue
+      return true;
     }
     for (int tries = 0; tries < numRetries; tries++) {
       try {
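With this change, a shutdown that arrives before all meta regions are online is parked on shutdownQueue with an expire time of half the lease timeout, instead of being re-posted to the message queue where it would spin. The hunk does not show the element type behind shutdownQueue; below is a minimal sketch of how such a delayed retry can be built on java.util.concurrent.DelayQueue, assuming a DelayQueue-style queue. ShutdownRetry and RETRY_DELAY_MS are illustrative names, not actual HMaster members.

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Sketch only: an operation that becomes eligible again once its expire time
// has passed, so the worker thread is not busy-spinning on it.
final class ShutdownRetry implements Delayed {
  static final long RETRY_DELAY_MS = 100;   // the patch uses leaseTimeout / 2
  final long expire = System.currentTimeMillis() + RETRY_DELAY_MS;

  public long getDelay(TimeUnit unit) {
    // Time remaining until this retry may be taken off the queue.
    return unit.convert(expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
  }

  public int compareTo(Delayed other) {
    long diff = getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
    return diff < 0 ? -1 : diff > 0 ? 1 : 0;
  }

  public static void main(String[] args) throws InterruptedException {
    DelayQueue<ShutdownRetry> shutdownQueue = new DelayQueue<ShutdownRetry>();
    shutdownQueue.put(new ShutdownRetry());
    // take() blocks until the delay has elapsed, i.e. until the retry is due.
    ShutdownRetry due = shutdownQueue.take();
    System.out.println("retrying shutdown processing: " + due);
  }
}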
@@ -642,7 +642,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
       false, conf);
     this.serverInfo = new HServerInfo(new HServerAddress(
       new InetSocketAddress(getThisIP(),
-      this.server.getListenerAddress().getPort())), this.rand.nextLong(),
+      this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
       this.conf.getInt("hbase.regionserver.info.port", 60030));
     this.leases = new Leases(
       conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
@@ -704,7 +704,12 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
     synchronized (logRollerLock) {
       try {
         log.closeAndDelete();
-        serverInfo.setStartCode(rand.nextLong());
+      } catch (Exception e) {
+        LOG.error("error closing and deleting HLog", e);
+      }
+      try {
+        serverInfo.setStartCode(System.currentTimeMillis());
         log = setupHLog();
       } catch (IOException e) {
         this.abortRequested = true;
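Two things change when the log roller rebuilds the HLog: a failure in closeAndDelete() is now caught and logged instead of escaping, so it can no longer stop the server from standing up a fresh log, and the new start code comes from the wall clock rather than rand, matching the HServerInfo change in the previous hunk. Below is a minimal sketch of the same close-then-recreate pattern; WriteAheadLog, openNewLog and rollLog are stand-in names, not the actual HRegionServer API.

import java.io.IOException;

// Sketch of the recovery pattern above; WriteAheadLog and openNewLog are
// hypothetical stand-ins for HLog and HRegionServer.setupHLog().
final class LogRollSketch {

  interface WriteAheadLog {
    void closeAndDelete() throws IOException;
  }

  private WriteAheadLog log;
  private long startCode;
  private volatile boolean abortRequested;

  void rollLog(WriteAheadLog current) {
    log = current;
    // Best effort: failing to tear down the old log must not prevent the
    // server from creating its replacement.
    try {
      log.closeAndDelete();
    } catch (Exception e) {
      System.err.println("error closing and deleting log: " + e);
    }
    try {
      // Refresh the start code; the patch switches this from rand.nextLong()
      // to the wall clock.
      startCode = System.currentTimeMillis();
      log = openNewLog();
    } catch (IOException e) {
      // Only failure to create the new log is treated as fatal.
      abortRequested = true;
    }
  }

  private WriteAheadLog openNewLog() throws IOException {
    return new WriteAheadLog() {
      public void closeAndDelete() { /* no-op for the sketch */ }
    };
  }
}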