HBASE-71 [hbase] Master should rebalance region assignments periodically
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@644441 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent 2a38a13eaa
commit 228fd03bac
@@ -11,6 +11,7 @@ Hbase Change Log

NEW FEATURES
HBASE-548 Tool to online single region
HBASE-71 Master should rebalance region assignments periodically

IMPROVEMENTS
HBASE-469 Streamline HStore startup and compactions
@@ -112,6 +112,13 @@ public class HServerLoad implements WritableComparable {
this.numberOfRegions = numberOfRegions;
}

/**
* @param numberOfRequests the numberOfRequests to set
*/
public void setNumberOfRequests(int numberOfRequests) {
this.numberOfRequests = numberOfRequests;
}

// Writable

/** {@inheritDoc} */
@@ -131,6 +131,20 @@ public class LocalHBaseCluster implements HConstants {
public HRegionServer getRegionServer() {
return this.regionServer;
}

/**
* Block until the region server has come online, indicating it is ready
* to be used.
*/
public void waitForServerOnline() {
while (!regionServer.isOnline()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// continue waiting
}
}
}
}

/**
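A minimal sketch of how a caller is expected to use the new blocking call, modeled on the MiniHBaseCluster.startRegionServer() change later in this commit; the hbaseCluster field and LOG are assumed from that surrounding class and are not part of this hunk:

    // Start a new region server thread, then block until it can serve requests.
    LocalHBaseCluster.RegionServerThread t = hbaseCluster.addRegionServer();
    t.start();
    t.waitForServerOnline();   // loops on HRegionServer.isOnline(), sleeping 1s
    LOG.info("started region server " + t.getName());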
@@ -122,6 +122,7 @@ public class HbaseObjectWritable implements Writable, Configurable {
e.printStackTrace();
}
addToMap(RowResult.class, code++);
addToMap(HRegionInfo[].class, code++);
}

private Class<?> declaredClass;
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;

/**
* HRegionServers interact with the HMasterRegionInterface to report on local

@@ -49,9 +50,13 @@ public interface HMasterRegionInterface extends VersionedProtocol {
*
* @param info server's address and start code
* @param msgs things the region server wants to tell the master
* @param mostLoadedRegions Array of HRegionInfos that should contain the
* reporting server's most loaded regions. These are candidates for being
* rebalanced.
* @return instructions from the master to the region server
* @throws IOException
*/
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[])
public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[],
HRegionInfo mostLoadedRegions[])
throws IOException;
}
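For orientation, a hedged sketch of the region-server side of this widened RPC; the call shape matches the HRegionServer hunk further down in this commit, while the construction of outboundArray here is an assumption made only for illustration:

    // Periodic check-in with the master: report outbound messages plus the
    // regions this server nominates as rebalancing candidates.
    HMsg[] outboundArray = outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
    HMsg[] msgs = hbaseMaster.regionServerReport(
        serverInfo, outboundArray, getMostLoadedRegions());
    // msgs carries the master's instructions back, e.g. MSG_REGION_OPEN or
    // MSG_REGION_CLOSE entries built by RegionManager.assignRegions().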
@@ -131,7 +131,10 @@ class ChangeTableState extends TableOperation {
LOG.debug("adding region " + i.getRegionName() +
" to kill list");
}
// this marks the regions to be closed
localKillList.put(i.getRegionName(), i);
// this marks the regions to be offlined once they are closed
master.regionManager.markRegionForOffline(i.getRegionName());
}
if (localKillList.size() > 0) {
if (LOG.isDebugEnabled()) {
@@ -546,9 +546,11 @@ public class HMaster extends Thread implements HConstants, HMasterInterface,
}

/** {@inheritDoc} */
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
HRegionInfo[] mostLoadedRegions)
throws IOException {
return serverManager.regionServerReport(serverInfo, msgs);
return serverManager.regionServerReport(serverInfo, msgs,
mostLoadedRegions);
}

/*
@@ -26,23 +26,29 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RemoteExceptionHandler;

/**
* ProcessRegionClose is instantiated when a region server reports that it
* has closed a region.
*/
* ProcessRegionClose is the way we do post-processing on a closed region. We
* only spawn one of these asynchronous tasks when the region needs to be
* either offlined or deleted. We used to create one of these tasks whenever
* a region was closed, but since closing a region that isn't being offlined
* or deleted doesn't actually require post processing, it's no longer
* necessary.
*/
class ProcessRegionClose extends ProcessRegionStatusChange {
private boolean reassignRegion;
private boolean offlineRegion;
private boolean deleteRegion;

/**
* @param regionInfo
* @param reassignRegion
* @param deleteRegion
* @param master
* @param regionInfo Region to operate on
* @param offlineRegion if true, set the region to offline in meta
* @param deleteRegion if true, delete the region row from meta and then
* delete the region files from disk.
*/
public ProcessRegionClose(HMaster master, HRegionInfo regionInfo,
boolean reassignRegion, boolean deleteRegion) {
boolean offlineRegion, boolean deleteRegion) {

super(master, regionInfo);
this.reassignRegion = reassignRegion;
this.offlineRegion = offlineRegion;
this.deleteRegion = deleteRegion;
}

@@ -50,7 +56,7 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
@Override
public String toString() {
return "ProcessRegionClose of " + this.regionInfo.getRegionName() +
", " + this.reassignRegion + ", " + this.deleteRegion;
", " + this.offlineRegion + ", " + this.deleteRegion;
}

@Override

@@ -75,12 +81,14 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
if (deleteRegion) {
HRegion.removeRegionFromMETA(getMetaServer(), metaRegionName,
regionInfo.getRegionName());
} else if (!this.reassignRegion) {
} else if (offlineRegion) {
// offline the region in meta and then note that we've offlined the
// region.
HRegion.offlineRegionInMETA(getMetaServer(), metaRegionName,
regionInfo);
master.regionManager.regionOfflined(regionInfo.getRegionName());
}
break;

} catch (IOException e) {
if (tries == numRetries - 1) {
throw RemoteExceptionHandler.checkIOException(e);

@@ -89,10 +97,9 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
}
}

if (reassignRegion) {
LOG.info("reassign region: " + regionInfo.getRegionName());
master.regionManager.setUnassigned(regionInfo);
} else if (deleteRegion) {
// now that meta is updated, if we need to delete the region's files, now's
// the time.
if (deleteRegion) {
try {
HRegion.deleteRegion(master.fs, master.rootdir, regionInfo);
} catch (IOException e) {

@@ -101,6 +108,7 @@ class ProcessRegionClose extends ProcessRegionStatusChange {
throw e;
}
}

return true;
}
}
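To show where these flags come from, here is roughly how the master queues one of these tasks after this change; the snippet follows the ServerManager.processRegionClose() hunk later in this commit, where the offlineRegion and deleteRegion values are decided from the region's state:

    // Post-processing is only queued when the closed region must be offlined
    // or deleted; a region that will simply be reassigned never reaches here.
    if (!reassignRegion) {
      try {
        master.toDoQueue.put(new ProcessRegionClose(master, region,
            offlineRegion, deleteRegion));
      } catch (InterruptedException e) {
        throw new RuntimeException("Putting into toDoQueue was interrupted.", e);
      }
    } else {
      master.regionManager.setUnassigned(region);
    }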
@@ -22,13 +22,10 @@ package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.List;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

@@ -83,6 +80,8 @@ class RegionManager implements HConstants {
*
* <p>Items are removed from this list when a region server reports in that
* the region has been deployed.
*
* TODO: Need to be a sorted map?
*/
private final SortedMap<HRegionInfo, Long> unassignedRegions =
Collections.synchronizedSortedMap(new TreeMap<HRegionInfo, Long>());
@@ -104,10 +103,6 @@ class RegionManager implements HConstants {
private final Set<Text> closingRegions =
Collections.synchronizedSet(new HashSet<Text>());

/** Regions that are being reassigned for load balancing. */
private final Set<Text> regionsBeingReassigned =
Collections.synchronizedSet(new HashSet<Text>());

/**
* 'regionsToDelete' contains regions that need to be deleted, but cannot be
* until the region server closes it

@@ -115,10 +110,15 @@ class RegionManager implements HConstants {
private final Set<Text> regionsToDelete =
Collections.synchronizedSet(new HashSet<Text>());

/**
* Set of regions that, once closed, should be marked as offline so that they
* are not reassigned.
*/
private final Set<Text> regionsToOffline =
Collections.synchronizedSet(new HashSet<Text>());
// How many regions to assign a server at a time.
private final int maxAssignInOneGo;

private HMaster master;

RegionManager(HMaster master) {
@@ -167,116 +167,104 @@ class RegionManager implements HConstants {
* @param returnMsgs
*/
void assignRegions(HServerInfo info, String serverName,
ArrayList<HMsg> returnMsgs) {
HRegionInfo[] mostLoadedRegions, ArrayList<HMsg> returnMsgs) {

HServerLoad thisServersLoad = info.getLoad();

synchronized (unassignedRegions) {

// We need to hold a lock on assign attempts while we figure out what to
// do so that multiple threads do not execute this method in parallel
// resulting in assigning the same region to multiple servers.

long now = System.currentTimeMillis();
Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();
for (Map.Entry<HRegionInfo, Long> e: unassignedRegions.entrySet()) {
HRegionInfo i = e.getKey();
if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
!i.isMetaRegion()) {
// Can't assign user regions until all meta regions have been assigned
// and are on-line
continue;
}
long diff = now - e.getValue().longValue();
if (diff > master.maxRegionOpenTime) {
regionsToAssign.add(e.getKey());
}
}
int nRegionsToAssign = regionsToAssign.size();
if (nRegionsToAssign <= 0) {
// No regions to assign. Return.
return;
}

if (master.serverManager.numServers() == 1) {
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
// Finished. Return.
return;
}

// Multiple servers in play.
// We need to allocate regions only to most lightly loaded servers.
HServerLoad thisServersLoad = info.getLoad();
int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
nRegionsToAssign -= nregions;
if (nRegionsToAssign > 0) {
// We still have more regions to assign. See how many we can assign
// before this server becomes more heavily loaded than the next
// most heavily loaded server.
SortedMap<HServerLoad, Set<String>> heavyServers =
new TreeMap<HServerLoad, Set<String>>();
synchronized (master.serverManager.loadToServers) {
heavyServers.putAll(
master.serverManager.loadToServers.tailMap(thisServersLoad));
}
int nservers = 0;
HServerLoad heavierLoad = null;
for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
Set<String> servers = e.getValue();
nservers += servers.size();
if (e.getKey().compareTo(thisServersLoad) == 0) {
// This is the load factor of the server we are considering
nservers -= 1;
continue;
// figure out what regions need to be assigned and aren't currently being
// worked on elsewhere.
Set<HRegionInfo> regionsToAssign = regionsAwaitingAssignment();

if (regionsToAssign.size() == 0) {
// There are no regions waiting to be assigned. This is an opportunity
// for us to check if this server is overloaded.
double avgLoad = master.serverManager.getAverageLoad();
if (avgLoad > 2.0 && thisServersLoad.getNumberOfRegions() > avgLoad) {
if (LOG.isDebugEnabled()) {
LOG.debug("Server " + serverName + " is overloaded. Server load: " +
thisServersLoad.getNumberOfRegions() + " avg: " + avgLoad);
}

// If we get here, we are at the first load entry that is a
// heavier load than the server we are considering
heavierLoad = e.getKey();
break;
unassignSomeRegions(thisServersLoad, avgLoad, mostLoadedRegions,
returnMsgs);
}

nregions = 0;
if (heavierLoad != null) {
// There is a more heavily loaded server
for (HServerLoad load =
new HServerLoad(thisServersLoad.getNumberOfRequests(),
thisServersLoad.getNumberOfRegions());
load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
// continue;
}
}

if (nregions < nRegionsToAssign) {
// There are some more heavily loaded servers
// but we can't assign all the regions to this server.
if (nservers > 0) {
// There are other servers that can share the load.
// Split regions that need assignment across the servers.
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * nservers));
} else {
// No other servers with same load.
// Split regions over all available servers
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * master.serverManager.numServers()));
}
} else {
// if there's only one server, just give it all the regions
if (master.serverManager.numServers() == 1) {
assignRegionsToOneServer(regionsToAssign, serverName, returnMsgs);
} else {
// Assign all regions to this server
nregions = nRegionsToAssign;
// otherwise, give this server a few regions taking into account the
// load of all the other servers.
assignRegionsToMultipleServers(thisServersLoad, regionsToAssign,
serverName, returnMsgs);
}
}
}
}

/**
* Make region assignments taking into account multiple servers' loads.
*/
private void assignRegionsToMultipleServers(final HServerLoad thisServersLoad,
final Set<HRegionInfo> regionsToAssign, final String serverName,
final ArrayList<HMsg> returnMsgs) {

int nRegionsToAssign = regionsToAssign.size();
int nregions = regionsPerServer(nRegionsToAssign, thisServersLoad);
nRegionsToAssign -= nregions;
if (nRegionsToAssign > 0) {
// We still have more regions to assign. See how many we can assign
// before this server becomes more heavily loaded than the next
// most heavily loaded server.
HServerLoad heavierLoad = new HServerLoad();
int nservers = computeNextHeaviestLoad(thisServersLoad, heavierLoad);

if (nregions > this.maxAssignInOneGo) {
nregions = this.maxAssignInOneGo;
nregions = 0;

// Advance past any less-loaded servers
for (HServerLoad load =
new HServerLoad(thisServersLoad.getNumberOfRequests(),
thisServersLoad.getNumberOfRegions());
load.compareTo(heavierLoad) <= 0 && nregions < nRegionsToAssign;
load.setNumberOfRegions(load.getNumberOfRegions() + 1), nregions++) {
// continue;
}

if (nregions < nRegionsToAssign) {
// There are some more heavily loaded servers
// but we can't assign all the regions to this server.
if (nservers > 0) {
// There are other servers that can share the load.
// Split regions that need assignment across the servers.
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * nservers));
} else {
// No other servers with same load.
// Split regions over all available servers
nregions = (int) Math.ceil((1.0 * nRegionsToAssign)
/ (1.0 * master.serverManager.numServers()));
}
now = System.currentTimeMillis();
for (HRegionInfo regionInfo: regionsToAssign) {
LOG.info("assigning region " + regionInfo.getRegionName() +
" to server " + serverName);
unassignedRegions.put(regionInfo, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
if (--nregions <= 0) {
break;
}
} else {
// Assign all regions to this server
nregions = nRegionsToAssign;
}

if (nregions > this.maxAssignInOneGo) {
nregions = this.maxAssignInOneGo;
}

long now = System.currentTimeMillis();
for (HRegionInfo regionInfo: regionsToAssign) {
LOG.info("assigning region " + regionInfo.getRegionName() +
" to server " + serverName);
unassignedRegions.put(regionInfo, Long.valueOf(now));
returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
if (--nregions <= 0) {
break;
}
}
}
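A short worked example of the new overload check above, with invented numbers: if ServerManager.getAverageLoad() reports 3.0 and the reporting server carries 7 regions, then avgLoad > 2.0 and 7 > 3.0 both hold, so unassignSomeRegions() is invoked; with an average of 1.5 the first clause fails and very small clusters are never asked to shed regions. The same arithmetic as a sketch:

    // Hypothetical figures, only to trace the rebalancing trigger by hand.
    double avgLoad = 3.0;      // ServerManager.getAverageLoad()
    int serverRegions = 7;     // thisServersLoad.getNumberOfRegions()
    boolean shouldShed = avgLoad > 2.0 && serverRegions > avgLoad;   // true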
@@ -287,34 +275,102 @@ class RegionManager implements HConstants {
* @param thisServersLoad
* @return How many regions we can assign to more lightly loaded servers
*/
private int regionsPerServer(final int nRegionsToAssign,
final HServerLoad thisServersLoad) {
private int regionsPerServer(final int numUnassignedRegions,
final HServerLoad thisServersLoad) {

SortedMap<HServerLoad, Set<String>> lightServers =
new TreeMap<HServerLoad, Set<String>>();

// Get all the servers who are more lightly loaded than this one.
synchronized (master.serverManager.loadToServers) {
lightServers.putAll(master.serverManager.loadToServers.headMap(thisServersLoad));
}

// Examine the list of servers that are more lightly loaded than this one.
// Pretend that we will assign regions to these more lightly loaded servers
// until they reach load equal with ours. Then, see how many regions are left
// unassigned. That is how many regions we should assign to this server.
int nRegions = 0;
for (Map.Entry<HServerLoad, Set<String>> e : lightServers.entrySet()) {
HServerLoad lightLoad = new HServerLoad(e.getKey().getNumberOfRequests(),
e.getKey().getNumberOfRegions());
e.getKey().getNumberOfRegions());

do {
lightLoad.setNumberOfRegions(lightLoad.getNumberOfRegions() + 1);
nRegions += 1;
} while (lightLoad.compareTo(thisServersLoad) <= 0
&& nRegions < nRegionsToAssign);
&& nRegions < numUnassignedRegions);

nRegions *= e.getValue().size();
if (nRegions >= nRegionsToAssign) {
if (nRegions >= numUnassignedRegions) {
break;
}
}
return nRegions;
}

/**
* Get the set of regions that should be assignable in this pass.
*/
private Set<HRegionInfo> regionsAwaitingAssignment() {
long now = System.currentTimeMillis();

// set of regions we want to assign to this server
Set<HRegionInfo> regionsToAssign = new HashSet<HRegionInfo>();

// Look over the set of regions that aren't currently assigned to
// determine which we should assign to this server.
for (Map.Entry<HRegionInfo, Long> e: unassignedRegions.entrySet()) {
HRegionInfo i = e.getKey();
if (numberOfMetaRegions.get() != onlineMetaRegions.size() &&
!i.isMetaRegion()) {
// Can't assign user regions until all meta regions have been assigned
// and are on-line
continue;
}
// If the last attempt to open this region was pretty recent, then we
// don't want to try and assign it.
long diff = now - e.getValue().longValue();
if (diff > master.maxRegionOpenTime) {
regionsToAssign.add(e.getKey());
}
}
return regionsToAssign;
}

/**
* Figure out the load that is next highest amongst all regionservers. Also,
* return how many servers exist at that load.
*/
private int computeNextHeaviestLoad(HServerLoad referenceLoad,
HServerLoad heavierLoad) {

SortedMap<HServerLoad, Set<String>> heavyServers =
new TreeMap<HServerLoad, Set<String>>();
synchronized (master.serverManager.loadToServers) {
heavyServers.putAll(
master.serverManager.loadToServers.tailMap(referenceLoad));
}
int nservers = 0;
for (Map.Entry<HServerLoad, Set<String>> e : heavyServers.entrySet()) {
Set<String> servers = e.getValue();
nservers += servers.size();
if (e.getKey().compareTo(referenceLoad) == 0) {
// This is the load factor of the server we are considering
nservers -= 1;
continue;
}

// If we get here, we are at the first load entry that is a
// heavier load than the server we are considering
heavierLoad.setNumberOfRequests(e.getKey().getNumberOfRequests());
heavierLoad.setNumberOfRegions(e.getKey().getNumberOfRegions());
break;
}
return nservers;
}

/*
* Assign all to the only server. An unlikely case but still possible.
* @param regionsToAssign
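To make regionsPerServer() concrete, a hand-traced example with invented loads: this server holds 10 regions, two lighter servers hold 6 and 8, and 12 regions await assignment. The loop pretends to top each lighter server up until its load passes this server's, counting roughly 5 and then 3 more regions, so about 8 of the 12 are reserved for lighter servers and assignRegionsToMultipleServers() only considers the remainder for the server that is checking in:

    // Hand trace (loads invented): thisServer = 10 regions, lighter servers at 6
    // and 8, 12 unassigned -> the loop counts about 5 + 3 = 8 regions for the
    // lighter servers, so the caller subtracts 8 and weighs only 4 for this one.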
@@ -332,6 +388,48 @@ class RegionManager implements HConstants {
}
}

/**
* The server checking in right now is overloaded. We will tell it to close
* some or all of its most loaded regions, allowing it to reduce its load.
* The closed regions will then get picked up by other underloaded machines.
*/
private synchronized void unassignSomeRegions(final HServerLoad load,
final double avgLoad, final HRegionInfo[] mostLoadedRegions,
ArrayList<HMsg> returnMsgs) {

int numRegionsToClose = load.getNumberOfRegions() - (int)Math.ceil(avgLoad);
LOG.debug("Choosing to reassign " + numRegionsToClose
+ " regions. mostLoadedRegions has " + mostLoadedRegions.length
+ " regions in it.");

int regionIdx = 0;
int regionsClosed = 0;
while (regionsClosed < numRegionsToClose && regionIdx < mostLoadedRegions.length) {
HRegionInfo currentRegion = mostLoadedRegions[regionIdx];

regionIdx++;
// skip the region if it's meta or root
if (currentRegion.isRootRegion() || currentRegion.isMetaTable()) {
continue;
}

if (isClosing(currentRegion.getRegionName())) {
LOG.info("Skipping region " + currentRegion.getRegionName()
+ " because it is already closing.");
continue;
}

LOG.debug("Going to close region " + currentRegion.getRegionName());

// make a message to close the region
returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, currentRegion));
// mark the region as closing
setClosing(currentRegion.getRegionName());
// increment the count of regions we've marked
regionsClosed++;
}
}

/**
* @return Read-only map of online regions.
*/
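Continuing the invented example above: a server carrying 7 regions against an average of 3.0 gets numRegionsToClose = 7 - ceil(3.0) = 4, and the loop then walks the candidate array, skipping ROOT/META regions and regions already marked as closing, until it has queued at most four close requests. What the region server receives are messages of the form sketched here:

    // Up to numRegionsToClose entries like this are appended to returnMsgs;
    // the region server closes each region, reports MSG_REPORT_CLOSE, and the
    // master then re-queues the region for a more lightly loaded server.
    returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE, currentRegion));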
@@ -339,7 +437,7 @@ class RegionManager implements HConstants {
return Collections.unmodifiableSortedMap(onlineMetaRegions);
}

/*
/**
* Stop the root and meta scanners so that the region servers serving meta
* regions can shut down.
*/

@@ -375,13 +473,19 @@ class RegionManager implements HConstants {
}
}

public boolean waitForMetaRegionsOrClose() throws IOException {
/**
* Block until meta regions are online or we're shutting down.
* @return true if we found meta regions, false if we're closing.
*/
public boolean waitForMetaRegionsOrClose() {
return metaScannerThread.waitForMetaRegionsOrClose();
}

/**
* Search our map of online meta regions to find the first meta region that
* should contain a pointer to <i>newRegion</i>.
* should contain a pointer to <i>newRegion</i>.
* @param newRegion
* @return MetaRegion where the newRegion should live
*/
public MetaRegion getFirstMetaRegionForRegion(HRegionInfo newRegion) {
synchronized (onlineMetaRegions) {

@@ -392,16 +496,17 @@ class RegionManager implements HConstants {
} else {
if (onlineMetaRegions.containsKey(newRegion.getRegionName())) {
return onlineMetaRegions.get(newRegion.getRegionName());
} else {
return onlineMetaRegions.get(onlineMetaRegions.headMap(
newRegion.getTableDesc().getName()).lastKey());
}
}
return onlineMetaRegions.get(onlineMetaRegions.headMap(
newRegion.getTableDesc().getName()).lastKey());
}
}
}

/**
* Get a set of all the meta regions that contain info about a given table.
* @param tableName Table you need to know all the meta regions for
* @return set of MetaRegion objects that contain the table
*/
public Set<MetaRegion> getMetaRegionsForTable(Text tableName) {
Text firstMetaRegion = null;

@@ -420,6 +525,16 @@ class RegionManager implements HConstants {
return metaRegions;
}

/**
* Create a new HRegion, put a row for it into META (or ROOT), and mark the
* new region unassigned so that it will get assigned to a region server.
* @param newRegion HRegionInfo for the region to create
* @param server server hosting the META (or ROOT) region where the new
* region needs to be noted
* @param metaRegionName name of the meta region where new region is to be
* written
* @throws IOException
*/
public void createRegion(HRegionInfo newRegion, HRegionInterface server,
Text metaRegionName)
throws IOException {
@@ -442,12 +557,18 @@ class RegionManager implements HConstants {
unassignedRegions.put(info, ZERO_L);
}

/** Set a MetaRegion as online. */
/**
* Set a MetaRegion as online.
* @param metaRegion
*/
public void putMetaRegionOnline(MetaRegion metaRegion) {
onlineMetaRegions.put(metaRegion.getStartKey(), metaRegion);
}

/** Get a list of online MetaRegions */
/**
* Get a list of online MetaRegions
* @return list of MetaRegion objects
*/
public List<MetaRegion> getListOfOnlineMetaRegions() {
List<MetaRegion> regions = new ArrayList<MetaRegion>();
synchronized(onlineMetaRegions) {

@@ -456,32 +577,55 @@ class RegionManager implements HConstants {
return regions;
}

/** count of online meta regions */
/**
* Count of online meta regions
* @return count of online meta regions
*/
public int numOnlineMetaRegions() {
return onlineMetaRegions.size();
}

/** Check if a meta region is online by its name */
/**
* Check if a meta region is online by its name
* @param startKey name of the meta region to check
* @return true if the region is online, false otherwise
*/
public boolean isMetaRegionOnline(Text startKey) {
return onlineMetaRegions.containsKey(startKey);
}

/** Set an online MetaRegion offline - remove it from the map. **/
/**
* Set an online MetaRegion offline - remove it from the map.
* @param startKey region name
*/
public void offlineMetaRegion(Text startKey) {
onlineMetaRegions.remove(startKey);
}

/** Check if a region is unassigned */
/**
* Check if a region is on the unassigned list
* @param info HRegionInfo to check for
* @return true if on the unassigned list, false if it isn't. Note that this
* means a region could not be on the unassigned list AND not be assigned, if
* it happens to be between states.
*/
public boolean isUnassigned(HRegionInfo info) {
return unassignedRegions.containsKey(info);
}

/** Check if a region is pending */
/**
* Check if a region is pending
* @param regionName name of the region
* @return true if pending, false otherwise
*/
public boolean isPending(Text regionName) {
return pendingRegions.contains(regionName);
}

/** Set a region to unassigned */
/**
* Set a region to unassigned
* @param info Region to set unassigned
*/
public void setUnassigned(HRegionInfo info) {
synchronized(this.unassignedRegions) {
if (!this.unassignedRegions.containsKey(info) &&
@@ -491,17 +635,26 @@ class RegionManager implements HConstants {
}
}

/** Set a region to pending assignment */
/**
* Set a region to pending assignment
* @param regionName
*/
public void setPending(Text regionName) {
pendingRegions.add(regionName);
}

/** Unset region's pending status */
/**
* Unset region's pending status
* @param regionName
*/
public void noLongerPending(Text regionName) {
pendingRegions.remove(regionName);
}

/** Update the deadline for a region assignment to be completed */
/**
* Extend the update assignment deadline for a region.
* @param info Region whose deadline you want to extend
*/
public void updateAssignmentDeadline(HRegionInfo info) {
synchronized (unassignedRegions) {
// Region server is reporting in that its working on region open

@@ -513,12 +666,20 @@ class RegionManager implements HConstants {
}
}

/** Unset a region's unassigned status */
/**
* Unset a region's unassigned status
* @param info Region you want to take off the unassigned list
*/
public void noLongerUnassigned(HRegionInfo info) {
unassignedRegions.remove(info);
}

/** Mark a region to be closed */
/**
* Mark a region to be closed. Server manager will inform hosting region server
* to close the region at its next opportunity.
* @param serverName address info of server
* @param info region to close
*/
public void markToClose(String serverName, HRegionInfo info) {
synchronized (regionsToClose) {
Map<Text, HRegionInfo> serverToClose = regionsToClose.get(serverName);

@@ -528,7 +689,11 @@ class RegionManager implements HConstants {
}
}

/** Mark a bunch of regions as closed not reopen at once for a server */
/**
* Mark a bunch of regions as to close at once for a server
* @param serverName address info of server
* @param map map of region names to region infos of regions to close
*/
public void markToCloseBulk(String serverName,
Map<Text, HRegionInfo> map) {
regionsToClose.put(serverName, map);

@@ -537,13 +702,18 @@ class RegionManager implements HConstants {
/**
* Get a map of region names to region infos waiting to be offlined for a
* given server
* @param serverName
* @return map of region names to region infos to close
*/
public Map<Text, HRegionInfo> getMarkedToClose(String serverName) {
return regionsToClose.get(serverName);
}

/**
* Check if a region is marked as closed not reopen.
* Check if a region is marked as to close
* @param serverName address info of server
* @param regionName name of the region we might want to close
* @return true if the region is marked to close, false otherwise
*/
public boolean isMarkedToClose(String serverName, Text regionName) {
synchronized (regionsToClose) {
@@ -553,7 +723,10 @@ class RegionManager implements HConstants {
}

/**
* Mark a region as no longer waiting to be closed and not reopened.
* Mark a region as no longer waiting to be closed. Either it was closed or
* we don't want to close it anymore for some reason.
* @param serverName address info of server
* @param regionName name of the region
*/
public void noLongerMarkedToClose(String serverName, Text regionName) {
synchronized (regionsToClose) {

@@ -564,55 +737,118 @@ class RegionManager implements HConstants {
}
}

/** Check if a region is closing */
/**
* Check if a region is closing
* @param regionName
* @return true if the region is marked as closing, false otherwise
*/
public boolean isClosing(Text regionName) {
return closingRegions.contains(regionName);
}

/** Set a region as no longer closing (closed?) */
/**
* Set a region as no longer closing (closed?)
* @param regionName
*/
public void noLongerClosing(Text regionName) {
closingRegions.remove(regionName);
}

/** mark a region as closing */
/**
* Mark a region as closing
* @param regionName
*/
public void setClosing(Text regionName) {
closingRegions.add(regionName);
}

/**
* Add a meta region to the scan queue
* @param m MetaRegion that needs to get scanned
* @throws InterruptedException
*/
public void addMetaRegionToScan(MetaRegion m) throws InterruptedException {
metaScannerThread.addMetaRegionToScan(m);
}

/** Mark a region as to be deleted */
/**
* Mark a region as to be deleted
* @param regionName
*/
public void markRegionForDeletion(Text regionName) {
regionsToDelete.add(regionName);
}

/** Note that a region to delete has been deleted */
/**
* Note that a region to delete has been deleted
* @param regionName
*/
public void regionDeleted(Text regionName) {
regionsToDelete.remove(regionName);
}

/** Check if a region is marked for deletion */
/**
* Note that a region should be offlined as soon as its closed.
* @param regionName
*/
public void markRegionForOffline(Text regionName) {
regionsToOffline.add(regionName);
}

/**
* Check if a region is marked for offline
* @param regionName
* @return true if marked for offline, false otherwise
*/
public boolean isMarkedForOffline(Text regionName) {
return regionsToOffline.contains(regionName);
}

/**
* Region was offlined as planned, remove it from the list to offline
* @param regionName
*/
public void regionOfflined(Text regionName) {
regionsToOffline.remove(regionName);
}

/**
* Check if a region is marked for deletion
* @param regionName
* @return true if marked for deletion, false otherwise
*/
public boolean isMarkedForDeletion(Text regionName) {
return regionsToDelete.contains(regionName);
}

/**
* Check if the initial root scan has been completed.
* @return true if scan completed, false otherwise
*/
public boolean isInitialRootScanComplete() {
return rootScannerThread.isInitialScanComplete();
}

/**
* Check if the initial meta scan has been completed.
* @return true if meta completed, false otherwise
*/
public boolean isInitialMetaScanComplete() {
return metaScannerThread.isInitialScanComplete();
}

/**
* Get the root region location.
* @return HServerAddress describing root region server.
*/
public HServerAddress getRootRegionLocation() {
return rootRegionLocation.get();
}

/**
* Block until either the root region location is available or we're shutting
* down.
*/
public void waitForRootRegionLocation() {
synchronized (rootRegionLocation) {
while (!master.closed.get() && rootRegionLocation.get() == null) {
@@ -628,14 +864,25 @@ class RegionManager implements HConstants {
}
}

/**
* Return the number of meta regions.
* @return number of meta regions
*/
public int numMetaRegions() {
return numberOfMetaRegions.get();
}

/**
* Bump the count of meta regions up one
*/
public void incrementNumMetaRegions() {
numberOfMetaRegions.incrementAndGet();
}

/**
* Set the root region location.
* @param address Address of the region server where the root lives
*/
public void setRootRegionLocation(HServerAddress address) {
synchronized (rootRegionLocation) {
rootRegionLocation.set(new HServerAddress(address));

@@ -643,6 +890,10 @@ class RegionManager implements HConstants {
}
}

/**
* Set the number of meta regions.
* @param num Number of meta regions
*/
public void setNumMetaRegions(int num) {
numberOfMetaRegions.set(num);
}
@@ -133,12 +133,15 @@ class ServerManager implements HConstants {
*
* @param serverInfo
* @param msgs
* @param mostLoadedRegions Array of regions the region server is submitting
* as candidates to be rebalanced, should it be overloaded
* @return messages from master to region server indicating what region
* server should do.
*
* @throws IOException
*/
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[])
public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
HRegionInfo[] mostLoadedRegions)
throws IOException {
String serverName = serverInfo.getServerAddress().toString().trim();

@@ -206,7 +209,8 @@ class ServerManager implements HConstants {

return new HMsg[]{new HMsg(HMsg.MSG_REGIONSERVER_STOP)};
} else {
return processRegionServerAllsWell(serverName, serverInfo, msgs);
return processRegionServerAllsWell(serverName, serverInfo,
mostLoadedRegions, msgs);
}
}
@@ -254,7 +258,7 @@ class ServerManager implements HConstants {

/** RegionServer is checking in, no exceptional circumstances */
private HMsg[] processRegionServerAllsWell(String serverName,
HServerInfo serverInfo, HMsg[] msgs)
HServerInfo serverInfo, HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
throws IOException {
// All's well. Renew the server's lease.
// This will always succeed; otherwise, the fetch of serversToServerInfo

@@ -287,7 +291,7 @@ class ServerManager implements HConstants {
loadToServers.put(load, servers);

// Next, process messages for this server
return processMsgs(serverName, serverInfo, msgs);
return processMsgs(serverName, serverInfo, mostLoadedRegions, msgs);
}

/**

@@ -297,7 +301,7 @@ class ServerManager implements HConstants {
* that has already been done in regionServerReport.
*/
private HMsg[] processMsgs(String serverName, HServerInfo serverInfo,
HMsg incomingMsgs[])
HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[])
throws IOException {
ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
Map<Text, HRegionInfo> regionsToKill =
@@ -317,58 +321,11 @@ class ServerManager implements HConstants {
break;

case HMsg.MSG_REPORT_OPEN:
processRegionOpen(serverName, serverInfo,
incomingMsgs[i].getRegionInfo(), returnMsgs);
processRegionOpen(serverName, serverInfo, region, returnMsgs);
break;

case HMsg.MSG_REPORT_CLOSE:
LOG.info(serverInfo.getServerAddress().toString() + " no longer serving " +
region);

if (region.isRootRegion()) {
// Root region
if (region.isOffline()) {
// Can't proceed without root region. Shutdown.
LOG.fatal("root region is marked offline");
master.shutdown();
}
master.regionManager.unassignRootRegion();

} else {
boolean reassignRegion = !region.isOffline();
boolean deleteRegion = false;

if (master.regionManager.isClosing(region.getRegionName())) {
master.regionManager.noLongerClosing(region.getRegionName());
reassignRegion = false;
}

if (master.regionManager.isMarkedForDeletion(region.getRegionName())) {
master.regionManager.regionDeleted(region.getRegionName());
reassignRegion = false;
deleteRegion = true;
}

if (region.isMetaTable()) {
// Region is part of the meta table. Remove it from onlineMetaRegions
master.regionManager.offlineMetaRegion(region.getStartKey());
}

// NOTE: we cannot put the region into unassignedRegions as that
// could create a race with the pending close if it gets
// reassigned before the close is processed.

master.regionManager.noLongerUnassigned(region);

try {
master.toDoQueue.put(new ProcessRegionClose(master, region,
reassignRegion, deleteRegion));

} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into toDoQueue was interrupted.", e);
}
}
processRegionClose(serverInfo, region);
break;

case HMsg.MSG_REPORT_SPLIT:

@@ -394,7 +351,8 @@ class ServerManager implements HConstants {
}

// Figure out what the RegionServer ought to do, and write back.
master.regionManager.assignRegions(serverInfo, serverName, returnMsgs);
master.regionManager.assignRegions(serverInfo, serverName,
mostLoadedRegions, returnMsgs);
return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
}
@@ -497,6 +455,71 @@ class ServerManager implements HConstants {
}
}

private void processRegionClose(HServerInfo serverInfo, HRegionInfo region) {
LOG.info(serverInfo.getServerAddress().toString() + " no longer serving " +
region);

if (region.isRootRegion()) {
// Root region
if (region.isOffline()) {
// Can't proceed without root region. Shutdown.
LOG.fatal("root region is marked offline");
master.shutdown();
}
master.regionManager.unassignRootRegion();

} else {
boolean reassignRegion = !region.isOffline();
boolean deleteRegion = false;
boolean offlineRegion = false;

// either this region is being closed because it was marked to close, or
// the region server is going down peacefully. in either case, we should
// at least try to remove it from the closing list.
master.regionManager.noLongerClosing(region.getRegionName());

// if the region is marked to be offlined, we don't want to reassign
// it.
if (master.regionManager.isMarkedForOffline(region.getRegionName())) {
reassignRegion = false;
offlineRegion = true;
}

if (master.regionManager.isMarkedForDeletion(region.getRegionName())) {
master.regionManager.regionDeleted(region.getRegionName());
reassignRegion = false;
deleteRegion = true;
}

if (region.isMetaTable()) {
// Region is part of the meta table. Remove it from onlineMetaRegions
master.regionManager.offlineMetaRegion(region.getStartKey());
}

// if the region is already on the unassigned list, let's remove it. this
// is safe because if it's going to be reassigned, it'll get added again
// shortly. if it's not going to get reassigned, then we need to make
// sure it's not on the unassigned list, because that would contend with
// the ProcessRegionClose going on asynchronously.
master.regionManager.noLongerUnassigned(region);

if (!reassignRegion) {
// either the region is being offlined or deleted. we want to do those
// operations asynchronously, so we'll creating a todo item for that.
try {
master.toDoQueue.put(new ProcessRegionClose(master, region,
offlineRegion, deleteRegion));
} catch (InterruptedException e) {
throw new RuntimeException(
"Putting into toDoQueue was interrupted.", e);
}
} else {
// we are reassigning the region eventually, so set it unassigned
master.regionManager.setUnassigned(region);
}
}
}

/** Cancel a server's lease and update its load information */
private boolean cancelLease(final String serverName) {
boolean leaseCancelled = false;
@@ -525,10 +548,31 @@ class ServerManager implements HConstants {
return leaseCancelled;
}

/** @return the average load across all region servers */
public int averageLoad() {
return 0;
/**
* Compute the average load across all region servers.
* Currently, this uses a very naive computation - just uses the number of
* regions being served, ignoring stats about number of requests.
* @return the average load
*/
public double getAverageLoad() {
int totalLoad = 0;
int numServers = 0;
double averageLoad = 0.0;

synchronized (serversToLoad) {
numServers = serversToLoad.size();

for (Map.Entry<String, HServerLoad> entry : serversToLoad.entrySet()) {
totalLoad += entry.getValue().getNumberOfRegions();
}

averageLoad = Math.ceil((double)totalLoad / (double)numServers);
if (LOG.isDebugEnabled()) {
LOG.debug("Total Load: " + totalLoad + ", Num Servers: " + numServers
+ ", Avg Load: " + averageLoad);
}
}
return averageLoad;
}

/** @return the number of active servers */
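A quick worked example of the computation above, with invented server loads: three region servers carrying 10, 20 and 31 regions give totalLoad = 61, so getAverageLoad() returns Math.ceil(61 / 3.0) = 21.0; the ceiling means the reported average is always rounded up to a whole number of regions.

    // Same arithmetic, spelled out (loads are hypothetical).
    int totalLoad = 10 + 20 + 31;                              // 61
    double averageLoad = Math.ceil((double) totalLoad / 3.0);  // 21.0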
@@ -715,7 +715,8 @@ public class HRegion implements HConstants {
Text startKey = new Text(this.regionInfo.getStartKey());
Text endKey = new Text(this.regionInfo.getEndKey());
if (startKey.equals(midKey)) {
LOG.debug("Startkey and midkey are same, not splitting");
LOG.debug("Startkey (" + startKey + ") and midkey + (" +
midKey + ") are same, not splitting");
return null;
}
if (midKey.equals(endKey)) {
@@ -34,7 +34,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

@@ -115,13 +114,13 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
private final Random rand = new Random();

// region name -> HRegion
protected volatile SortedMap<Text, HRegion> onlineRegions =
Collections.synchronizedSortedMap(new TreeMap<Text, HRegion>());
protected volatile Map<Text, HRegion> onlineRegions =
new ConcurrentHashMap<Text, HRegion>();
protected volatile Map<Text, HRegion> retiringRegions =
new ConcurrentHashMap<Text, HRegion>();

protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final List<HMsg> outboundMsgs =
private volatile List<HMsg> outboundMsgs =
Collections.synchronizedList(new ArrayList<HMsg>());

final int numRetries;

@@ -129,6 +128,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
private final int msgInterval;
private final int serverLeaseTimeout;

protected final int numRegionsToReport;

// Remote HMaster
private HMasterRegionInterface hbaseMaster;
@@ -190,6 +191,9 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
final LogRoller logRoller;
final Integer logRollerLock = new Integer(0);

// flag set after we're done setting up server threads (used for testing)
protected volatile boolean isOnline;

/**
* Starts a HRegionServer at the default location
* @param conf

@@ -212,6 +216,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.fsOk = true;
this.conf = conf;

this.isOnline = false;

// Config'ed params
this.numRetries = conf.getInt("hbase.client.retries.number", 2);
this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);

@@ -240,13 +246,16 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
new InetSocketAddress(getThisIP(),
this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
this.conf.getInt("hbase.regionserver.info.port", 60030));
this.leases = new Leases(
conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
this.threadWakeFrequency);
this.numRegionsToReport =
conf.getInt("hbase.regionserver.numregionstoreport", 10);

this.leases = new Leases(
conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
this.threadWakeFrequency);

// Register shutdown hook for HRegionServer, runs an orderly shutdown
// when a kill signal is recieved
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
// Register shutdown hook for HRegionServer, runs an orderly shutdown
// when a kill signal is recieved
Runtime.getRuntime().addShutdownHook(new ShutdownThread(this));
}

/**
@@ -285,8 +294,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
onlineRegions.size()));
this.requestCount.set(0);
HMsg msgs[] =
this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
HMsg msgs[] = hbaseMaster.regionServerReport(
serverInfo, outboundArray, getMostLoadedRegions());
lastMsg = System.currentTimeMillis();

if (this.quiesced.get() && onlineRegions.size() == 0) {

@@ -452,7 +461,7 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {

LOG.info("telling master that region server is shutting down at: " +
serverInfo.getServerAddress().toString());
hbaseMaster.regionServerReport(serverInfo, exitMsg);
hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
} catch (IOException e) {
LOG.warn("Failed to send exiting message to master: ",
RemoteExceptionHandler.checkIOException(e));
@@ -489,8 +498,10 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
this.rootDir = new Path(this.conf.get(HConstants.HBASE_DIR));
this.log = setupHLog();
startServiceThreads();
isOnline = true;
} catch (IOException e) {
this.stopRequested.set(true);
isOnline = false;
e = RemoteExceptionHandler.checkIOException(e);
LOG.fatal("Failed init", e);
IOException ex = new IOException("region server startup failed");

@@ -498,7 +509,17 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
throw ex;
}
}

/**
* Report the status of the server. A server is online once all the startup
* is completed (setting up filesystem, starting service threads, etc.). This
* method is designed mostly to be useful in tests.
* @return true if online, false if not.
*/
public boolean isOnline() {
return isOnline;
}

private HLog setupHLog() throws RegionServerRunningException,
IOException {
@@ -1232,8 +1253,8 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
/**
* @return Immutable list of this servers regions.
*/
public SortedMap<Text, HRegion> getOnlineRegions() {
return Collections.unmodifiableSortedMap(this.onlineRegions);
public Map<Text, HRegion> getOnlineRegions() {
return Collections.unmodifiableMap(onlineRegions);
}

/** @return the request count */
@@ -1302,6 +1323,26 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
}

/**
* Get the top N most loaded regions this server is serving so we can
* tell the master which regions it can reallocate if we're overloaded.
* TODO: actually calculate which regions are most loaded. (Right now, we're
* just grabbing the first N regions being served regardless of load.)
*/
protected HRegionInfo[] getMostLoadedRegions() {
ArrayList<HRegionInfo> regions = new ArrayList<HRegionInfo>();
synchronized (onlineRegions) {
for (Map.Entry<Text, HRegion> entry : onlineRegions.entrySet()) {
if (regions.size() < numRegionsToReport) {
regions.add(entry.getValue().getRegionInfo());
} else {
break;
}
}
}
return regions.toArray(new HRegionInfo[regions.size()]);
}

/**
* Called to verify that this server is up and running.
*
* @throws IOException
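The number of candidate regions reported by getMostLoadedRegions() is capped by the new hbase.regionserver.numregionstoreport setting introduced in the constructor hunk earlier in this commit (default 10). A hedged example of raising it, using the standard Hadoop Configuration setter:

    // Report up to 25 candidate regions per master check-in instead of 10.
    HBaseConfiguration conf = new HBaseConfiguration();
    conf.setInt("hbase.regionserver.numregionstoreport", 25);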
@@ -19,15 +19,11 @@
*/
package org.apache.hadoop.hbase;

import java.io.File;
import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;

import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;

@@ -44,8 +40,10 @@ public class MiniHBaseCluster implements HConstants {
private LocalHBaseCluster hbaseCluster;

/**
* Start a MiniHBaseCluster. conf is assumed to contain a valid fs name to
* hook up to.
* Start a MiniHBaseCluster.
* @param conf HBaseConfiguration to be used for cluster
* @param numRegionServers initial number of region servers to start.
* @throws IOException
*/
public MiniHBaseCluster(HBaseConfiguration conf, int numRegionServers)
throws IOException {

@@ -74,6 +72,7 @@ public class MiniHBaseCluster implements HConstants {
LocalHBaseCluster.RegionServerThread t =
this.hbaseCluster.addRegionServer();
t.start();
t.waitForServerOnline();
return t.getName();
}
@@ -110,11 +109,10 @@ public class MiniHBaseCluster implements HConstants {
* @param serverNumber Used as index into a list.
* @return the region server that was stopped
*/
public HRegionServer stopRegionServer(int serverNumber) {
HRegionServer server =
this.hbaseCluster.getRegionServers().get(serverNumber).getRegionServer();
public LocalHBaseCluster.RegionServerThread stopRegionServer(int serverNumber) {
LocalHBaseCluster.RegionServerThread server = hbaseCluster.getRegionServers().get(serverNumber);
LOG.info("Stopping " + server.toString());
server.stop();
server.getRegionServer().stop();
return server;
}
||||
@ -144,16 +142,6 @@ public class MiniHBaseCluster implements HConstants {
|
||||
}
|
||||
}
|
||||
|
||||
private void deleteFile(File f) {
|
||||
if(f.isDirectory()) {
|
||||
File[] children = f.listFiles();
|
||||
for(int i = 0; i < children.length; i++) {
|
||||
deleteFile(children[i]);
|
||||
}
|
||||
}
|
||||
f.delete();
|
||||
}
|
||||
|
||||
/**
|
||||
* Call flushCache on all regions on all participating regionservers.
|
||||
* @throws IOException
|
||||
|
@ -22,7 +22,6 @@ package org.apache.hadoop.hbase;
import java.io.IOException;
import java.util.ConcurrentModificationException;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.commons.logging.Log;
@ -42,6 +41,9 @@ import org.apache.hadoop.hbase.regionserver.HRegionServer;
public class MultiRegionTable extends HBaseClusterTestCase {
  static final Log LOG = LogFactory.getLog(MultiRegionTable.class.getName());

  /**
   * Default constructor
   */
  public MultiRegionTable() {
    super();
    // These are needed for the new and improved Map/Reduce framework
@ -345,7 +347,7 @@ public class MultiRegionTable extends HBaseClusterTestCase {
    LOG.info("Starting compaction");
    for (LocalHBaseCluster.RegionServerThread thread:
        cluster.getRegionThreads()) {
      SortedMap<Text, HRegion> regions = thread.getRegionServer().getOnlineRegions();
      Map<Text, HRegion> regions = thread.getRegionServer().getOnlineRegions();

      // Retry if ConcurrentModification... alternative of sync'ing is not
      // worth it for sake of unit test.

239  src/test/org/apache/hadoop/hbase/TestRegionRebalancing.java  Normal file
@ -0,0 +1,239 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.ArrayList;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.client.HTable;

import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HRegion;

/**
 * Test whether region rebalancing works. (HBASE-71)
 */
public class TestRegionRebalancing extends HBaseClusterTestCase {
  final Log LOG = LogFactory.getLog(this.getClass().getName());

  HTable table;

  HTableDescriptor desc;

  final byte[] FIVE_HUNDRED_KBYTES;

  final Text COLUMN_NAME = new Text("col:");

  /** constructor */
  public TestRegionRebalancing() {
    super(1);
    FIVE_HUNDRED_KBYTES = new byte[500 * 1024];
    for (int i = 0; i < 500 * 1024; i++) {
      FIVE_HUNDRED_KBYTES[i] = 'x';
    }

    desc = new HTableDescriptor("test");
    desc.addFamily(new HColumnDescriptor(COLUMN_NAME.toString()));
  }

  /**
   * Before the hbase cluster starts up, create some dummy regions.
   */
  @Override
  public void preHBaseClusterSetup() throws IOException {
    // create a 20-region table by writing directly to disk
    List<Text> startKeys = new ArrayList<Text>();
    startKeys.add(null);
    for (int i = 10; i < 29; i++) {
      startKeys.add(new Text("row_" + i));
    }
    startKeys.add(null);
    LOG.info(startKeys.size() + " start keys generated");

    List<HRegion> regions = new ArrayList<HRegion>();
    for (int i = 0; i < 20; i++) {
      regions.add(createAregion(startKeys.get(i), startKeys.get(i+1)));
    }

    // Now create the root and meta regions and insert the data regions
    // created above into the meta

    HRegion root = HRegion.createHRegion(HRegionInfo.rootRegionInfo,
      testDir, conf);
    HRegion meta = HRegion.createHRegion(HRegionInfo.firstMetaRegionInfo,
      testDir, conf);
    HRegion.addRegionToMETA(root, meta);

    for (HRegion region : regions) {
      HRegion.addRegionToMETA(meta, region);
    }

    root.close();
    root.getLog().closeAndDelete();
    meta.close();
    meta.getLog().closeAndDelete();
  }
  /**
   * For HBASE-71. Try a few different configurations of starting and stopping
   * region servers to see if the assignment of regions is pretty balanced.
   */
  public void testRebalancing() throws IOException {
    table = new HTable(conf, new Text("test"));
    assertEquals("Test table should have 20 regions",
      20, table.getStartKeys().length);

    // verify that the region assignments are balanced to start out
    assertRegionsAreBalanced();

    LOG.debug("Adding 2nd region server.");
    // add a region server - total of 2
    cluster.startRegionServer();
    assertRegionsAreBalanced();

    // add a region server - total of 3
    LOG.debug("Adding 3rd region server.");
    cluster.startRegionServer();
    assertRegionsAreBalanced();

    // kill a region server - total of 2
    LOG.debug("Killing the 3rd region server.");
    cluster.stopRegionServer(2);
    cluster.waitOnRegionServer(2);
    assertRegionsAreBalanced();

    // start two more region servers - total of 4
    LOG.debug("Adding 3rd region server");
    cluster.startRegionServer();
    LOG.debug("Adding 4th region server");
    cluster.startRegionServer();
    assertRegionsAreBalanced();
  }

  /** figure out how many regions are currently being served. */
  private int getRegionCount() {
    int total = 0;
    for (HRegionServer server : getOnlineRegionServers()) {
      total += server.getOnlineRegions().size();
    }
    return total;
  }
  /**
   * Determine if regions are balanced. Figure out the total, divide by the
   * number of online servers, then test if each server's load is within 2 of
   * the average rounded up.
   */
  private void assertRegionsAreBalanced() {
    boolean success = false;

    for (int i = 0; i < 5; i++) {
      success = true;
      // make sure all the regions are reassigned before we test balance
      waitForAllRegionsAssigned();

      int regionCount = getRegionCount();
      List<HRegionServer> servers = getOnlineRegionServers();
      double avg = Math.ceil((double)regionCount / (double)servers.size());
      LOG.debug("There are " + servers.size() + " servers and " + regionCount
        + " regions. Load Average: " + avg);

      for (HRegionServer server : servers) {
        LOG.debug(server.hashCode() + " Avg: " + avg + " actual: "
          + server.getOnlineRegions().size());

        int serverLoad = server.getOnlineRegions().size();
        if (!(serverLoad <= avg + 2 && serverLoad >= avg - 2)) {
          success = false;
        }
      }

      if (!success) {
        // one or more servers are not balanced. sleep a little to give it a
        // chance to catch up. then, go back to the retry loop.
        try {
          Thread.sleep(10000);
        } catch (InterruptedException e) {}

        continue;
      }

      // if we get here, all servers were balanced, so we should just return.
      return;
    }
    // if we get here, we tried 5 times and never got to short circuit out of
    // the retry loop, so this is a failure.
    fail("After 5 attempts, region assignments were not balanced.");
  }

  private List<HRegionServer> getOnlineRegionServers() {
    List<HRegionServer> list = new ArrayList<HRegionServer>();
    for (LocalHBaseCluster.RegionServerThread rst : cluster.getRegionThreads()) {
      if (rst.getRegionServer().isOnline()) {
        list.add(rst.getRegionServer());
      }
    }
    return list;
  }

  /**
   * Wait until all the regions are assigned.
   */
  private void waitForAllRegionsAssigned() {
    while (getRegionCount() < 22) {
      // while (!cluster.getMaster().allRegionsAssigned()) {
      LOG.debug("Waiting for there to be 22 regions, but there are " + getRegionCount() + " right now.");
      try {
        Thread.sleep(1000);
      } catch (InterruptedException e) {}
    }
  }

  /**
   * create a region with the specified start and end key and exactly one row
   * inside.
   */
  private HRegion createAregion(Text startKey, Text endKey)
  throws IOException {

    HRegion region = createNewHRegion(desc, startKey, endKey);

    Text keyToWrite = startKey == null ? new Text("row_000") : startKey;

    BatchUpdate bu = new BatchUpdate(keyToWrite);
    bu.put(COLUMN_NAME, "test".getBytes());

    region.batchUpdate(bu);

    region.close();
    region.getLog().closeAndDelete();
    return region;
  }
}
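A worked example of the balance check in the test above, with illustrative numbers: the test waits for 22 assigned regions (20 table regions plus root and meta); on 3 servers the rounded-up average is ceil(22/3) = 8, so each server must be serving between 6 and 10 regions for assertRegionsAreBalanced to pass.

  // Illustrative arithmetic only (not test code): the tolerance used above.
  int regionCount = 22;   // 20 table regions + root + meta
  int serverCount = 3;
  double avg = Math.ceil((double) regionCount / (double) serverCount); // 8.0
  double low = avg - 2;   // 6.0  -> minimum acceptable per-server load
  double high = avg + 2;  // 10.0 -> maximum acceptable per-server load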
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HServerInfo;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HRegionInfo;

/**
 * An HMaster that runs out of memory.
@ -48,11 +49,12 @@ public class OOMEHMaster extends HMaster {
  }

  @Override
  public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg[] msgs)
  public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg[] msgs,
    HRegionInfo[] mostLoadedRegions)
  throws IOException {
    // Retain 1M.
    this.retainer.add(new byte [1024 * 1024]);
    return super.regionServerReport(serverInfo, msgs);
    return super.regionServerReport(serverInfo, msgs, mostLoadedRegions);
  }

  /**
@ -8,7 +8,7 @@
  import="org.apache.hadoop.hbase.HRegionInfo" %><%
  HRegionServer regionServer = (HRegionServer)getServletContext().getAttribute(HRegionServer.REGIONSERVER);
  HServerInfo serverInfo = regionServer.getServerInfo();
  SortedMap<Text, HRegion> onlineRegions = regionServer.getOnlineRegions();
  Map<Text, HRegion> onlineRegions = regionServer.getOnlineRegions();
%><?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
  "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">